import os import re import json import time import uuid import logging import traceback from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify from flask_session import Session import requests from functools import wraps from dotenv import load_dotenv from datetime import datetime load_dotenv() app = Flask(__name__) app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key') app.config['SESSION_TYPE'] = 'filesystem' Session(app) API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:8000') AUTH_BASE_URL = os.getenv('AUTH_BASE_URL', 'http://localhost:8001') logger = logging.getLogger(__name__) def get_headers(): """获取请求头,包含认证令牌。如果 token 过期会自动刷新。""" headers = {'Content-Type': 'application/json'} if 'access_token' in session: headers['Authorization'] = f"Bearer {session['access_token']}" return headers def _try_refresh_token(): """ 尝试用 refresh_token 刷新 access_token。 成功返回 True 并更新 session,失败返回 False。 """ refresh_token = session.get('refresh_token') if not refresh_token: logger.warning("Token 刷新失败: session 中没有 refresh_token") return False try: resp = requests.post( f'{AUTH_BASE_URL}/auth/refresh', json={'refresh_token': refresh_token}, timeout=10, ) if resp.status_code == 200: data = resp.json() session['access_token'] = data['access_token'] session['refresh_token'] = data['refresh_token'] session.modified = True logger.info("Token 刷新成功") return True else: logger.warning(f"Token 刷新失败: HTTP {resp.status_code}, body={resp.text[:200]}") except Exception as e: logger.warning(f"Token 刷新异常: {e}") return False def api_request(method, url, **kwargs): """ 封装 API 请求,自动处理 token 过期刷新。 如果收到 401,尝试刷新 token 后重试一次。 """ headers = kwargs.pop('headers', None) or get_headers() token_preview = headers.get('Authorization', 'NONE')[:30] logger.info(f"api_request: {method} {url} token={token_preview}...") resp = requests.request(method, url, headers=headers, timeout=10, **kwargs) if resp.status_code == 401: logger.warning(f"api_request: 收到 401, 尝试刷新 token...") if _try_refresh_token(): # Token 已刷新,用新 token 重试 headers['Authorization'] = f"Bearer {session['access_token']}" logger.info(f"api_request: 刷新成功,重试请求...") resp = requests.request(method, url, headers=headers, timeout=10, **kwargs) else: logger.error(f"api_request: token 刷新失败,清除 session 让用户重新登录") # 清除无效的 token,下次访问会被 login_required 拦截 session.pop('access_token', None) session.pop('refresh_token', None) session.pop('user', None) session.modified = True return resp def login_required(f): """登录验证装饰器""" @wraps(f) def decorated_function(*args, **kwargs): if 'user' not in session: flash('请先登录', 'warning') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function @app.route('/') def index(): if 'user' in session: return redirect(url_for('dashboard')) return redirect(url_for('login')) @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.form.get('username') email = request.form.get('email') password = request.form.get('password') confirm_password = request.form.get('confirm_password') if password != confirm_password: flash('两次输入的密码不一致', 'danger') return redirect(url_for('register')) try: response = requests.post( f'{AUTH_BASE_URL}/auth/register', json={'username': username, 'email': email, 'password': password}, timeout=10 ) if response.status_code == 201: data = response.json() session['user'] = data['user'] session['access_token'] = data['access_token'] session['refresh_token'] = data['refresh_token'] flash('注册成功', 'success') return redirect(url_for('dashboard')) else: error_data = response.json() flash(error_data.get('detail', '注册失败'), 'danger') except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') return render_template('register.html') @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': email = request.form.get('email') password = request.form.get('password') try: response = requests.post( f'{AUTH_BASE_URL}/auth/login', json={'email': email, 'password': password}, timeout=10 ) if response.status_code == 200: data = response.json() session['user'] = data['user'] session['access_token'] = data['access_token'] session['refresh_token'] = data['refresh_token'] flash('登录成功', 'success') return redirect(url_for('dashboard')) else: error_data = response.json() flash(error_data.get('detail', '登录失败'), 'danger') except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') return render_template('login.html') @app.route('/api/auth/wx-login', methods=['POST']) def wx_login_proxy(): """ 微信小程序登录代理。 小程序端调这个接口,转发到 auth_service 的 /auth/wx-login。 Web 端也可以用(未来微信扫码登录)。 """ try: data = request.json response = requests.post( f'{AUTH_BASE_URL}/auth/wx-login', json=data, timeout=15, ) if response.status_code == 200: result = response.json() # 写入 session(Web 端用) session['user'] = result.get('user') session['access_token'] = result.get('access_token') session['refresh_token'] = result.get('refresh_token') session.modified = True return jsonify({'success': True, 'data': result}) else: detail = response.json().get('detail', '微信登录失败') return jsonify({'success': False, 'message': detail}), response.status_code except Exception as e: logger.exception("微信登录代理异常") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/logout') def logout(): session.clear() flash('已退出登录', 'success') return redirect(url_for('login')) @app.route('/dashboard') @login_required def dashboard(): try: response = api_request( 'GET', f'{API_BASE_URL}/api/v1/accounts', ) data = response.json() accounts = data.get('data', []) if data.get('success') else [] except requests.RequestException: accounts = [] flash('加载账号列表失败', 'warning') return render_template('dashboard.html', accounts=accounts, user=session.get('user')) @app.route('/accounts/new') @login_required def add_account(): return render_template('add_account.html') WEIBO_HEADERS = { 'User-Agent': ( 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' ), 'Referer': 'https://weibo.com/', 'Accept': '*/*', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Connection': 'keep-alive', } def _parse_jsonp(text): """Strip JSONP wrapper and return parsed dict, or None.""" m = re.search(r'\((.*)\)', text, re.DOTALL) if m: return json.loads(m.group(1)) # Maybe it's already plain JSON try: return json.loads(text) except (json.JSONDecodeError, ValueError): return None @app.route('/api/weibo/qrcode/generate', methods=['POST']) @login_required def generate_weibo_qrcode(): """ 生成微博扫码登录二维码。 调用 https://login.sina.com.cn/sso/qrcode/image 获取 qrid + 二维码图片。 """ try: qr_api_url = 'https://login.sina.com.cn/sso/qrcode/image' params = { 'entry': 'weibo', 'size': '180', 'callback': f'STK_{int(time.time() * 1000)}', } resp = requests.get(qr_api_url, params=params, headers=WEIBO_HEADERS, timeout=10) logger.debug(f"qrcode/image status={resp.status_code} body={resp.text[:300]}") data = _parse_jsonp(resp.text) if not data or data.get('retcode') != 20000000: return jsonify({'success': False, 'error': '生成二维码失败'}), 500 qr_data = data.get('data', {}) qrid = qr_data.get('qrid') qr_image_url = qr_data.get('image', '') if not qrid or not qr_image_url: return jsonify({'success': False, 'error': '二维码数据不完整'}), 500 if qr_image_url.startswith('//'): qr_image_url = 'https:' + qr_image_url # 存储二维码状态 if 'weibo_qrcodes' not in session: session['weibo_qrcodes'] = {} session['weibo_qrcodes'][qrid] = { 'status': 'waiting', 'created_at': str(datetime.now()), } session.modified = True return jsonify({ 'success': True, 'qrid': qrid, 'qr_image': qr_image_url, 'expires_in': 180, }) except Exception as e: logger.exception("生成二维码异常") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/weibo/qrcode/check/', methods=['GET']) @login_required def check_weibo_qrcode(qrid): """ 轮询微博扫码状态。 微博 check 接口必须用 JSONP(带 callback),否则返回非 JSON 内容。 成功时 retcode=20000001 会携带 alt 跳转 URL。 用 requests.Session 跟踪 alt 的完整 SSO 重定向链来收集 Cookie。 """ try: qrcodes = session.get('weibo_qrcodes', {}) if qrid not in qrcodes: return jsonify({'status': 'expired'}) # 如果之前已经成功处理过,直接返回缓存结果 qr_info = qrcodes[qrid] if qr_info.get('status') == 'success': return jsonify({ 'status': 'success', 'weibo_uid': qr_info.get('weibo_uid'), 'screen_name': qr_info.get('screen_name'), }) # ---- 1. 调用 check 接口(JSONP 模式)---- check_url = 'https://login.sina.com.cn/sso/qrcode/check' params = { 'entry': 'weibo', 'qrid': qrid, 'callback': f'STK_{int(time.time() * 1000)}', } resp = requests.get(check_url, params=params, headers=WEIBO_HEADERS, timeout=10) logger.info(f"qrcode/check status={resp.status_code} body={resp.text[:500]}") data = _parse_jsonp(resp.text) if not data: logger.error(f"无法解析 check 响应: {resp.text[:300]}") return jsonify({'status': 'error', 'error': '解析响应失败'}) retcode = data.get('retcode') logger.info(f"check retcode={retcode}") # 等待扫码(50114001 = "未使用") if retcode == 50114001: return jsonify({'status': 'waiting'}) # 已扫码,等待确认 if retcode in (50050001, 50114002): return jsonify({'status': 'scanned'}) # 二维码过期 if retcode == 50050002: return jsonify({'status': 'expired'}) # 取消授权 if retcode == 50050004: return jsonify({'status': 'cancelled'}) # 50114004 = 二维码已被消费(重复轮询),不带 alt if retcode == 50114004: if qr_info.get('status') == 'success': return jsonify({ 'status': 'success', 'weibo_uid': qr_info.get('weibo_uid'), 'screen_name': qr_info.get('screen_name'), }) return jsonify({'status': 'error', 'error': '二维码已失效,请重新生成'}) # ---- 2. 登录成功 ---- # retcode=20000000 + data.alt 存在 = 扫码确认成功 # retcode=20000001 = 旧版成功状态 # alt 是一个 token(如 "ALT-xxx"),不是 URL,需要拼接成 SSO 登录 URL alt_token = '' nested = data.get('data') if isinstance(nested, dict): alt_token = nested.get('alt', '') if retcode == 20000000 and not alt_token: # 20000000 无 alt = 正常的等待扫码状态 return jsonify({'status': 'waiting'}) if not alt_token: if retcode in (20000001, 50114003): alt_token = data.get('alt', '') else: logger.warning(f"未知 retcode: {retcode}, data: {data}") return jsonify({'status': 'waiting'}) if not alt_token: return jsonify({'status': 'error', 'error': '微博未返回登录凭证,请重新扫码'}) logger.info(f"获取到 alt token: {alt_token}") # 将 alt token 拼接成完整的 SSO 登录 URL alt_url = ( f"https://login.sina.com.cn/sso/login.php" f"?entry=weibo&returntype=TEXT&crossdomain=1&cdult=3" f"&domain=weibo.com&alt={alt_token}&savestate=30" f"&callback=STK_{int(time.time() * 1000)}" ) logger.info(f"构造 SSO URL: {alt_url}") # ---- 3. 执行 SSO 登录,收集 Cookie 和用户信息 ---- cookie_str, uid, nick = _execute_sso_login(alt_url) if not cookie_str: return jsonify({ 'status': 'error', 'error': 'Cookie 获取失败,请重新扫码', }) screen_name = nick or f'用户{uid}' # 存储结果到 session(防止重复轮询时丢失) session['weibo_qrcodes'][qrid]['status'] = 'success' session['weibo_qrcodes'][qrid]['cookie'] = cookie_str session['weibo_qrcodes'][qrid]['weibo_uid'] = uid session['weibo_qrcodes'][qrid]['screen_name'] = screen_name session.modified = True logger.info(f"check 成功: qrid={qrid}, uid={uid}, cookie长度={len(cookie_str)}, session已写入") return jsonify({ 'status': 'success', 'weibo_uid': uid, 'screen_name': screen_name, }) except Exception as e: logger.exception("检查二维码状态异常") return jsonify({'status': 'error', 'error': str(e)}) def _execute_sso_login(sso_url): """ 执行微博 SSO 登录流程,收集 Cookie 并提取用户信息。 流程: 1. GET sso_url → 返回 JSONP,包含: - uid, nick(用户信息,直接可用) - crossDomainUrlList(跨域种 cookie 的 URL) - Set-Cookie: SUB, SUBP, ALF 等 2. 逐个访问 crossDomainUrlList 中的 URL(种跨域 cookie) 3. 汇总所有 cookie Returns: (cookie_str, uid, nick) or (None, None, None) """ sso_session = requests.Session() sso_session.headers.update({ 'User-Agent': WEIBO_HEADERS['User-Agent'], 'Referer': 'https://weibo.com/', 'Accept': '*/*', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', }) try: # Step 1: 访问 SSO 登录 URL resp = sso_session.get(sso_url, allow_redirects=True, timeout=15) logger.info(f"SSO login status={resp.status_code}, cookies={len(sso_session.cookies)}") # 解析 JSONP 响应 sso_data = _parse_jsonp(resp.text) uid = '' nick = '' cross_urls = [] if sso_data: # 直接从 SSO 响应中提取用户信息 uid = str(sso_data.get('uid', '')) nick = sso_data.get('nick', '') cross_urls = sso_data.get('crossDomainUrlList', []) logger.info(f"SSO 响应: uid={uid}, nick={nick}, crossDomainUrls={len(cross_urls)}") else: logger.warning(f"无法解析 SSO 响应: {resp.text[:500]}") # Step 2: 逐个访问跨域 URL 种 cookie for url in cross_urls: if not isinstance(url, str) or not url.startswith('http'): continue try: logger.debug(f"访问跨域 URL: {url[:120]}") sso_session.get(url, allow_redirects=True, timeout=10) except Exception as e: logger.debug(f"跨域 URL 访问失败: {e}") # Step 3: 只提取 weibo.com 域名的 Cookie(签到 API 只需要这些) weibo_com_cookies = {} for cookie in sso_session.cookies: if cookie.domain and 'weibo.com' in cookie.domain: weibo_com_cookies[cookie.name] = cookie.value cookie_str = '; '.join(f'{k}={v}' for k, v in weibo_com_cookies.items()) logger.info(f"weibo.com Cookie ({len(weibo_com_cookies)} 个): {list(weibo_com_cookies.keys())}") if not cookie_str or 'SUB' not in weibo_com_cookies: logger.error(f"Cookie 不完整,缺少 SUB。获取到: {list(weibo_com_cookies.keys())}") return None, None, None if not uid: logger.warning("SSO 响应中没有 uid,尝试从 API 获取...") uid, nick = _fetch_weibo_user_info(sso_session) if not uid: logger.error("无法获取用户 uid") return None, None, None return cookie_str, uid, nick except Exception as e: logger.exception(f"SSO 登录流程失败: {e}") return None, None, None def _fetch_weibo_user_info(sso_session): """用已登录的 session 获取当前用户 uid 和昵称(PC 端接口)。""" try: resp = sso_session.get( 'https://weibo.com/ajax/profile/info', headers={ 'User-Agent': WEIBO_HEADERS['User-Agent'], 'Referer': 'https://weibo.com/', 'Accept': '*/*', }, timeout=10, ) data = resp.json() if data.get('ok') == 1: user = data.get('data', {}).get('user', {}) uid = user.get('idstr', '') screen_name = user.get('screen_name', f'用户{uid}') if uid: logger.info(f"weibo.com/ajax/profile/info: uid={uid}, name={screen_name}") return uid, screen_name except Exception as e: logger.warning(f"weibo.com/ajax/profile/info 失败: {e}") return None, None @app.route('/api/weibo/qrcode/add-account', methods=['POST']) @login_required def add_account_from_qrcode(): """从扫码结果添加账号""" try: data = request.json qrid = data.get('qrid') remark = data.get('remark', '') qrcodes = session.get('weibo_qrcodes', {}) qr_info = qrcodes.get(qrid) logger.info(f"add-account: qrid={qrid}, qrcodes keys={list(qrcodes.keys())}") logger.info(f"add-account: qr_info={json.dumps(qr_info, default=str)[:500] if qr_info else None}") if not qr_info or qr_info.get('status') != 'success': logger.error(f"add-account 失败: qr_info={qr_info}, status={qr_info.get('status') if qr_info else 'N/A'}") return jsonify({'success': False, 'message': '二维码未完成授权'}), 400 cookie = qr_info.get('cookie') weibo_uid = qr_info.get('weibo_uid') screen_name = qr_info.get('screen_name', '微博用户') if not cookie: logger.error(f"add-account: cookie 为空! qr_info keys={list(qr_info.keys())}") return jsonify({'success': False, 'message': 'Cookie 数据丢失,请重新扫码'}), 400 if not remark: remark = f"{screen_name} (扫码添加)" response = api_request( 'POST', f'{API_BASE_URL}/api/v1/accounts', json={ 'weibo_user_id': weibo_uid, 'cookie': cookie, 'remark': remark, }, ) result = response.json() if response.status_code in (200, 201) and result.get('success'): account_data = result.get('data', {}) account_id = account_data.get('id') # 扫码添加后自动触发 Cookie 验证,激活账号 if account_id: try: verify_resp = api_request('POST', f'{API_BASE_URL}/api/v1/accounts/{account_id}/verify') verify_data = verify_resp.json() if verify_data.get('success') and verify_data.get('data', {}).get('cookie_valid'): logger.info(f"扫码添加后自动验证成功: account_id={account_id}") else: logger.warning(f"扫码添加后自动验证失败: {verify_data}") except Exception as e: logger.warning(f"扫码添加后自动验证异常: {e}") session['weibo_qrcodes'].pop(qrid, None) session.modified = True return jsonify({ 'success': True, 'message': '账号添加成功', 'account': account_data, }) elif response.status_code == 401: logger.error(f"add-account 后端返回 401: {response.text[:500]}") return jsonify({ 'success': False, 'message': '登录已过期,请重新登录后再试', 'need_login': True, }), 401 else: logger.error(f"add-account 后端返回失败: status={response.status_code}, body={response.text[:500]}") return jsonify({ 'success': False, 'message': result.get('message', result.get('detail', '添加账号失败')), }), 400 except Exception as e: logger.exception("添加账号异常") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/accounts/') @login_required def account_detail(account_id): try: # 获取账号详情 response = api_request('GET', f'{API_BASE_URL}/api/v1/accounts/{account_id}') account_data = response.json() account = account_data.get('data') if account_data.get('success') else None # 获取任务列表 tasks_response = api_request('GET', f'{API_BASE_URL}/api/v1/accounts/{account_id}/tasks') tasks_data = tasks_response.json() tasks = tasks_data.get('data', []) if tasks_data.get('success') else [] # 获取签到日志 page = request.args.get('page', 1, type=int) logs_response = api_request( 'GET', f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin-logs', params={'page': page, 'size': 20}, ) logs_data = logs_response.json() logs = logs_data.get('data', {}) if logs_data.get('success') else {} # 确保 logs 有默认结构,避免模板报错 if not isinstance(logs, dict): logs = {} logs.setdefault('items', []) logs.setdefault('total', 0) logs.setdefault('page', page) logs.setdefault('size', 20) logs.setdefault('total_pages', 0) if not account: flash('账号不存在', 'danger') return redirect(url_for('dashboard')) return render_template( 'account_detail.html', account=account, tasks=tasks, logs=logs, user=session.get('user') ) except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') return redirect(url_for('dashboard')) @app.route('/accounts//verify', methods=['POST']) @login_required def verify_account(account_id): """验证账号 Cookie 有效性""" try: response = api_request('POST', f'{API_BASE_URL}/api/v1/accounts/{account_id}/verify') data = response.json() if data.get('success') and data.get('data', {}).get('cookie_valid'): flash('Cookie 验证通过,账号已激活', 'success') else: flash(data.get('message', 'Cookie 无效或已过期'), 'warning') except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') return redirect(url_for('account_detail', account_id=account_id)) @app.route('/accounts//signin', methods=['POST']) @login_required def manual_signin(account_id): """手动触发签到""" try: response = api_request('POST', f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin') data = response.json() if data.get('success'): result = data.get('data', {}) signed = result.get('signed', 0) already = result.get('already_signed', 0) failed = result.get('failed', 0) flash(f'签到完成: {signed} 成功, {already} 已签, {failed} 失败', 'success') else: flash(data.get('message', '签到失败'), 'danger') except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') return redirect(url_for('account_detail', account_id=account_id)) @app.route('/accounts//edit', methods=['GET', 'POST']) @login_required def edit_account(account_id): if request.method == 'POST': remark = request.form.get('remark') cookie = request.form.get('cookie') try: data = {'remark': remark} if cookie: data['cookie'] = cookie response = api_request( 'PUT', f'{API_BASE_URL}/api/v1/accounts/{account_id}', json=data, ) result = response.json() if response.status_code == 200 and result.get('success'): flash('账号更新成功', 'success') return redirect(url_for('account_detail', account_id=account_id)) else: flash(result.get('message', '更新账号失败'), 'danger') except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') try: response = api_request('GET', f'{API_BASE_URL}/api/v1/accounts/{account_id}') data = response.json() account = data.get('data') if data.get('success') else None if not account: flash('账号不存在', 'danger') return redirect(url_for('dashboard')) return render_template('edit_account.html', account=account) except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') return redirect(url_for('dashboard')) @app.route('/accounts//delete', methods=['POST']) @login_required def delete_account(account_id): try: response = api_request('DELETE', f'{API_BASE_URL}/api/v1/accounts/{account_id}') data = response.json() if response.status_code == 200 and data.get('success'): flash('账号删除成功', 'success') else: flash(data.get('message', '删除账号失败'), 'danger') except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') return redirect(url_for('dashboard')) @app.route('/accounts//tasks/new', methods=['GET', 'POST']) @login_required def add_task(account_id): if request.method == 'POST': cron_expression = request.form.get('cron_expression') try: response = api_request( 'POST', f'{API_BASE_URL}/api/v1/accounts/{account_id}/tasks', json={'cron_expression': cron_expression}, ) data = response.json() if response.status_code in (200, 201) and data.get('success'): flash('任务创建成功', 'success') return redirect(url_for('account_detail', account_id=account_id)) else: flash(data.get('message', '创建任务失败'), 'danger') except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') return render_template('add_task.html', account_id=account_id) @app.route('/tasks//toggle', methods=['POST']) @login_required def toggle_task(task_id): is_enabled = request.form.get('is_enabled') == 'true' try: response = api_request( 'PUT', f'{API_BASE_URL}/api/v1/tasks/{task_id}', json={'is_enabled': not is_enabled}, ) data = response.json() if response.status_code == 200 and data.get('success'): flash('任务更新成功', 'success') else: flash(data.get('message', '更新任务失败'), 'danger') except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') account_id = request.form.get('account_id') return redirect(url_for('account_detail', account_id=account_id)) @app.route('/tasks//delete', methods=['POST']) @login_required def delete_task(task_id): account_id = request.form.get('account_id') try: response = api_request('DELETE', f'{API_BASE_URL}/api/v1/tasks/{task_id}') data = response.json() if response.status_code == 200 and data.get('success'): flash('任务删除成功', 'success') else: flash(data.get('message', '删除任务失败'), 'danger') except requests.RequestException as e: flash(f'连接错误: {str(e)}', 'danger') return redirect(url_for('account_detail', account_id=account_id)) @app.route('/api/batch/verify', methods=['POST']) @login_required def batch_verify(): """批量验证所有账号的 Cookie 有效性""" try: response = api_request('GET', f'{API_BASE_URL}/api/v1/accounts') data = response.json() accounts = data.get('data', []) if data.get('success') else [] valid = invalid = errors = 0 for account in accounts: try: resp = api_request('POST', f'{API_BASE_URL}/api/v1/accounts/{account["id"]}/verify') result = resp.json() if result.get('success') and result.get('data', {}).get('cookie_valid'): valid += 1 else: invalid += 1 except Exception: errors += 1 return jsonify({ 'success': True, 'data': {'valid': valid, 'invalid': invalid, 'errors': errors, 'total': len(accounts)}, }) except Exception as e: logger.exception("批量验证异常") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/batch/signin', methods=['POST']) @login_required def batch_signin(): """批量签到所有正常状态的账号""" try: response = api_request('GET', f'{API_BASE_URL}/api/v1/accounts') data = response.json() accounts = data.get('data', []) if data.get('success') else [] total_signed = total_already = total_failed = 0 processed = 0 for account in accounts: if account.get('status') != 'active': continue try: resp = api_request('POST', f'{API_BASE_URL}/api/v1/accounts/{account["id"]}/signin') result = resp.json() if result.get('success'): d = result.get('data', {}) total_signed += d.get('signed', 0) total_already += d.get('already_signed', 0) total_failed += d.get('failed', 0) processed += 1 except Exception as e: logger.warning(f"批量签到账号 {account['id']} 失败: {e}") total_failed += 1 return jsonify({ 'success': True, 'data': { 'total_accounts': processed, 'total_signed': total_signed, 'total_already': total_already, 'total_failed': total_failed, }, }) except Exception as e: logger.exception("批量签到异常") return jsonify({'success': False, 'message': str(e)}), 500 @app.errorhandler(404) def not_found(error): return render_template('404.html'), 404 @app.errorhandler(500) def server_error(error): return render_template('500.html'), 500 if __name__ == '__main__': # use_reloader=False 避免 Windows 终端 QuickEdit 模式导致进程挂起 app.run(debug=True, port=5000, use_reloader=False)