""" Weibo Account CRUD router. All endpoints require JWT authentication and enforce resource ownership. """ import logging from datetime import datetime from typing import Dict, List import httpx from fastapi import APIRouter, Depends, HTTPException, status, Body from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from shared.models import get_db, Account, SigninLog, User from shared.crypto import encrypt_cookie, decrypt_cookie, derive_key from shared.config import shared_settings from shared.response import success_response, error_response from api_service.app.dependencies import get_current_user from api_service.app.schemas.account import ( AccountCreate, AccountUpdate, AccountResponse, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/accounts", tags=["accounts"]) WEIBO_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ), "Referer": "https://weibo.com/", "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", } def _encryption_key() -> bytes: return derive_key(shared_settings.COOKIE_ENCRYPTION_KEY) def _account_to_dict(account: Account) -> dict: return AccountResponse.model_validate(account).model_dump(mode="json") async def _get_owned_account( account_id: str, user: User, db: AsyncSession, ) -> Account: """Fetch an account and verify it belongs to the current user.""" result = await db.execute(select(Account).where(Account.id == account_id)) account = result.scalar_one_or_none() if account is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Account not found") if account.user_id != user.id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied") return account # ---- CREATE ---- @router.post("", status_code=status.HTTP_201_CREATED) async def create_account( body: AccountCreate, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): key = _encryption_key() ciphertext, iv = encrypt_cookie(body.cookie, key) account = Account( user_id=user.id, weibo_user_id=body.weibo_user_id, remark=body.remark, encrypted_cookies=ciphertext, iv=iv, status="pending", ) db.add(account) await db.commit() await db.refresh(account) return success_response(_account_to_dict(account), "Account created") # ---- LIST ---- @router.get("") async def list_accounts( page: int = 1, size: int = 12, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): from sqlalchemy import func as sa_func # Total count count_q = select(sa_func.count()).select_from( select(Account).where(Account.user_id == user.id).subquery() ) total = (await db.execute(count_q)).scalar() or 0 # Status counts (for dashboard stats) status_q = ( select(Account.status, sa_func.count()) .where(Account.user_id == user.id) .group_by(Account.status) ) status_rows = (await db.execute(status_q)).all() status_counts = {row[0]: row[1] for row in status_rows} # Paginated list offset = (max(1, page) - 1) * size result = await db.execute( select(Account) .where(Account.user_id == user.id) .order_by(Account.created_at.desc()) .offset(offset) .limit(size) ) accounts = result.scalars().all() total_pages = (total + size - 1) // size if total > 0 else 0 return success_response( { "items": [_account_to_dict(a) for a in accounts], "total": total, "page": page, "size": size, "total_pages": total_pages, "status_counts": status_counts, }, "Accounts retrieved", ) # ---- DETAIL ---- @router.get("/{account_id}") async def get_account( account_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): account = await _get_owned_account(account_id, user, db) return success_response(_account_to_dict(account), "Account retrieved") # ---- UPDATE ---- @router.put("/{account_id}") async def update_account( account_id: str, body: AccountUpdate, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): account = await _get_owned_account(account_id, user, db) if body.remark is not None: account.remark = body.remark if body.cookie is not None: key = _encryption_key() ciphertext, iv = encrypt_cookie(body.cookie, key) account.encrypted_cookies = ciphertext account.iv = iv await db.commit() await db.refresh(account) return success_response(_account_to_dict(account), "Account updated") # ---- DELETE ---- @router.delete("/{account_id}") async def delete_account( account_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): account = await _get_owned_account(account_id, user, db) await db.delete(account) await db.commit() return success_response(None, "Account deleted") # ---- helpers for verify / signin ---- def _parse_cookie_str(cookie_str: str) -> Dict[str, str]: """Parse 'k1=v1; k2=v2' into a dict.""" cookies: Dict[str, str] = {} for pair in cookie_str.split(";"): pair = pair.strip() if "=" in pair: k, v = pair.split("=", 1) cookies[k.strip()] = v.strip() return cookies async def _verify_weibo_cookie(cookie_str: str) -> dict: """ Verify cookie via weibo.com PC API. Uses /ajax/side/cards which returns ok=1 when logged in. Returns {"valid": bool, "uid": str|None, "screen_name": str|None}. """ cookies = _parse_cookie_str(cookie_str) async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: # Step 1: check login via /ajax/side/cards resp = await client.get( "https://weibo.com/ajax/side/cards", params={"count": "1"}, headers=WEIBO_HEADERS, cookies=cookies, ) try: data = resp.json() except Exception: # 非 JSON 响应(可能是 GBK 编码的登录页),视为 Cookie 失效 return {"valid": False, "uid": None, "screen_name": None} if data.get("ok") != 1: return {"valid": False, "uid": None, "screen_name": None} # Step 2: get user info via /ajax/profile/detail uid = None screen_name = None try: resp2 = await client.get( "https://weibo.com/ajax/profile/info", headers=WEIBO_HEADERS, cookies=cookies, ) info = resp2.json() if info.get("ok") == 1: user = info.get("data", {}).get("user", {}) uid = str(user.get("idstr", user.get("id", ""))) screen_name = user.get("screen_name", "") except Exception: pass return {"valid": True, "uid": uid, "screen_name": screen_name} # ---- VERIFY COOKIE ---- @router.post("/{account_id}/verify") async def verify_account( account_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Verify the stored cookie is still valid and update account status.""" account = await _get_owned_account(account_id, user, db) key = _encryption_key() try: cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) except Exception: account.status = "invalid_cookie" await db.commit() await db.refresh(account) return success_response( {**_account_to_dict(account), "cookie_valid": False}, "Cookie decryption failed", ) result = await _verify_weibo_cookie(cookie_str) if result["valid"]: account.status = "active" account.last_checked_at = datetime.utcnow() else: account.status = "invalid_cookie" await db.commit() await db.refresh(account) return success_response( {**_account_to_dict(account), "cookie_valid": result["valid"], "weibo_screen_name": result.get("screen_name")}, "Cookie verified" if result["valid"] else "Cookie is invalid or expired", ) @router.get("/{account_id}/topics") async def list_topics( account_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """获取账号关注的超话列表,供用户勾选签到。""" account = await _get_owned_account(account_id, user, db) key = _encryption_key() try: cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) except Exception: return error_response("Cookie 解密失败", "COOKIE_ERROR", status_code=400) topics = await _get_super_topics(cookie_str, account.weibo_user_id) return success_response({ "topics": topics, "total": len(topics), "selected_topics": account.selected_topics, }) @router.put("/{account_id}/topics") async def save_selected_topics( account_id: str, body: dict = Body(...), user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """保存用户选择的签到超话列表。空列表或 null 表示签到全部。""" account = await _get_owned_account(account_id, user, db) selected = body.get("selected_topics") # null 或空列表都表示全部签到 if selected and isinstance(selected, list) and len(selected) > 0: account.selected_topics = selected else: account.selected_topics = None await db.commit() await db.refresh(account) return success_response( _account_to_dict(account), f"已保存 {len(selected) if selected else 0} 个超话" if selected else "已设为签到全部超话", ) # ---- MANUAL SIGNIN ---- async def _get_super_topics(cookie_str: str, weibo_uid: str = "") -> List[dict]: """ Fetch followed super topics via weibo.com PC API. GET /ajax/profile/topicContent?tabid=231093_-_chaohua Returns list of {"title": str, "containerid": str}. """ import re cookies = _parse_cookie_str(cookie_str) topics: List[dict] = [] async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: # First get XSRF-TOKEN by visiting weibo.com await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) xsrf = client.cookies.get("XSRF-TOKEN", "") headers = { **WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest", } if xsrf: headers["X-XSRF-TOKEN"] = xsrf page = 1 max_page = 10 while page <= max_page: params = {"tabid": "231093_-_chaohua", "page": str(page)} resp = await client.get( "https://weibo.com/ajax/profile/topicContent", params=params, headers=headers, cookies=cookies, ) try: data = resp.json() except Exception: break if data.get("ok") != 1: break topic_list = data.get("data", {}).get("list", []) if not topic_list: break for item in topic_list: title = item.get("topic_name", "") or item.get("title", "") # Extract containerid from oid "1022:100808xxx" or scheme containerid = "" oid = item.get("oid", "") if "100808" in oid: m = re.search(r"100808[0-9a-fA-F]+", oid) if m: containerid = m.group(0) if not containerid: scheme = item.get("scheme", "") m = re.search(r"100808[0-9a-fA-F]+", scheme) if m: containerid = m.group(0) if title and containerid: topics.append({"title": title, "containerid": containerid}) # Check pagination api_max = data.get("data", {}).get("max_page", 1) if page >= api_max: break page += 1 return topics async def _do_signin(cookie_str: str, topic_title: str, containerid: str) -> dict: """ Sign in to a single super topic via weibo.com PC API. GET /p/aj/general/button with full browser-matching parameters. Returns {"status": "success"|"already_signed"|"failed", "message": str}. """ import time as _time cookies = _parse_cookie_str(cookie_str) async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: # Get XSRF-TOKEN await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) xsrf = client.cookies.get("XSRF-TOKEN", "") headers = { **WEIBO_HEADERS, "Referer": f"https://weibo.com/p/{containerid}/super_index", "X-Requested-With": "XMLHttpRequest", } if xsrf: headers["X-XSRF-TOKEN"] = xsrf try: resp = await client.get( "https://weibo.com/p/aj/general/button", params={ "ajwvr": "6", "api": "http://i.huati.weibo.com/aj/super/checkin", "texta": "签到", "textb": "已签到", "status": "0", "id": containerid, "location": "page_100808_super_index", "timezone": "GMT+0800", "lang": "zh-cn", "plat": "Win32", "ua": WEIBO_HEADERS["User-Agent"], "screen": "1920*1080", "__rnd": str(int(_time.time() * 1000)), }, headers=headers, cookies=cookies, ) try: data = resp.json() except Exception: return {"status": "failed", "message": f"非JSON响应: {resp.text[:100]}"} code = str(data.get("code", "")) msg = data.get("msg", "") if code == "100000": tip = "" if isinstance(data.get("data"), dict): tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "") return {"status": "success", "message": tip or "签到成功"} elif code == "382004": return {"status": "already_signed", "message": msg or "今日已签到"} elif code == "382003": return {"status": "failed", "message": msg or "非超话成员"} else: return {"status": "failed", "message": f"code={code}, msg={msg}"} except Exception as e: return {"status": "failed", "message": str(e)} @router.post("/{account_id}/signin") async def manual_signin( account_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), body: dict = Body(default=None), ): """ Manually trigger sign-in for selected (or all) super topics. Body (optional): {"topic_indices": [0, 1, 3]} — indices of topics to sign. If omitted or empty, signs all topics. """ account = await _get_owned_account(account_id, user, db) key = _encryption_key() # Decrypt cookie try: cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) except Exception: account.status = "invalid_cookie" await db.commit() return error_response("Cookie decryption failed", "COOKIE_ERROR", status_code=400) # Verify cookie verify = await _verify_weibo_cookie(cookie_str) if not verify["valid"]: account.status = "invalid_cookie" await db.commit() return error_response("Cookie is invalid or expired", "COOKIE_EXPIRED", status_code=400) # Activate account if pending if account.status != "active": account.status = "active" account.last_checked_at = datetime.utcnow() # Get super topics topics = await _get_super_topics(cookie_str, account.weibo_user_id) if not topics: await db.commit() return success_response( {"signed": 0, "already_signed": 0, "failed": 0, "topics": []}, "No super topics found for this account", ) # Filter topics if specific indices provided selected_indices = None if body and isinstance(body, dict): selected_indices = body.get("topic_indices") if selected_indices and isinstance(selected_indices, list): topics = [topics[i] for i in selected_indices if 0 <= i < len(topics)] if not topics: return success_response( {"signed": 0, "already_signed": 0, "failed": 0, "topics": []}, "No valid topics selected", ) # Sign each topic results = [] signed = already = failed = 0 for topic in topics: import asyncio await asyncio.sleep(1.5) # anti-bot delay r = await _do_signin(cookie_str, topic["title"], topic["containerid"]) r["topic"] = topic["title"] results.append(r) # Write signin log log = SigninLog( account_id=account.id, topic_title=topic["title"], status="success" if r["status"] == "success" else "failed_already_signed" if r["status"] == "already_signed" else "failed_network", reward_info={"message": r["message"]}, signed_at=datetime.utcnow(), ) db.add(log) if r["status"] == "success": signed += 1 elif r["status"] == "already_signed": already += 1 else: failed += 1 account.last_checked_at = datetime.utcnow() await db.commit() return success_response( { "signed": signed, "already_signed": already, "failed": failed, "total_topics": len(topics), "details": results, }, f"Signed {signed} topics, {already} already signed, {failed} failed", )