"""Debug script: Weibo QR-code login plus the complete business flow.

Modes:
    python debug_qrcode_flow.py              # direct mode: scan + add + verify + sign in
    python debug_qrcode_flow.py --frontend   # frontend mode: go through the Flask routes
    python debug_qrcode_flow.py --api-only   # backend API only (skip scanning, reuse an existing account)
"""

import re
import json
import time
import base64
import requests
import sys

# ---- Configuration ----
AUTH_BASE_URL = 'http://localhost:8001'
API_BASE_URL = 'http://localhost:8000'
FRONTEND_URL = 'http://localhost:5000'
TEST_EMAIL = 'admin@example.com'
TEST_PASSWORD = 'Admin123!'

WEIBO_HEADERS = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    ),
    'Referer': 'https://weibo.com/',
    'Accept': '*/*',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}

# Shared session used for the QR-code generate/check requests.
WEIBO_SESSION = requests.Session()
WEIBO_SESSION.headers.update(WEIBO_HEADERS)


def parse_jsonp(text):
    """Decode a JSONP (``callback({...})``) or plain JSON payload.

    The text between the first ``(`` and the last ``)`` is tried first; if
    that is not valid JSON (for example the parentheses only appear inside a
    string value of a plain JSON document) the whole text is tried instead.

    Returns:
        The decoded value, or None when neither form can be decoded.
    """
    m = re.search(r'\((.*)\)', text, re.DOTALL)
    if m:
        try:
            return json.loads(m.group(1))
        except ValueError:
            # Not a real JSONP wrapper; fall back to plain JSON below.
            pass
    try:
        return json.loads(text)
    except Exception:
        return None


def api_headers(token):
    """Return JSON request headers carrying ``token`` as a Bearer credential."""
    return {'Content-Type': 'application/json', 'Authorization': f'Bearer {token}'}


# ================================================================
# STEP 0: Log in to the system
# ================================================================
def step0_login():
    """Log in with the test credentials and check the token against the API.

    Returns:
        The access token string, or None when the login fails, the response
        carries no ``access_token``, or GET /api/v1/accounts rejects the token.
    """
    print("=" * 60)
    print("STEP 0: 登录系统")
    print("=" * 60)

    resp = requests.post(
        f'{AUTH_BASE_URL}/auth/login',
        json={'email': TEST_EMAIL, 'password': TEST_PASSWORD},
        timeout=10,
    )
    print(f" Status: {resp.status_code}")
    if resp.status_code != 200:
        print(f" [ERROR] 登录失败: {resp.text[:300]}")
        return None

    data = resp.json()
    token = data.get('access_token')
    if not token:
        # A 200 without a token would otherwise surface as a bare KeyError.
        print(f" [ERROR] 登录失败: {resp.text[:300]}")
        return None
    user = data.get('user') or {}
    print(f" 用户: {user.get('username')} ({user.get('email')})")
    print(f" token: {token[:50]}...")

    # Make sure the token is also accepted by the API service.
    verify = requests.get(
        f'{API_BASE_URL}/api/v1/accounts',
        headers=api_headers(token),
        timeout=10,
    )
    print(f" 验证 API 服务: GET /accounts -> {verify.status_code}")
    if verify.status_code != 200:
        print(f" [ERROR] API 服务 token 无效: {verify.text[:300]}")
        return None

    print(f" ✅ token 有效")
    return token


# ================================================================
# STEP 1: Clean up duplicate accounts
# ================================================================
def step1_cleanup(token):
    """Delete duplicate accounts, keeping the newest one per ``weibo_user_id``.

    Returns:
        The id of a kept account: the only account when at most one exists,
        otherwise the newest account of the last group processed. None when
        there are no accounts or the account listing request fails.
    """
    print("\n" + "=" * 60)
    print("STEP 1: 清理重复账号")
    print("=" * 60)

    resp = requests.get(f'{API_BASE_URL}/api/v1/accounts', headers=api_headers(token), timeout=10)
    if resp.status_code != 200:
        print(f" [ERROR] 获取账号列表失败: {resp.text[:300]}")
        return None
    accounts = resp.json().get('data') or []
    print(f" 当前账号数: {len(accounts)}")

    if len(accounts) <= 1:
        print(" 无需清理")
        return accounts[0]['id'] if accounts else None

    # Group by weibo_user_id; only the newest entry of each group is kept.
    groups = {}
    for acc in accounts:
        groups.setdefault(acc['weibo_user_id'], []).append(acc)

    keep_id = None
    for accs in groups.values():
        accs.sort(key=lambda a: a['created_at'], reverse=True)
        keep = accs[0]
        keep_id = keep['id']
        print(f" 保留: {keep['id'][:8]}... ({keep['remark']}) created={keep['created_at']}")
        for dup in accs[1:]:
            print(f" 删除: {dup['id'][:8]}... ({dup['remark']}) created={dup['created_at']}")
            del_resp = requests.delete(
                f'{API_BASE_URL}/api/v1/accounts/{dup["id"]}',
                headers=api_headers(token),
                timeout=10,
            )
            print(f" -> {del_resp.status_code}")

    # Re-list to confirm the result of the cleanup.
    resp2 = requests.get(f'{API_BASE_URL}/api/v1/accounts', headers=api_headers(token), timeout=10)
    remaining = resp2.json().get('data') or []
    print(f" 清理后账号数: {len(remaining)}")
    return keep_id


# ================================================================
# STEP 2: Verify the cookie (POST /accounts/{id}/verify)
# ================================================================
def step2_verify(token, account_id):
    """Ask the backend to verify the stored cookie of ``account_id``.

    Returns:
        The ``cookie_valid`` value reported by the backend on HTTP 200
        (may be None when the field is absent), otherwise False.
    """
    print("\n" + "=" * 60)
    print("STEP 2: 验证 Cookie")
    print("=" * 60)

    resp = requests.post(
        f'{API_BASE_URL}/api/v1/accounts/{account_id}/verify',
        headers=api_headers(token),
        timeout=20,
    )
    print(f" Status: {resp.status_code}")
    print(f" Body: {resp.text[:500]}")

    if resp.status_code != 200:
        print(f" ❌ 验证失败")
        return False

    data = resp.json().get('data') or {}
    valid = data.get('cookie_valid')
    status = data.get('status')
    name = data.get('weibo_screen_name', '')
    print(f" cookie_valid: {valid}")
    print(f" status: {status}")
    print(f" screen_name: {name}")
    if valid:
        print(" ✅ Cookie 有效,账号已激活")
    else:
        print(" ❌ Cookie 无效")
    return valid


# ================================================================
# STEP 3: Manual sign-in (POST /accounts/{id}/signin)
# ================================================================
def step3_signin(token, account_id):
    """Trigger a manual sign-in for ``account_id`` and print the summary.

    Returns:
        True when the backend answers HTTP 200, otherwise False.
    """
    print("\n" + "=" * 60)
    print("STEP 3: 手动签到")
    print("=" * 60)
    print(" 调用 POST /accounts/{id}/signin ...")
    print(" (这可能需要一些时间,每个超话间隔 1.5 秒)")

    resp = requests.post(
        f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin',
        headers=api_headers(token),
        timeout=300,  # sign-in can be slow
    )
    print(f" Status: {resp.status_code}")

    if resp.status_code != 200:
        print(f" ❌ 签到失败: {resp.text[:500]}")
        return False

    result = resp.json()
    data = result.get('data') or {}
    msg = result.get('message', '')
    print(f" 消息: {msg}")
    print(f" 签到成功: {data.get('signed', 0)}")
    print(f" 已签过: {data.get('already_signed', 0)}")
    print(f" 失败: {data.get('failed', 0)}")
    print(f" 总超话数: {data.get('total_topics', 0)}")

    icons = {'success': '✅', 'already_signed': '⏭️'}
    details = data.get('details') or []
    for d in details[:10]:  # show at most 10 entries
        # .get keeps one malformed entry from aborting the whole report.
        icon = icons.get(d.get('status'), '❌')
        print(f" {icon} {d.get('topic', '?')}: {d.get('message', '')}")
    if len(details) > 10:
        print(f" ... 还有 {len(details) - 10} 条")
    return True


# ================================================================
# STEP 4: Show the sign-in logs
# ================================================================
def step4_logs(token, account_id):
    """Fetch the first page of sign-in logs and print up to five entries."""
    print("\n" + "=" * 60)
    print("STEP 4: 查看签到日志")
    print("=" * 60)

    resp = requests.get(
        f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin-logs',
        params={'page': 1, 'size': 10},
        headers=api_headers(token),
        timeout=10,
    )
    print(f" Status: {resp.status_code}")

    if resp.status_code != 200:
        print(f" ❌ 获取日志失败: {resp.text[:300]}")
        return

    data = resp.json().get('data') or {}
    items = data.get('items') or []
    total = data.get('total', 0)
    print(f" 总日志数: {total}")
    for log in items[:5]:
        # signed_at may be present but null; never slice None.
        signed_at = str(log.get('signed_at') or '?')[:19]
        print(f" [{log.get('status')}] {log.get('topic_title', '?')} @ {signed_at}")
    if not items:
        print(" (暂无日志)")


# ================================================================
# STEP 5: Account detail (endpoints the frontend relies on)
# ================================================================
def step5_detail(token, account_id):
    """Call the detail, tasks and signin-logs endpoints and report their status."""
    print("\n" + "=" * 60)
    print("STEP 5: 验证账号详情接口")
    print("=" * 60)

    # Account detail
    r1 = requests.get(f'{API_BASE_URL}/api/v1/accounts/{account_id}', headers=api_headers(token), timeout=10)
    print(f" GET /accounts/{{id}} -> {r1.status_code}")
    if r1.status_code == 200:
        acc = r1.json().get('data') or {}
        print(f" status={acc.get('status')}, remark={acc.get('remark')}")
    else:
        print(f" ❌ {r1.text[:200]}")

    # Task list
    r2 = requests.get(f'{API_BASE_URL}/api/v1/accounts/{account_id}/tasks', headers=api_headers(token), timeout=10)
    print(f" GET /accounts/{{id}}/tasks -> {r2.status_code}")
    if r2.status_code == 200:
        tasks = r2.json().get('data') or []
        print(f" 任务数: {len(tasks)}")
    else:
        print(f" ❌ {r2.text[:200]}")

    # Sign-in logs
    r3 = requests.get(
        f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin-logs',
        params={'page': 1, 'size': 5},
        headers=api_headers(token),
        timeout=10,
    )
    print(f" GET /accounts/{{id}}/signin-logs -> {r3.status_code}")
    if r3.status_code == 200:
        logs = r3.json().get('data') or {}
        print(f" 日志数: {logs.get('total', 0)}, items: {len(logs.get('items') or [])}")
    else:
        print(f" ❌ {r3.text[:200]}")

    if all(r.status_code == 200 for r in [r1, r2, r3]):
        print(" ✅ 所有详情接口正常")
    else:
        print(" ❌ 部分接口异常")


# ================================================================
# QR-code flow (used by direct mode)
# ================================================================
def qrcode_flow(token):
    """Generate a QR code, poll it, perform the SSO login and add the account.

    Returns:
        The id of the account created in the backend, or None when any step
        fails (QR generation, expiry, polling timeout, missing SUB cookie, or
        the backend refusing the account).
    """
    # Generate the QR code
    print("\n" + "=" * 60)
    print("QRCODE: 生成二维码")
    print("=" * 60)

    url = 'https://login.sina.com.cn/sso/qrcode/image'
    params = {'entry': 'weibo', 'size': '180', 'callback': f'STK_{int(time.time() * 1000)}'}
    resp = WEIBO_SESSION.get(url, params=params, timeout=10)
    data = parse_jsonp(resp.text)
    if not isinstance(data, dict) or data.get('retcode') != 20000000:
        print(" [ERROR] 生成二维码失败")
        return None

    qr_data = data.get('data') or {}
    qrid = qr_data.get('qrid')
    if not qrid:
        # Without a qrid every poll below would query "None" until timeout.
        print(" [ERROR] 生成二维码失败")
        return None
    image = qr_data.get('image') or ''
    if image.startswith('//'):
        image = 'https:' + image

    print(f" qrid: {qrid}")
    if image.startswith('http'):
        img_resp = WEIBO_SESSION.get(image, timeout=10)
        with open('qrcode.png', 'wb') as f:
            f.write(img_resp.content)
        print(f" 二维码已保存到 qrcode.png")
    else:
        print(f" [WARN] 未获取到二维码图片地址: {image[:80]}")

    # Poll the scan status
    print("\n 请用手机微博扫描 qrcode.png ...")
    check_url = 'https://login.sina.com.cn/sso/qrcode/check'
    last_retcode = None
    alt_token = None

    for i in range(120):
        time.sleep(2)
        params = {'entry': 'weibo', 'qrid': qrid, 'callback': f'STK_{int(time.time() * 1000)}'}
        try:
            resp = WEIBO_SESSION.get(check_url, params=params, timeout=10)
        except requests.RequestException as exc:
            # One transient network error should not abort a multi-minute poll.
            print(f"\n [WARN] 轮询请求失败,将重试: {exc}")
            continue

        data = parse_jsonp(resp.text)
        if not isinstance(data, dict):
            data = None
        retcode = data.get('retcode') if data else None

        if retcode != last_retcode:
            msg = data.get('msg', '') if data else ''
            print(f" [{i+1}] retcode: {last_retcode} -> {retcode} ({msg})")
            last_retcode = retcode
        else:
            print(f" [{i+1}] retcode={retcode}", end='\r')

        if not data:
            continue

        nested = data.get('data')
        alt = nested.get('alt', '') if isinstance(nested, dict) else ''
        if retcode == 20000000 and alt:
            print(f"\n ✅ 登录成功! alt={alt[:40]}...")
            alt_token = alt
            break
        if retcode in (50114004, 50050002):
            print(f"\n ❌ 二维码失效")
            return None

    if not alt_token:
        print("\n ❌ 轮询超时")
        return None

    # SSO login
    print("\n SSO 登录...")
    sso_url = (
        f"https://login.sina.com.cn/sso/login.php"
        f"?entry=weibo&returntype=TEXT&crossdomain=1&cdult=3"
        f"&domain=weibo.com&alt={alt_token}&savestate=30"
        f"&callback=STK_{int(time.time() * 1000)}"
    )
    # A fresh session so only cookies set by this login are collected.
    sso_session = requests.Session()
    sso_session.headers.update(WEIBO_HEADERS)
    resp = sso_session.get(sso_url, allow_redirects=True, timeout=15)
    sso_data = parse_jsonp(resp.text)
    if not isinstance(sso_data, dict):
        sso_data = {}

    uid = str(sso_data.get('uid', ''))
    nick = sso_data.get('nick', '')
    cross_urls = sso_data.get('crossDomainUrlList') or []
    print(f" uid={uid}, nick={nick}, crossDomainUrls={len(cross_urls)}")

    # Visit each cross-domain URL (best effort) so the session picks up the
    # cookies those responses set.
    for u in cross_urls:
        if isinstance(u, str) and u.startswith('http'):
            try:
                sso_session.get(u, allow_redirects=True, timeout=10)
            except Exception as exc:
                # Still best effort, but no longer silent.
                print(f" [WARN] 跨域 Cookie 请求失败: {exc}")

    all_cookies = {
        c.name: c.value
        for c in sso_session.cookies
        if c.domain and 'weibo.com' in c.domain
    }
    cookie_str = '; '.join(f'{k}={v}' for k, v in all_cookies.items())
    has_sub = 'SUB' in all_cookies
    print(f" weibo.com Cookie 字段 ({len(all_cookies)}): {list(all_cookies.keys())}")
    print(f" 包含 SUB: {'✅' if has_sub else '❌'}")

    if not has_sub:
        print(" [ERROR] 缺少 SUB cookie")
        return None

    # Add the account to the backend
    print("\n 添加账号到后端...")
    remark = f"{nick} (调试脚本)" if nick else "调试脚本添加"
    resp = requests.post(
        f'{API_BASE_URL}/api/v1/accounts',
        json={'weibo_user_id': uid, 'cookie': cookie_str, 'remark': remark},
        headers=api_headers(token),
        timeout=10,
    )
    print(f" Status: {resp.status_code}")
    if resp.status_code in (200, 201):
        acc = resp.json().get('data') or {}
        account_id = acc.get('id')
        print(f" ✅ 账号添加成功: {account_id}")
        return account_id

    print(f" ❌ 添加失败: {resp.text[:300]}")
    return None


# ================================================================
# Frontend mode
# ================================================================
def frontend_flow():
    """Run the QR-code login through the Flask frontend routes.

    Logs in with the test credentials, asks the frontend to generate a QR
    code, polls the scan status and finally asks the frontend to add the
    account. Progress is printed; the function returns None in every case.
    """
    # Function-scope import: resolves relative and absolute redirect targets.
    from urllib.parse import urljoin

    fe = requests.Session()

    print("=" * 60)
    print("FRONTEND: 登录")
    print("=" * 60)
    resp = fe.post(
        f'{FRONTEND_URL}/login',
        data={'email': TEST_EMAIL, 'password': TEST_PASSWORD},
        allow_redirects=False,
        timeout=10,
    )
    print(f" POST /login -> {resp.status_code}")
    if resp.status_code in (301, 302):
        location = resp.headers.get('Location')
        if location:
            # Location may be relative or absolute; urljoin handles both,
            # whereas plain concatenation breaks on an absolute URL.
            fe.get(urljoin(f'{FRONTEND_URL}/', location), timeout=10)
    dash = fe.get(f'{FRONTEND_URL}/dashboard', timeout=10)
    if dash.status_code != 200:
        print(" [ERROR] 登录失败")
        return
    print(" ✅ 登录成功")

    print("\n" + "=" * 60)
    print("FRONTEND: 生成二维码")
    print("=" * 60)
    resp = fe.post(
        f'{FRONTEND_URL}/api/weibo/qrcode/generate',
        headers={'Content-Type': 'application/json'},
        timeout=15,
    )
    print(f" Status: {resp.status_code}, Body: {resp.text[:300]}")
    try:
        gen = resp.json()
    except ValueError:
        # e.g. an HTML error page instead of JSON
        gen = {}
    qrid = gen.get('qrid')
    if not gen.get('success') or not qrid:
        print(" [ERROR] 生成失败")
        return

    qr_image = gen.get('qr_image') or ''
    if qr_image.startswith('http'):
        img = requests.get(qr_image, headers=WEIBO_HEADERS, timeout=10)
        with open('qrcode.png', 'wb') as f:
            f.write(img.content)
        print(" 二维码已保存到 qrcode.png")

    print("\n 请用手机微博扫描 qrcode.png ...")
    last_status = None
    for i in range(120):
        time.sleep(2)
        resp = fe.get(f'{FRONTEND_URL}/api/weibo/qrcode/check/{qrid}', timeout=15)
        try:
            data = resp.json()
        except ValueError:
            # Treat an undecodable poll response as "no status yet".
            data = {}
        st = data.get('status')

        if st != last_status:
            print(f" [{i+1}] status: {last_status} -> {st} | {json.dumps(data, ensure_ascii=False)[:200]}")
            last_status = st
        else:
            print(f" [{i+1}] status={st}", end='\r')

        if st == 'success':
            print(f"\n ✅ 扫码成功! uid={data.get('weibo_uid')}")
            break
        if st in ('expired', 'cancelled', 'error'):
            print(f"\n ❌ {st}: {data.get('error', '')}")
            return
    else:
        # Loop ran out without a break: the scan never succeeded.
        print("\n ❌ 超时")
        return

    # Add the account
    print("\n 添加账号...")
    time.sleep(0.5)
    resp = fe.post(
        f'{FRONTEND_URL}/api/weibo/qrcode/add-account',
        json={'qrid': qrid},
        headers={'Content-Type': 'application/json'},
        timeout=15,
    )
    print(f" Status: {resp.status_code}, Body: {resp.text[:500]}")
    try:
        result = resp.json()
    except ValueError:
        result = {}
    if result.get('success'):
        print(f" ✅ 账号添加成功!")
    else:
        print(f" ❌ 添加失败: {result.get('message')}")


# ================================================================
# MAIN
# ================================================================
def main():
    """Select the mode from ``sys.argv`` and run the matching flow.

    ``--frontend`` runs the frontend flow only; ``--api-only`` reuses an
    existing account; with neither flag the direct QR-code flow adds one.
    Exits with status 1 when login fails or no account id is available.
    """
    mode = 'direct'
    if '--frontend' in sys.argv:
        mode = 'frontend'
    elif '--api-only' in sys.argv:
        mode = 'api-only'

    if mode == 'frontend':
        print("🌐 前端模式")
        frontend_flow()
        return

    # Direct mode or api-only mode
    token = step0_login()
    if not token:
        sys.exit(1)

    if mode == 'api-only':
        print("\n📡 API-only 模式:跳过扫码,使用已有账号")
        account_id = step1_cleanup(token)
        if not account_id:
            print("\n没有账号,请先用默认模式添加")
            sys.exit(1)
    else:
        print("\n🔗 直连模式:扫码 + 完整业务流程")
        # Clean up first
        step1_cleanup(token)
        # Scan and add the account
        account_id = qrcode_flow(token)
        if not account_id:
            sys.exit(1)

    # Follow-up business checks
    step2_verify(token, account_id)
    step3_signin(token, account_id)
    step4_logs(token, account_id)
    step5_detail(token, account_id)

    print("\n" + "=" * 60)
    print("全部流程完成!")
    print("=" * 60)


if __name__ == '__main__':
    main()