Files
weibo_signin/debug_qrcode_flow.py

510 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
微博扫码登录 + 完整业务流程调试脚本。
模式:
python debug_qrcode_flow.py # 直连模式:扫码 + 添加 + 验证 + 签到
python debug_qrcode_flow.py --frontend # 前端模式:通过 Flask 路由
python debug_qrcode_flow.py --api-only # 仅测试后端 API(跳过扫码,用已有账号)
"""
import re
import json
import time
import base64
import requests
import sys
# ---- Configuration ----
# Base URLs of the three local services this script talks to.
AUTH_BASE_URL = 'http://localhost:8001'
API_BASE_URL = 'http://localhost:8000'
FRONTEND_URL = 'http://localhost:5000'

# Credentials submitted to the auth service and to the frontend login form.
TEST_EMAIL = 'admin@example.com'
TEST_PASSWORD = 'Admin123!'

# Browser-like headers attached to every request sent to the Sina/Weibo hosts.
WEIBO_HEADERS = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    ),
    'Referer': 'https://weibo.com/',
    'Accept': '*/*',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}

# Module-level session shared by the QR-code generate/poll requests.
WEIBO_SESSION = requests.Session()
WEIBO_SESSION.headers.update(WEIBO_HEADERS)
def parse_jsonp(text):
    """Decode a JSON or JSONP response body.

    Plain JSON is tried first so that parentheses appearing inside string
    values are not mistaken for a JSONP wrapper. If that fails, the content
    between the first ``(`` and the last ``)`` is decoded instead, which
    covers bodies such as ``callback({...});``.

    Args:
        text: The response body as a string.

    Returns:
        The decoded object, or None when *text* is not a string or no JSON
        payload can be decoded from it.
    """
    if not isinstance(text, str):
        return None
    try:
        return json.loads(text)
    except ValueError:
        # Not bare JSON; fall back to looking for a callback(...) wrapper.
        wrapped = re.search(r'\((.*)\)', text, re.DOTALL)
    if wrapped is None:
        return None
    try:
        return json.loads(wrapped.group(1))
    except ValueError:
        # Parentheses were present but did not enclose JSON (e.g. an HTML page).
        return None
def api_headers(token):
    """Return JSON request headers that carry *token* as a Bearer credential."""
    headers = {'Content-Type': 'application/json'}
    headers['Authorization'] = f'Bearer {token}'
    return headers
# ================================================================
# STEP 0: 登录系统
# ================================================================
def step0_login():
    """Log in to the auth service and check the token against the API service.

    Every failure path (network error, non-200 status, malformed body,
    missing token) prints an ``[ERROR]`` line and returns None instead of
    raising, so the caller only has to test the return value.

    Returns:
        The access token string on success, otherwise None.
    """
    print("=" * 60)
    print("STEP 0: 登录系统")
    print("=" * 60)
    try:
        resp = requests.post(
            f'{AUTH_BASE_URL}/auth/login',
            json={'email': TEST_EMAIL, 'password': TEST_PASSWORD},
            timeout=10,
        )
    except requests.RequestException as exc:
        print(f" [ERROR] 登录请求失败: {exc}")
        return None
    print(f" Status: {resp.status_code}")
    if resp.status_code != 200:
        print(f" [ERROR] 登录失败: {resp.text[:300]}")
        return None

    try:
        data = resp.json()
    except ValueError:
        # Response.json() raises a ValueError subclass on a non-JSON body.
        print(f" [ERROR] 登录响应不是合法 JSON: {resp.text[:300]}")
        return None
    token = data.get('access_token') if isinstance(data, dict) else None
    if not token:
        print(f" [ERROR] 登录响应缺少 access_token: {resp.text[:300]}")
        return None

    # "user" may be absent or null; treat both as an empty profile.
    user = data.get('user') or {}
    print(f" 用户: {user.get('username')} ({user.get('email')})")
    print(f" token: {str(token)[:50]}...")

    # Confirm the token is also accepted by the API service.
    try:
        verify = requests.get(
            f'{API_BASE_URL}/api/v1/accounts',
            headers=api_headers(token),
            timeout=10,
        )
    except requests.RequestException as exc:
        print(f" [ERROR] API 服务请求失败: {exc}")
        return None
    print(f" 验证 API 服务: GET /accounts -> {verify.status_code}")
    if verify.status_code != 200:
        print(f" [ERROR] API 服务 token 无效: {verify.text[:300]}")
        return None
    print(" ✅ token 有效")
    return token
# ================================================================
# STEP 1: 清理重复账号
# ================================================================
def step1_cleanup(token):
    """Delete duplicate accounts, keeping the newest one per Weibo user id.

    Accounts are grouped by ``weibo_user_id``; within each group the entry
    with the greatest ``created_at`` is kept and the rest are deleted via the
    API.

    Args:
        token: Bearer token for the API service.

    Returns:
        The id of a kept account, or None when no account exists or the list
        could not be fetched. When several distinct Weibo users are present,
        the id returned is the one kept for the last group processed.
    """
    print("\n" + "=" * 60)
    print("STEP 1: 清理重复账号")
    print("=" * 60)
    headers = api_headers(token)
    list_url = f'{API_BASE_URL}/api/v1/accounts'

    resp = requests.get(list_url, headers=headers, timeout=10)
    if resp.status_code != 200:
        print(f" [ERROR] 获取账号列表失败: {resp.status_code} {resp.text[:300]}")
        return None
    # "data" may be missing or null; both mean "no accounts".
    accounts = resp.json().get('data') or []
    print(f" 当前账号数: {len(accounts)}")
    if len(accounts) <= 1:
        print(" 无需清理")
        return accounts[0]['id'] if accounts else None

    # Group by weibo_user_id; only the newest entry of each group survives.
    groups = {}
    for acc in accounts:
        groups.setdefault(acc.get('weibo_user_id'), []).append(acc)

    keep_id = None
    for accs in groups.values():
        accs.sort(key=lambda a: a.get('created_at') or '', reverse=True)
        keep, duplicates = accs[0], accs[1:]
        keep_id = keep['id']
        print(f" 保留: {str(keep['id'])[:8]}... ({keep.get('remark')}) created={keep.get('created_at')}")
        for dup in duplicates:
            print(f" 删除: {str(dup['id'])[:8]}... ({dup.get('remark')}) created={dup.get('created_at')}")
            del_resp = requests.delete(
                f'{API_BASE_URL}/api/v1/accounts/{dup["id"]}',
                headers=headers,
                timeout=10,
            )
            print(f" -> {del_resp.status_code}")

    # Re-fetch the list to show the effect of the cleanup.
    resp2 = requests.get(list_url, headers=headers, timeout=10)
    remaining = resp2.json().get('data') or []
    print(f" 清理后账号数: {len(remaining)}")
    return keep_id
# ================================================================
# STEP 2: 验证 Cookie (POST /accounts/{id}/verify)
# ================================================================
def step2_verify(token, account_id):
    """Ask the API to verify the stored cookie of *account_id*.

    Prints the raw response plus the ``cookie_valid``, ``status`` and
    ``weibo_screen_name`` fields of its ``data`` object.

    Returns:
        The ``cookie_valid`` value from the response on HTTP 200, otherwise
        False.
    """
    print("\n" + "=" * 60)
    print("STEP 2: 验证 Cookie")
    print("=" * 60)
    verify_url = f'{API_BASE_URL}/api/v1/accounts/{account_id}/verify'
    resp = requests.post(verify_url, headers=api_headers(token), timeout=20)
    print(f" Status: {resp.status_code}")
    print(f" Body: {resp.text[:500]}")

    # Guard clause: anything other than 200 counts as a failed verification.
    if resp.status_code != 200:
        print(" ❌ 验证失败")
        return False

    payload = resp.json().get('data', {})
    valid = payload.get('cookie_valid')
    account_status = payload.get('status')
    screen_name = payload.get('weibo_screen_name', '')
    print(f" cookie_valid: {valid}")
    print(f" status: {account_status}")
    print(f" screen_name: {screen_name}")
    print(" ✅ Cookie 有效,账号已激活" if valid else " ❌ Cookie 无效")
    return valid
# ================================================================
# STEP 3: 手动签到 (POST /accounts/{id}/signin)
# ================================================================
def step3_signin(token, account_id):
    """Trigger a manual sign-in for *account_id* and print the summary.

    Prints the counters from the response's ``data`` object and up to ten
    per-topic detail lines, each prefixed with an icon for its status.

    Returns:
        True when the API answers HTTP 200, otherwise False.
    """
    print("\n" + "=" * 60)
    print("STEP 3: 手动签到")
    print("=" * 60)
    print(" 调用 POST /accounts/{id}/signin ...")
    print(" (这可能需要一些时间,每个超话间隔 1.5 秒)")
    resp = requests.post(
        f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin',
        headers=api_headers(token),
        timeout=300,  # sign-in can be slow
    )
    print(f" Status: {resp.status_code}")
    if resp.status_code != 200:
        print(f" ❌ 签到失败: {resp.text[:500]}")
        return False

    result = resp.json()
    # "data" / "details" may be null; treat that the same as absent.
    data = result.get('data') or {}
    msg = result.get('message', '')
    print(f" 消息: {msg}")
    print(f" 签到成功: {data.get('signed', 0)}")
    print(f" 已签过: {data.get('already_signed', 0)}")
    print(f" 失败: {data.get('failed', 0)}")
    print(f" 总超话数: {data.get('total_topics', 0)}")

    # Any status other than the two known ones is shown as a failure.
    status_icons = {'success': '✅', 'already_signed': '⏭️'}
    details = data.get('details') or []
    for d in details[:10]:  # show at most 10 entries
        icon = status_icons.get(d.get('status'), '❌')
        print(f" {icon} {d.get('topic', '?')}: {d.get('message', '')}")
    if len(details) > 10:
        print(f" ... 还有 {len(details) - 10}")
    return True
# ================================================================
# STEP 4: 查看签到日志
# ================================================================
def step4_logs(token, account_id):
    """Fetch the first page of sign-in logs for *account_id* and print them.

    Prints the total count and up to five entries; prints an error line when
    the API does not answer HTTP 200. Returns nothing.
    """
    print("\n" + "=" * 60)
    print("STEP 4: 查看签到日志")
    print("=" * 60)
    resp = requests.get(
        f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin-logs',
        params={'page': 1, 'size': 10},
        headers=api_headers(token),
        timeout=10,
    )
    print(f" Status: {resp.status_code}")
    if resp.status_code != 200:
        print(f" ❌ 获取日志失败: {resp.text[:300]}")
        return

    # "data" / "items" may be null; treat that the same as absent.
    data = resp.json().get('data') or {}
    items = data.get('items') or []
    total = data.get('total', 0)
    print(f" 总日志数: {total}")
    for log in items[:5]:
        # signed_at can be null in the payload; slicing None would raise.
        signed_at = str(log.get('signed_at') or '?')[:19]
        print(f" [{log.get('status')}] {log.get('topic_title', '?')} @ {signed_at}")
    if not items:
        print(" (暂无日志)")
# ================================================================
# STEP 5: 查看账号详情(验证前端会用到的接口)
# ================================================================
def step5_detail(token, account_id):
    """Probe the account detail, task list and sign-in log endpoints.

    Each endpoint is requested in turn; its status code and a short summary
    (or the first 200 characters of the error body) are printed. A final
    line reports whether all three answered HTTP 200. Returns nothing.
    """
    print("\n" + "=" * 60)
    print("STEP 5: 验证账号详情接口")
    print("=" * 60)
    account_url = f'{API_BASE_URL}/api/v1/accounts/{account_id}'
    status_codes = []

    # Account detail
    detail_resp = requests.get(account_url, headers=api_headers(token), timeout=10)
    status_codes.append(detail_resp.status_code)
    print(f" GET /accounts/{{id}} -> {detail_resp.status_code}")
    if detail_resp.status_code != 200:
        print(detail_resp.text[:200])
    else:
        account = detail_resp.json().get('data', {})
        print(f" status={account.get('status')}, remark={account.get('remark')}")

    # Task list
    tasks_resp = requests.get(f'{account_url}/tasks', headers=api_headers(token), timeout=10)
    status_codes.append(tasks_resp.status_code)
    print(f" GET /accounts/{{id}}/tasks -> {tasks_resp.status_code}")
    if tasks_resp.status_code != 200:
        print(tasks_resp.text[:200])
    else:
        task_list = tasks_resp.json().get('data', [])
        print(f" 任务数: {len(task_list)}")

    # Sign-in logs
    logs_resp = requests.get(
        f'{account_url}/signin-logs',
        params={'page': 1, 'size': 5},
        headers=api_headers(token),
        timeout=10,
    )
    status_codes.append(logs_resp.status_code)
    print(f" GET /accounts/{{id}}/signin-logs -> {logs_resp.status_code}")
    if logs_resp.status_code != 200:
        print(logs_resp.text[:200])
    else:
        log_page = logs_resp.json().get('data', {})
        print(f" 日志数: {log_page.get('total', 0)}, items: {len(log_page.get('items', []))}")

    everything_ok = all(code == 200 for code in status_codes)
    print(" ✅ 所有详情接口正常" if everything_ok else " ❌ 部分接口异常")
# ================================================================
# 扫码流程(直连模式用)
# ================================================================
def qrcode_flow(token):
    """Run the QR-code login end to end and register the account.

    Phases: generate a QR code, poll until it is scanned and confirmed,
    exchange the resulting alt token for weibo.com cookies through SSO, then
    POST the account to the backend API.

    Args:
        token: Bearer token for the backend API.

    Returns:
        The id of the created account, or None if any phase fails.
    """
    qrid = _qr_generate()
    if not qrid:
        return None
    alt_token = _qr_poll(qrid)
    if not alt_token:
        return None
    login = _qr_sso_login(alt_token)
    if login is None:
        return None
    uid, nick, cookie_str = login
    return _qr_add_account(token, uid, nick, cookie_str)


def _qr_generate():
    """Request a login QR code and save its image to qrcode.png; return the qrid or None."""
    print("\n" + "=" * 60)
    print("QRCODE: 生成二维码")
    print("=" * 60)
    url = 'https://login.sina.com.cn/sso/qrcode/image'
    params = {'entry': 'weibo', 'size': '180', 'callback': f'STK_{int(time.time() * 1000)}'}
    resp = WEIBO_SESSION.get(url, params=params, timeout=10)
    try:
        data = parse_jsonp(resp.text)
    except ValueError:
        data = None
    if not isinstance(data, dict) or data.get('retcode') != 20000000:
        print(" [ERROR] 生成二维码失败")
        return None

    qr_data = data.get('data') or {}
    qrid = qr_data.get('qrid')
    image = qr_data.get('image') or ''
    if image.startswith('//'):
        # Protocol-relative URL: force https.
        image = 'https:' + image
    print(f" qrid: {qrid}")
    if not qrid or not image.startswith('http'):
        # Without a qrid or a downloadable image there is nothing to scan or poll.
        print(" [ERROR] 响应中缺少 qrid 或二维码图片地址")
        return None

    img_resp = WEIBO_SESSION.get(image, timeout=10)
    if img_resp.status_code != 200:
        print(f" [ERROR] 下载二维码图片失败: {img_resp.status_code}")
        return None
    with open('qrcode.png', 'wb') as f:
        f.write(img_resp.content)
    print(" 二维码已保存到 qrcode.png")
    return qrid


def _qr_poll(qrid, attempts=120, interval=2):
    """Poll the QR-code status until login is confirmed; return the alt token or None."""
    print("\n 请用手机微博扫描 qrcode.png ...")
    check_url = 'https://login.sina.com.cn/sso/qrcode/check'
    last_retcode = None
    for i in range(attempts):
        time.sleep(interval)
        params = {'entry': 'weibo', 'qrid': qrid, 'callback': f'STK_{int(time.time() * 1000)}'}
        try:
            resp = WEIBO_SESSION.get(check_url, params=params, timeout=10)
            data = parse_jsonp(resp.text)
        except (requests.RequestException, ValueError) as exc:
            # One failed poll should not abort a multi-minute wait.
            print(f"\n [{i+1}] 轮询请求失败,重试: {exc}")
            continue
        if not isinstance(data, dict):
            data = None

        retcode = data.get('retcode') if data else None
        if retcode != last_retcode:
            msg = data.get('msg', '') if data else ''
            print(f" [{i+1}] retcode: {last_retcode} -> {retcode} ({msg})")
            last_retcode = retcode
        else:
            # Unchanged state: overwrite the same console line.
            print(f" [{i+1}] retcode={retcode}", end='\r')
        if not data:
            continue

        nested = data.get('data')
        alt = nested.get('alt', '') if isinstance(nested, dict) else ''
        if retcode == 20000000 and alt:
            print(f"\n ✅ 登录成功! alt={alt[:40]}...")
            return alt
        if retcode in (50114004, 50050002):
            print("\n ❌ 二维码失效")
            return None
    print("\n ❌ 轮询超时")
    return None


def _qr_sso_login(alt_token):
    """Exchange the alt token for weibo.com cookies; return (uid, nick, cookie_str) or None."""
    print("\n SSO 登录...")
    sso_session = requests.Session()
    sso_session.headers.update(WEIBO_HEADERS)
    # Pass the query through params= so the alt token is URL-encoded.
    resp = sso_session.get(
        'https://login.sina.com.cn/sso/login.php',
        params={
            'entry': 'weibo',
            'returntype': 'TEXT',
            'crossdomain': '1',
            'cdult': '3',
            'domain': 'weibo.com',
            'alt': alt_token,
            'savestate': '30',
            'callback': f'STK_{int(time.time() * 1000)}',
        },
        allow_redirects=True,
        timeout=15,
    )
    try:
        sso_data = parse_jsonp(resp.text)
    except ValueError:
        sso_data = None
    if not isinstance(sso_data, dict):
        sso_data = {}
    uid = str(sso_data.get('uid', ''))
    nick = sso_data.get('nick', '')
    cross_urls = sso_data.get('crossDomainUrlList') or []
    print(f" uid={uid}, nick={nick}, crossDomainUrls={len(cross_urls)}")

    # Visit each cross-domain URL so the session collects its cookies.
    # Best effort: a failure is reported but does not stop the flow.
    for u in cross_urls:
        if isinstance(u, str) and u.startswith('http'):
            try:
                sso_session.get(u, allow_redirects=True, timeout=10)
            except Exception as exc:
                print(f" [WARN] crossDomain 请求失败: {exc}")

    all_cookies = {
        c.name: c.value
        for c in sso_session.cookies
        if c.domain and 'weibo.com' in c.domain
    }
    cookie_str = '; '.join(f'{k}={v}' for k, v in all_cookies.items())
    has_sub = 'SUB' in all_cookies
    print(f" weibo.com Cookie 字段 ({len(all_cookies)}): {list(all_cookies.keys())}")
    print(f" 包含 SUB: {'✅' if has_sub else '❌'}")
    if not has_sub:
        print(" [ERROR] 缺少 SUB cookie")
        return None
    return uid, nick, cookie_str


def _qr_add_account(token, uid, nick, cookie_str):
    """POST the scanned account to the backend API; return its id or None."""
    print("\n 添加账号到后端...")
    remark = f"{nick} (调试脚本)" if nick else "调试脚本添加"
    resp = requests.post(
        f'{API_BASE_URL}/api/v1/accounts',
        json={'weibo_user_id': uid, 'cookie': cookie_str, 'remark': remark},
        headers=api_headers(token),
        timeout=10,
    )
    print(f" Status: {resp.status_code}")
    if resp.status_code not in (200, 201):
        print(f" ❌ 添加失败: {resp.text[:300]}")
        return None
    acc = resp.json().get('data') or {}
    account_id = acc.get('id')
    print(f" ✅ 账号添加成功: {account_id}")
    return account_id
# ================================================================
# 前端模式
# ================================================================
def frontend_flow():
    """Drive the QR-code login through the frontend's HTTP routes.

    Logs in to the frontend with the test credentials, asks it to generate a
    QR code, saves the image to qrcode.png when a downloadable URL is
    returned, polls the check route until the scan succeeds, fails or times
    out, and finally asks the frontend to add the account. Progress is
    printed; nothing is returned.
    """
    # Function-scope import: only this flow needs it.
    from urllib.parse import urljoin

    fe = requests.Session()
    print("=" * 60)
    print("FRONTEND: 登录")
    print("=" * 60)
    resp = fe.post(
        f'{FRONTEND_URL}/login',
        data={'email': TEST_EMAIL, 'password': TEST_PASSWORD},
        allow_redirects=False,
        timeout=10,
    )
    print(f" POST /login -> {resp.status_code}")
    if resp.status_code in (301, 302):
        # Location may be relative or absolute; urljoin resolves both forms.
        location = resp.headers.get('Location')
        if location:
            fe.get(urljoin(f'{FRONTEND_URL}/', location), timeout=10)
    dash = fe.get(f'{FRONTEND_URL}/dashboard', timeout=10)
    if dash.status_code != 200:
        print(" [ERROR] 登录失败")
        return
    print(" ✅ 登录成功")

    print("\n" + "=" * 60)
    print("FRONTEND: 生成二维码")
    print("=" * 60)
    resp = fe.post(
        f'{FRONTEND_URL}/api/weibo/qrcode/generate',
        headers={'Content-Type': 'application/json'},
        timeout=15,
    )
    print(f" Status: {resp.status_code}, Body: {resp.text[:300]}")
    try:
        gen = resp.json()
    except ValueError:
        gen = None
    if not isinstance(gen, dict) or not gen.get('success'):
        print(" [ERROR] 生成失败")
        return
    qrid = gen.get('qrid')
    if not qrid:
        print(" [ERROR] 响应中缺少 qrid")
        return
    qr_image = gen.get('qr_image') or ''
    if qr_image.startswith('http'):
        img = requests.get(qr_image, headers=WEIBO_HEADERS, timeout=10)
        with open('qrcode.png', 'wb') as f:
            f.write(img.content)
        print(" 二维码已保存到 qrcode.png")
    else:
        # Do not claim the file was written when nothing was downloaded.
        print(" [WARN] 未获得可下载的二维码图片,qrcode.png 未更新")

    print("\n 请用手机微博扫描 qrcode.png ...")
    last_status = None
    for i in range(120):
        time.sleep(2)
        resp = fe.get(f'{FRONTEND_URL}/api/weibo/qrcode/check/{qrid}', timeout=15)
        try:
            data = resp.json()
        except ValueError:
            data = None
        if not isinstance(data, dict):
            # One malformed poll response should not abort the wait.
            print(f"\n [{i+1}] 非 JSON 响应 ({resp.status_code}),重试")
            continue
        st = data.get('status')
        if st != last_status:
            print(f" [{i+1}] status: {last_status} -> {st} | {json.dumps(data, ensure_ascii=False)[:200]}")
            last_status = st
        else:
            # Unchanged state: overwrite the same console line.
            print(f" [{i+1}] status={st}", end='\r')
        if st == 'success':
            print(f"\n ✅ 扫码成功! uid={data.get('weibo_uid')}")
            break
        if st in ('expired', 'cancelled', 'error'):
            print(f"\n{st}: {data.get('error', '')}")
            return
    else:
        # Loop ran out without a break: the scan never succeeded.
        print("\n ❌ 超时")
        return

    # Add the account
    print("\n 添加账号...")
    time.sleep(0.5)
    resp = fe.post(
        f'{FRONTEND_URL}/api/weibo/qrcode/add-account',
        json={'qrid': qrid},
        headers={'Content-Type': 'application/json'},
        timeout=15,
    )
    print(f" Status: {resp.status_code}, Body: {resp.text[:500]}")
    try:
        result = resp.json()
    except ValueError:
        result = None
    if not isinstance(result, dict):
        result = {}
    if result.get('success'):
        print(" ✅ 账号添加成功!")
    else:
        print(f" ❌ 添加失败: {result.get('message')}")
# ================================================================
# MAIN
# ================================================================
def main():
    """Dispatch on the command-line flags and run the selected flow.

    ``--frontend`` runs the frontend flow only. Otherwise the script logs
    in, obtains an account id (an existing one with ``--api-only``, a newly
    scanned one by default) and runs the verify / sign-in / logs / detail
    steps. Exits with status 1 when login fails or no account is available.
    """
    argv = sys.argv
    if '--frontend' in argv:
        print("🌐 前端模式")
        frontend_flow()
        return

    # Direct mode or api-only mode: both need a token first.
    api_only = '--api-only' in argv
    token = step0_login()
    if not token:
        sys.exit(1)

    if api_only:
        print("\n📡 API-only 模式:跳过扫码,使用已有账号")
        account_id = step1_cleanup(token)
        if not account_id:
            print("\n没有账号,请先用默认模式添加")
            sys.exit(1)
    else:
        print("\n🔗 直连模式:扫码 + 完整业务流程")
        # Clean up first, then add an account by scanning.
        step1_cleanup(token)
        account_id = qrcode_flow(token)
        if not account_id:
            sys.exit(1)

    # Follow-up business checks, run in order against the chosen account.
    for step in (step2_verify, step3_signin, step4_logs, step5_detail):
        step(token, account_id)

    print("\n" + "=" * 60)
    print("全部流程完成!")
    print("=" * 60)


if __name__ == '__main__':
    main()