# Source: weibo_signin/backend/api_service/app/routers/accounts.py
# (466 lines, 15 KiB, Python; last changed 2026-03-09 14:05:00 +08:00)
"""
Weibo Account CRUD router.
All endpoints require JWT authentication and enforce resource ownership.
"""
import logging
from datetime import datetime
from typing import Dict, List
import httpx
2026-03-09 14:05:00 +08:00
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from shared.models import get_db, Account, SigninLog, User
2026-03-09 14:05:00 +08:00
from shared.crypto import encrypt_cookie, decrypt_cookie, derive_key
from shared.config import shared_settings
from shared.response import success_response, error_response
from api_service.app.dependencies import get_current_user
from api_service.app.schemas.account import (
AccountCreate,
AccountUpdate,
AccountResponse,
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
2026-03-09 14:05:00 +08:00
# Router for this module: every route below is registered under the
# "/api/v1/accounts" prefix and tagged "accounts".
router = APIRouter(prefix="/api/v1/accounts", tags=["accounts"])
# Default request headers sent with every outbound weibo.com call made
# from this module (a desktop Chrome User-Agent plus a weibo.com Referer).
WEIBO_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
    "Referer": "https://weibo.com/",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
2026-03-09 14:05:00 +08:00
def _encryption_key() -> bytes:
    """Derive the cookie encryption key from the configured secret."""
    secret = shared_settings.COOKIE_ENCRYPTION_KEY
    return derive_key(secret)
def _account_to_dict(account: Account) -> dict:
    """Serialize an account through ``AccountResponse`` into a JSON-mode dict."""
    validated = AccountResponse.model_validate(account)
    return validated.model_dump(mode="json")
async def _get_owned_account(
    account_id: str,
    user: User,
    db: AsyncSession,
) -> Account:
    """Load the account with the given id and check the caller owns it.

    Raises:
        HTTPException: 404 when no account has this id, 403 when the
            account's ``user_id`` differs from ``user.id``.
    """
    query = select(Account).where(Account.id == account_id)
    found = (await db.execute(query)).scalar_one_or_none()

    if found is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Account not found",
        )

    is_owner = found.user_id == user.id
    if not is_owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )
    return found
# ---- CREATE ----
@router.post("", status_code=status.HTTP_201_CREATED)
async def create_account(
    body: AccountCreate,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Create an account for the authenticated user.

    The submitted cookie is encrypted before being stored, and the new
    account starts with status ``"pending"``.
    """
    ciphertext, iv = encrypt_cookie(body.cookie, _encryption_key())

    new_account = Account(
        user_id=user.id,
        weibo_user_id=body.weibo_user_id,
        remark=body.remark,
        encrypted_cookies=ciphertext,
        iv=iv,
        status="pending",
    )
    db.add(new_account)
    await db.commit()
    # Reload so the serialized payload reflects the persisted row.
    await db.refresh(new_account)

    payload = _account_to_dict(new_account)
    return success_response(payload, "Account created")
# ---- LIST ----
@router.get("")
async def list_accounts(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return every account whose ``user_id`` matches the authenticated user."""
    query = select(Account).where(Account.user_id == user.id)
    rows = (await db.execute(query)).scalars().all()
    serialized = list(map(_account_to_dict, rows))
    return success_response(serialized, "Accounts retrieved")
# ---- DETAIL ----
@router.get("/{account_id}")
async def get_account(
    account_id: str,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return one account after the ownership check has passed."""
    owned = await _get_owned_account(account_id, user, db)
    payload = _account_to_dict(owned)
    return success_response(payload, "Account retrieved")
# ---- UPDATE ----
@router.put("/{account_id}")
async def update_account(
    account_id: str,
    body: AccountUpdate,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Update the remark and/or cookie of an owned account.

    Fields left as ``None`` in the request body are not modified. A new
    cookie is re-encrypted before it is stored.
    """
    target = await _get_owned_account(account_id, user, db)

    new_remark, new_cookie = body.remark, body.cookie
    if new_remark is not None:
        target.remark = new_remark
    if new_cookie is not None:
        # NOTE(review): the account status is left untouched when the cookie
        # changes -- confirm whether it should be reset for re-verification.
        target.encrypted_cookies, target.iv = encrypt_cookie(
            new_cookie, _encryption_key()
        )

    await db.commit()
    await db.refresh(target)
    payload = _account_to_dict(target)
    return success_response(payload, "Account updated")
# ---- DELETE ----
@router.delete("/{account_id}")
async def delete_account(
    account_id: str,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete an owned account and commit the removal."""
    doomed = await _get_owned_account(account_id, user, db)
    await db.delete(doomed)
    await db.commit()
    return success_response(None, "Account deleted")
# ---- helpers for verify / signin ----
def _parse_cookie_str(cookie_str: str) -> Dict[str, str]:
"""Parse 'k1=v1; k2=v2' into a dict."""
cookies: Dict[str, str] = {}
for pair in cookie_str.split(";"):
pair = pair.strip()
if "=" in pair:
k, v = pair.split("=", 1)
cookies[k.strip()] = v.strip()
return cookies
async def _verify_weibo_cookie(cookie_str: str) -> dict:
    """
    Verify cookie via weibo.com PC API.

    Calls /ajax/side/cards and treats ``ok == 1`` as "logged in". A
    response body that is not a JSON object is treated as "not logged
    in" rather than raised. Transport errors on this first request still
    propagate to the caller.

    Args:
        cookie_str: Raw ``'k1=v1; k2=v2'`` cookie string.

    Returns:
        {"valid": bool, "uid": str|None, "screen_name": str|None}.
        ``uid`` and ``screen_name`` stay ``None`` when the optional
        profile lookup fails or does not report ``ok == 1``.
    """
    invalid = {"valid": False, "uid": None, "screen_name": None}
    cookies = _parse_cookie_str(cookie_str)
    async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
        # Step 1: check login via /ajax/side/cards
        resp = await client.get(
            "https://weibo.com/ajax/side/cards",
            params={"count": "1"},
            headers=WEIBO_HEADERS,
            cookies=cookies,
        )
        try:
            data = resp.json()
        except ValueError:
            # Body is not JSON, so there is no ok=1 marker to find.
            return invalid
        if not isinstance(data, dict) or data.get("ok") != 1:
            return invalid

        # Step 2: get user info via /ajax/profile/info
        uid = None
        screen_name = None
        try:
            resp2 = await client.get(
                "https://weibo.com/ajax/profile/info",
                headers=WEIBO_HEADERS,
                cookies=cookies,
            )
            info = resp2.json()
            if info.get("ok") == 1:
                user = (info.get("data") or {}).get("user") or {}
                # Fall back to "id" when "idstr" is missing or null, so a
                # null value never becomes the literal string "None".
                raw_uid = user.get("idstr") or user.get("id") or ""
                uid = str(raw_uid)
                screen_name = user.get("screen_name", "")
        except Exception:
            # Profile info is optional, login check already passed; keep the
            # failure visible in logs instead of dropping it silently.
            logger.debug("Optional weibo profile lookup failed", exc_info=True)
    return {"valid": True, "uid": uid, "screen_name": screen_name}
# ---- VERIFY COOKIE ----
@router.post("/{account_id}/verify")
async def verify_account(
    account_id: str,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Check whether the stored cookie still works and persist the outcome.

    The account status becomes ``"active"`` (and ``last_checked_at`` is
    stamped) when the cookie verifies, otherwise ``"invalid_cookie"``. A
    cookie that cannot be decrypted is also recorded as ``"invalid_cookie"``.
    """
    account = await _get_owned_account(account_id, user, db)
    key = _encryption_key()

    try:
        cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key)
    except Exception:
        account.status = "invalid_cookie"
        await db.commit()
        await db.refresh(account)
        failure_payload = _account_to_dict(account)
        failure_payload["cookie_valid"] = False
        return success_response(failure_payload, "Cookie decryption failed")

    check = await _verify_weibo_cookie(cookie_str)
    is_valid = check["valid"]
    if not is_valid:
        account.status = "invalid_cookie"
    else:
        account.status = "active"
        account.last_checked_at = datetime.utcnow()
    await db.commit()
    await db.refresh(account)

    payload = _account_to_dict(account)
    payload["cookie_valid"] = is_valid
    payload["weibo_screen_name"] = check.get("screen_name")
    if is_valid:
        message = "Cookie verified"
    else:
        message = "Cookie is invalid or expired"
    return success_response(payload, message)
# ---- MANUAL SIGNIN ----
async def _get_super_topics(cookie_str: str, weibo_uid: str = "") -> List[dict]:
    """
    Fetch followed super topics via weibo.com PC API.
    GET /ajax/profile/topicContent?tabid=231093_-_chaohua

    Args:
        cookie_str: Raw ``'k1=v1; k2=v2'`` cookie string.
        weibo_uid: Not used by this function; kept so existing callers
            keep working.

    Returns:
        List of {"title": str, "containerid": str}, one entry per distinct
        containerid. At most 10 pages are requested. Paging stops early on
        a non-JSON or non-``ok`` response, on an empty page, or once the
        page count reported by the API is reached.
    """
    import re

    # Container ids look like "100808" followed by hex digits; they are
    # found either in "oid" ("1022:100808xxx") or in the "scheme" URL.
    containerid_re = re.compile(r"100808[0-9a-fA-F]+")
    cookies = _parse_cookie_str(cookie_str)
    topics: List[dict] = []
    seen_ids = set()
    async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
        # First get XSRF-TOKEN by visiting weibo.com
        await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
        xsrf = client.cookies.get("XSRF-TOKEN", "")
        headers = {
            **WEIBO_HEADERS,
            "X-Requested-With": "XMLHttpRequest",
        }
        if xsrf:
            headers["X-XSRF-TOKEN"] = xsrf

        max_page = 10
        for page in range(1, max_page + 1):
            resp = await client.get(
                "https://weibo.com/ajax/profile/topicContent",
                params={"tabid": "231093_-_chaohua", "page": str(page)},
                headers=headers,
                cookies=cookies,
            )
            try:
                data = resp.json()
            except ValueError:
                # Non-JSON body: nothing more can be read from this listing.
                break
            if not isinstance(data, dict) or data.get("ok") != 1:
                break

            # "or" guards against keys that are present but null.
            payload = data.get("data") or {}
            topic_list = payload.get("list") or []
            if not topic_list:
                break

            for item in topic_list:
                if not isinstance(item, dict):
                    continue
                title = item.get("topic_name", "") or item.get("title", "")
                containerid = ""
                # Prefer "oid", fall back to "scheme".
                for field in ("oid", "scheme"):
                    match = containerid_re.search(str(item.get(field) or ""))
                    if match:
                        containerid = match.group(0)
                        break
                if title and containerid and containerid not in seen_ids:
                    seen_ids.add(containerid)
                    topics.append({"title": title, "containerid": containerid})

            # Check pagination
            api_max = payload.get("max_page") or 1
            if page >= api_max:
                break
    return topics
async def _do_signin(cookie_str: str, topic_title: str, containerid: str) -> dict:
    """
    Sign in to a single super topic via weibo.com PC API.
    GET /p/aj/general/button with full browser-matching parameters.

    Args:
        cookie_str: Raw ``'k1=v1; k2=v2'`` cookie string for the account.
        topic_title: Topic name; only used in the failure log line.
        containerid: Super-topic container id, sent as the ``id`` parameter
            and embedded in the Referer header.

    Returns:
        {"status": "success"|"already_signed"|"failed", "message": str}.
        Errors raised while talking to weibo.com or reading its response
        (including the initial XSRF-TOKEN request) are returned as
        ``"failed"`` instead of being raised.
    """
    import time as _time

    try:
        cookies = _parse_cookie_str(cookie_str)
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            # Get XSRF-TOKEN
            await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
            xsrf = client.cookies.get("XSRF-TOKEN", "")
            headers = {
                **WEIBO_HEADERS,
                "Referer": f"https://weibo.com/p/{containerid}/super_index",
                "X-Requested-With": "XMLHttpRequest",
            }
            if xsrf:
                headers["X-XSRF-TOKEN"] = xsrf

            resp = await client.get(
                "https://weibo.com/p/aj/general/button",
                params={
                    "ajwvr": "6",
                    "api": "http://i.huati.weibo.com/aj/super/checkin",
                    "texta": "签到",
                    "textb": "已签到",
                    "status": "0",
                    "id": containerid,
                    "location": "page_100808_super_index",
                    "timezone": "GMT+0800",
                    "lang": "zh-cn",
                    "plat": "Win32",
                    "ua": WEIBO_HEADERS["User-Agent"],
                    "screen": "1920*1080",
                    # Millisecond timestamp sent as the "__rnd" parameter.
                    "__rnd": str(int(_time.time() * 1000)),
                },
                headers=headers,
                cookies=cookies,
            )
            data = resp.json()
            code = str(data.get("code", ""))
            msg = data.get("msg", "")

            if code == "100000":
                tip = ""
                if isinstance(data.get("data"), dict):
                    tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "")
                return {"status": "success", "message": tip or "签到成功"}
            if code == "382004":
                return {"status": "already_signed", "message": msg or "今日已签到"}
            if code == "382003":
                return {"status": "failed", "message": msg or "非超话成员"}
            return {"status": "failed", "message": f"code={code}, msg={msg}"}
    except Exception as e:
        logger.warning(
            "Sign-in request failed for topic %r (%s): %r",
            topic_title,
            containerid,
            e,
        )
        # Some exceptions stringify to "", so fall back to the class name
        # to keep the stored message informative.
        return {"status": "failed", "message": str(e) or type(e).__name__}
@router.post("/{account_id}/signin")
async def manual_signin(
    account_id: str,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Manually trigger sign-in for all followed super topics.
    Verifies cookie first, fetches topic list, signs each one, writes logs.

    Returns an error response (status 400) when the stored cookie cannot be
    decrypted or fails verification; in both cases the account status is
    set to ``"invalid_cookie"``. Otherwise returns per-status counters,
    ``total_topics`` and the per-topic ``details`` list.
    """
    import asyncio

    # SigninLog.status value for each sign-in outcome; anything not listed
    # is recorded as "failed_network".
    log_status_by_result = {
        "success": "success",
        "already_signed": "failed_already_signed",
    }

    account = await _get_owned_account(account_id, user, db)
    key = _encryption_key()

    # Decrypt cookie
    try:
        cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key)
    except Exception:
        account.status = "invalid_cookie"
        await db.commit()
        return error_response("Cookie decryption failed", "COOKIE_ERROR", status_code=400)

    # Verify cookie
    verify = await _verify_weibo_cookie(cookie_str)
    if not verify["valid"]:
        account.status = "invalid_cookie"
        await db.commit()
        return error_response("Cookie is invalid or expired", "COOKIE_EXPIRED", status_code=400)

    # Activate account if pending
    if account.status != "active":
        account.status = "active"
    account.last_checked_at = datetime.utcnow()

    # Get super topics
    topics = await _get_super_topics(cookie_str, account.weibo_user_id)
    if not topics:
        await db.commit()
        return success_response(
            {
                "signed": 0,
                "already_signed": 0,
                "failed": 0,
                # "topics" is kept for existing clients; "total_topics" and
                # "details" mirror the shape of the normal response below.
                "topics": [],
                "total_topics": 0,
                "details": [],
            },
            "No super topics found for this account",
        )

    # Sign each topic
    results = []
    signed = already = failed = 0
    for topic in topics:
        await asyncio.sleep(1.5)  # anti-bot delay
        try:
            r = await _do_signin(cookie_str, topic["title"], topic["containerid"])
        except Exception as exc:
            # One failing topic must not abort the run and discard the logs
            # already queued for the topics before it.
            r = {"status": "failed", "message": str(exc) or type(exc).__name__}
        r["topic"] = topic["title"]
        results.append(r)

        # Write signin log
        db.add(
            SigninLog(
                account_id=account.id,
                topic_title=topic["title"],
                status=log_status_by_result.get(r["status"], "failed_network"),
                reward_info={"message": r["message"]},
                signed_at=datetime.utcnow(),
            )
        )

        if r["status"] == "success":
            signed += 1
        elif r["status"] == "already_signed":
            already += 1
        else:
            failed += 1

    account.last_checked_at = datetime.utcnow()
    await db.commit()
    return success_response(
        {
            "signed": signed,
            "already_signed": already,
            "failed": failed,
            "total_topics": len(topics),
            "details": results,
        },
        f"Signed {signed} topics, {already} already signed, {failed} failed",
    )