Files
weibo_signin/frontend/app.py
Jeason e514a11e62 注册码 + 管理员系统:
User 模型新增 is_admin 字段
新增 InviteCode 模型(邀请码表)
注册接口必须提供有效邀请码,使用后自动标记
管理员接口:查看所有用户、启用/禁用用户、生成/删除邀请码
前端新增管理面板页面 /admin,导航栏对管理员显示入口
注册页面新增邀请码输入框
选择性超话签到:

新增 GET /api/v1/accounts/{id}/topics 接口获取超话列表
POST /signin 接口支持 {"topic_indices": [0,1,3]} 选择性签到
新增超话选择页面 /accounts/{id}/topics,支持全选/手动勾选
账号详情页新增"选择超话签到"按钮
2026-03-17 17:05:28 +08:00

1060 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import json
import time
import uuid
import logging
import traceback
from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify
from flask_session import Session
import requests
from functools import wraps
from dotenv import load_dotenv
from datetime import datetime
# Pull overrides from a local .env file before any setting below is read.
load_dotenv()

app = Flask(__name__)
app.config.update(
    SECRET_KEY=os.getenv('SECRET_KEY', 'dev-secret-key'),
    SESSION_TYPE='filesystem',
)
Session(app)

# Base URLs of the two backend services this frontend talks to.
API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:8000')
AUTH_BASE_URL = os.getenv('AUTH_BASE_URL', 'http://localhost:8001')

logger = logging.getLogger(__name__)
def get_headers():
    """Build the default JSON request headers for backend calls.

    An ``Authorization: Bearer ...`` header is added when the session holds
    an ``access_token``. This function never refreshes the token itself;
    the 401/refresh handling lives in ``api_request``.
    """
    auth_part = (
        {'Authorization': f"Bearer {session['access_token']}"}
        if 'access_token' in session
        else {}
    )
    return {'Content-Type': 'application/json', **auth_part}
def _try_refresh_token():
    """Exchange the session's refresh token for a new token pair.

    Returns:
        True when the auth service answered 200 and both tokens were
        written back to the session; False when no refresh token is
        stored, the service answered with another status, or any
        exception occurred.
    """
    stored_refresh = session.get('refresh_token')
    if not stored_refresh:
        logger.warning("Token 刷新失败: session 中没有 refresh_token")
        return False

    try:
        resp = requests.post(
            f'{AUTH_BASE_URL}/auth/refresh',
            json={'refresh_token': stored_refresh},
            timeout=10,
        )
        if resp.status_code != 200:
            logger.warning(f"Token 刷新失败: HTTP {resp.status_code}, body={resp.text[:200]}")
            return False

        payload = resp.json()
        session['access_token'] = payload['access_token']
        session['refresh_token'] = payload['refresh_token']
        session.modified = True
        logger.info("Token 刷新成功")
        return True
    except Exception as e:
        logger.warning(f"Token 刷新异常: {e}")
        return False
def api_request(method, url, **kwargs):
    """Send a backend request, retrying once after a token refresh on 401.

    Args:
        method: HTTP verb passed to ``requests.request``.
        url: Absolute URL of the backend endpoint.
        **kwargs: Forwarded to ``requests.request``. ``headers`` overrides
            the defaults from ``get_headers()``; ``timeout`` overrides the
            10-second default.

    Returns:
        The ``requests.Response`` of the last attempt. When a 401 cannot be
        recovered by refreshing, the login-related session keys are removed
        so the next page view is caught by ``login_required``.
    """
    # Copy so the retry below never mutates a dict owned by the caller.
    headers = dict(kwargs.pop('headers', None) or get_headers())
    # Popping avoids "multiple values for keyword argument 'timeout'" when a
    # caller supplies its own timeout.
    timeout = kwargs.pop('timeout', 10)

    token_preview = headers.get('Authorization', 'NONE')[:30]
    logger.info(f"api_request: {method} {url} token={token_preview}...")
    resp = requests.request(method, url, headers=headers, timeout=timeout, **kwargs)
    if resp.status_code != 401:
        return resp

    logger.warning("api_request: 收到 401, 尝试刷新 token...")
    if _try_refresh_token():
        # The token was refreshed: retry exactly once with the new one.
        headers['Authorization'] = f"Bearer {session['access_token']}"
        logger.info("api_request: 刷新成功,重试请求...")
        resp = requests.request(method, url, headers=headers, timeout=timeout, **kwargs)
    else:
        logger.error("api_request: token 刷新失败,清除 session 让用户重新登录")
        # Drop the stale credentials so login_required intercepts the next visit.
        for key in ('access_token', 'refresh_token', 'user'):
            session.pop(key, None)
        session.modified = True
    return resp
def login_required(f):
    """Decorator: redirect to the login page unless ``user`` is in the session."""
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'user' in session:
            return f(*args, **kwargs)
        flash('请先登录', 'warning')
        return redirect(url_for('login'))
    return guarded_view
@app.route('/')
def index():
    """Send logged-in users to the dashboard and everyone else to login."""
    endpoint = 'dashboard' if 'user' in session else 'login'
    return redirect(url_for(endpoint))
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the sign-up form and, on POST, register through the auth service.

    The two password fields must match and the invite code must be
    non-empty; otherwise the user is redirected back with a flash message.
    A 201 answer stores the returned user and tokens in the session and
    redirects to the dashboard. Any other answer flashes the service's
    ``detail`` field and re-renders the form.
    """
    if request.method != 'POST':
        return render_template('register.html')

    form = request.form
    username = form.get('username')
    email = form.get('email')
    password = form.get('password')
    confirm_password = form.get('confirm_password')
    invite_code = form.get('invite_code', '').strip()

    if password != confirm_password:
        flash('两次输入的密码不一致', 'danger')
        return redirect(url_for('register'))
    if not invite_code:
        flash('请输入邀请码', 'danger')
        return redirect(url_for('register'))

    payload = {
        'username': username,
        'email': email,
        'password': password,
        'invite_code': invite_code,
    }
    try:
        response = requests.post(
            f'{AUTH_BASE_URL}/auth/register',
            json=payload,
            timeout=10
        )
        if response.status_code != 201:
            error_data = response.json()
            flash(error_data.get('detail', '注册失败'), 'danger')
        else:
            data = response.json()
            session['user'] = data['user']
            session['access_token'] = data['access_token']
            session['refresh_token'] = data['refresh_token']
            flash('注册成功', 'success')
            return redirect(url_for('dashboard'))
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')
    return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, authenticate via the auth service.

    A 200 answer stores the returned user and tokens in the session and
    redirects to the dashboard; any other answer flashes the service's
    ``detail`` field and re-renders the form.
    """
    if request.method != 'POST':
        return render_template('login.html')

    credentials = {
        'email': request.form.get('email'),
        'password': request.form.get('password'),
    }
    try:
        response = requests.post(
            f'{AUTH_BASE_URL}/auth/login',
            json=credentials,
            timeout=10
        )
        if response.status_code != 200:
            error_data = response.json()
            flash(error_data.get('detail', '登录失败'), 'danger')
        else:
            data = response.json()
            session['user'] = data['user']
            session['access_token'] = data['access_token']
            session['refresh_token'] = data['refresh_token']
            flash('登录成功', 'success')
            return redirect(url_for('dashboard'))
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')
    return render_template('login.html')
@app.route('/api/auth/wx-login', methods=['POST'])
def wx_login_proxy():
    """Proxy a WeChat login request to the auth service's /auth/wx-login.

    The incoming JSON body is forwarded unchanged. On a 200 answer the
    returned user and tokens are also written to the session and the full
    result is echoed back. Other statuses are relayed with the service's
    ``detail`` message; any exception yields a 500 JSON error.
    """
    try:
        upstream = requests.post(
            f'{AUTH_BASE_URL}/auth/wx-login',
            json=request.json,
            timeout=15,
        )
        if upstream.status_code != 200:
            detail = upstream.json().get('detail', '微信登录失败')
            return jsonify({'success': False, 'message': detail}), upstream.status_code

        result = upstream.json()
        # Mirror the login into the web session as well.
        for key in ('user', 'access_token', 'refresh_token'):
            session[key] = result.get(key)
        session.modified = True
        return jsonify({'success': True, 'data': result})
    except Exception as e:
        logger.exception("微信登录代理异常")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/logout')
def logout():
    """Clear the whole session and return the user to the login page."""
    session.clear()
    flash('已退出登录', 'success')
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard with the account list fetched from the API.

    The list falls back to empty when the API reports no success or the
    request raises a ``requests.RequestException`` (a warning is flashed
    in the latter case).
    """
    accounts = []
    try:
        payload = api_request(
            'GET',
            f'{API_BASE_URL}/api/v1/accounts',
        ).json()
        if payload.get('success'):
            accounts = payload.get('data', [])
    except requests.RequestException:
        accounts = []
        flash('加载账号列表失败', 'warning')
    return render_template('dashboard.html', accounts=accounts, user=session.get('user'))
@app.route('/accounts/new')
@login_required
def add_account():
    """Render the add-account page."""
    page = render_template('add_account.html')
    return page
# Desktop-Chrome user agent string reused for every outbound Weibo/Sina call.
_WEIBO_USER_AGENT = ' '.join([
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
])

# Browser-like headers sent with requests to the Sina login endpoints.
WEIBO_HEADERS = {
    'User-Agent': _WEIBO_USER_AGENT,
    'Referer': 'https://weibo.com/',
    'Accept': '*/*',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Connection': 'keep-alive',
}
def _parse_jsonp(text):
"""Strip JSONP wrapper and return parsed dict, or None."""
m = re.search(r'\((.*)\)', text, re.DOTALL)
if m:
return json.loads(m.group(1))
# Maybe it's already plain JSON
try:
return json.loads(text)
except (json.JSONDecodeError, ValueError):
return None
@app.route('/api/weibo/qrcode/generate', methods=['POST'])
@login_required
def generate_weibo_qrcode():
    """Create a Weibo scan-to-login QR code.

    Calls https://login.sina.com.cn/sso/qrcode/image, reads ``qrid`` and the
    image URL from the JSONP answer, records the code as ``waiting`` in the
    session under ``weibo_qrcodes`` and returns both values as JSON. Any
    failure yields a 500 JSON error.
    """
    try:
        resp = requests.get(
            'https://login.sina.com.cn/sso/qrcode/image',
            params={
                'entry': 'weibo',
                'size': '180',
                'callback': f'STK_{int(time.time() * 1000)}',
            },
            headers=WEIBO_HEADERS,
            timeout=10,
        )
        logger.debug(f"qrcode/image status={resp.status_code} body={resp.text[:300]}")

        data = _parse_jsonp(resp.text)
        if not data or data.get('retcode') != 20000000:
            return jsonify({'success': False, 'error': '生成二维码失败'}), 500

        qr_data = data.get('data', {})
        qrid = qr_data.get('qrid')
        qr_image_url = qr_data.get('image', '')
        if not (qrid and qr_image_url):
            return jsonify({'success': False, 'error': '二维码数据不完整'}), 500

        # Protocol-relative image URLs need an explicit scheme.
        if qr_image_url.startswith('//'):
            qr_image_url = 'https:' + qr_image_url

        # Remember the QR code so the polling endpoint can track its state.
        if 'weibo_qrcodes' not in session:
            session['weibo_qrcodes'] = {}
        pending = {
            'status': 'waiting',
            'created_at': str(datetime.now()),
        }
        session['weibo_qrcodes'][qrid] = pending
        session.modified = True

        return jsonify({
            'success': True,
            'qrid': qrid,
            'qr_image': qr_image_url,
            'expires_in': 180,
        })
    except Exception as e:
        logger.exception("生成二维码异常")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/weibo/qrcode/check/<qrid>', methods=['GET'])
@login_required
def check_weibo_qrcode(qrid):
    """Poll the scan state of a Weibo login QR code.

    The Sina check endpoint is called in JSONP mode (with a ``callback``
    parameter; per the original author's note it otherwise answers with
    non-JSON content). Once the scan is confirmed the answer carries an
    ``alt`` token, which is turned into an SSO login URL and handed to
    ``_execute_sso_login`` to collect the cookie and user info. The result
    is cached in the session so repeated polls return it unchanged.

    Args:
        qrid: QR code id previously issued by ``generate_weibo_qrcode``.

    Returns:
        JSON with ``status`` in ``waiting`` / ``scanned`` / ``expired`` /
        ``cancelled`` / ``success`` / ``error``; ``success`` also carries
        ``weibo_uid`` and ``screen_name``.
    """
    from urllib.parse import urlencode

    # retcodes that map directly onto a polling status.
    simple_status = {
        50114001: 'waiting',    # QR code not used yet
        50050001: 'scanned',    # scanned, waiting for confirmation
        50114002: 'scanned',
        50050002: 'expired',
        50050004: 'cancelled',  # authorization cancelled
    }

    try:
        qrcodes = session.get('weibo_qrcodes', {})
        if qrid not in qrcodes:
            return jsonify({'status': 'expired'})

        # Already handled successfully earlier: return the cached result.
        qr_info = qrcodes[qrid]
        if qr_info.get('status') == 'success':
            return jsonify({
                'status': 'success',
                'weibo_uid': qr_info.get('weibo_uid'),
                'screen_name': qr_info.get('screen_name'),
            })

        # ---- 1. Call the check endpoint (JSONP mode) ----
        check_url = 'https://login.sina.com.cn/sso/qrcode/check'
        params = {
            'entry': 'weibo',
            'qrid': qrid,
            'callback': f'STK_{int(time.time() * 1000)}',
        }
        resp = requests.get(check_url, params=params, headers=WEIBO_HEADERS, timeout=10)
        # The body may contain the one-time alt login token, so only the
        # status is logged here; the body is logged below on parse failure.
        logger.info(f"qrcode/check status={resp.status_code}")

        data = _parse_jsonp(resp.text)
        if not data or not isinstance(data, dict):
            logger.error(f"无法解析 check 响应: {resp.text[:300]}")
            return jsonify({'status': 'error', 'error': '解析响应失败'})

        retcode = data.get('retcode')
        logger.info(f"check retcode={retcode}")

        if retcode in simple_status:
            return jsonify({'status': simple_status[retcode]})

        # 50114004 = QR code already consumed (repeated poll), carries no alt.
        # A cached success would have been returned above, so reaching this
        # point always means the code is unusable.
        if retcode == 50114004:
            return jsonify({'status': 'error', 'error': '二维码已失效,请重新生成'})

        # ---- 2. Login succeeded ----
        # retcode=20000000 with data.alt present = scan confirmed.
        # retcode=20000001 = legacy success state.
        # alt is a token (e.g. "ALT-xxx"), not a URL; it is spliced into the
        # SSO login URL below.
        alt_token = ''
        nested = data.get('data')
        if isinstance(nested, dict):
            alt_token = nested.get('alt', '')

        if retcode == 20000000 and not alt_token:
            # 20000000 without alt is the normal "still waiting" state.
            return jsonify({'status': 'waiting'})

        if not alt_token:
            if retcode in (20000001, 50114003):
                alt_token = data.get('alt', '')
            else:
                logger.warning(f"未知 retcode: {retcode}, data: {data}")
                return jsonify({'status': 'waiting'})

        if not alt_token:
            return jsonify({'status': 'error', 'error': '微博未返回登录凭证,请重新扫码'})

        # Never log the token itself: it is a login credential.
        logger.info(f"获取到 alt token (长度={len(str(alt_token))})")

        # Build the SSO login URL; urlencode escapes the token safely.
        query = urlencode({
            'entry': 'weibo',
            'returntype': 'TEXT',
            'crossdomain': 1,
            'cdult': 3,
            'domain': 'weibo.com',
            'alt': alt_token,
            'savestate': 30,
            'callback': f'STK_{int(time.time() * 1000)}',
        })
        alt_url = f"https://login.sina.com.cn/sso/login.php?{query}"

        # ---- 3. Run the SSO login, collect cookie and user info ----
        cookie_str, uid, nick = _execute_sso_login(alt_url)

        if not cookie_str:
            return jsonify({
                'status': 'error',
                'error': 'Cookie 获取失败,请重新扫码',
            })

        screen_name = nick or f'用户{uid}'

        # Persist the result so repeated polls do not lose it.
        session['weibo_qrcodes'][qrid]['status'] = 'success'
        session['weibo_qrcodes'][qrid]['cookie'] = cookie_str
        session['weibo_qrcodes'][qrid]['weibo_uid'] = uid
        session['weibo_qrcodes'][qrid]['screen_name'] = screen_name
        session.modified = True

        logger.info(f"check 成功: qrid={qrid}, uid={uid}, cookie长度={len(cookie_str)}, session已写入")

        return jsonify({
            'status': 'success',
            'weibo_uid': uid,
            'screen_name': screen_name,
        })
    except Exception as e:
        logger.exception("检查二维码状态异常")
        return jsonify({'status': 'error', 'error': str(e)})
def _execute_sso_login(sso_url):
    """Run the Weibo SSO login flow and collect cookies plus user info.

    Steps:
        1. GET ``sso_url``; the JSONP answer is expected to carry ``uid``,
           ``nick`` and ``crossDomainUrlList``, and the response sets
           cookies on the session.
        2. Visit each URL in ``crossDomainUrlList`` so cross-domain cookies
           are set as well (failures are ignored).
        3. Keep only cookies whose domain contains ``weibo.com``.

    Args:
        sso_url: Fully built SSO login URL.

    Returns:
        ``(cookie_str, uid, nick)`` on success, or ``(None, None, None)``
        when the ``SUB`` cookie is missing, no uid can be determined, or an
        exception occurs.
    """
    try:
        # The context manager releases the session's pooled connections.
        with requests.Session() as sso_session:
            sso_session.headers.update({
                'User-Agent': WEIBO_HEADERS['User-Agent'],
                'Referer': 'https://weibo.com/',
                'Accept': '*/*',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            })

            # Step 1: hit the SSO login URL.
            resp = sso_session.get(sso_url, allow_redirects=True, timeout=15)
            logger.info(f"SSO login status={resp.status_code}, cookies={len(sso_session.cookies)}")

            sso_data = _parse_jsonp(resp.text)
            uid = ''
            nick = ''
            cross_urls = []
            if sso_data and isinstance(sso_data, dict):
                # `or ''` keeps a JSON null from becoming the string 'None'.
                uid = str(sso_data.get('uid') or '')
                nick = sso_data.get('nick') or ''
                cross_urls = sso_data.get('crossDomainUrlList') or []
                if not isinstance(cross_urls, list):
                    cross_urls = []
                logger.info(f"SSO 响应: uid={uid}, nick={nick}, crossDomainUrls={len(cross_urls)}")
            else:
                logger.warning(f"无法解析 SSO 响应: {resp.text[:500]}")

            # Step 2: visit each cross-domain URL to pick up its cookies.
            for url in cross_urls:
                if not isinstance(url, str) or not url.startswith('http'):
                    continue
                try:
                    logger.debug(f"访问跨域 URL: {url[:120]}")
                    sso_session.get(url, allow_redirects=True, timeout=10)
                except Exception as e:
                    # Best effort: one failing domain must not abort the login.
                    logger.debug(f"跨域 URL 访问失败: {e}")

            # Step 3: keep only weibo.com cookies (per the original note,
            # the sign-in API needs only these).
            weibo_com_cookies = {
                cookie.name: cookie.value
                for cookie in sso_session.cookies
                if cookie.domain and 'weibo.com' in cookie.domain
            }
            cookie_str = '; '.join(f'{k}={v}' for k, v in weibo_com_cookies.items())
            logger.info(f"weibo.com Cookie ({len(weibo_com_cookies)} 个): {list(weibo_com_cookies.keys())}")

            if not cookie_str or 'SUB' not in weibo_com_cookies:
                logger.error(f"Cookie 不完整,缺少 SUB。获取到: {list(weibo_com_cookies.keys())}")
                return None, None, None

            if not uid:
                logger.warning("SSO 响应中没有 uid尝试从 API 获取...")
                uid, nick = _fetch_weibo_user_info(sso_session)

            if not uid:
                logger.error("无法获取用户 uid")
                return None, None, None

            return cookie_str, uid, nick
    except Exception as e:
        logger.exception(f"SSO 登录流程失败: {e}")
        return None, None, None
def _fetch_weibo_user_info(sso_session):
    """Look up the current user's uid and nickname with a logged-in session.

    Queries https://weibo.com/ajax/profile/info through ``sso_session``.

    Returns:
        ``(uid, screen_name)`` when the answer has ``ok == 1`` and a
        non-empty ``idstr``; ``(None, None)`` otherwise or on any exception.
    """
    try:
        payload = sso_session.get(
            'https://weibo.com/ajax/profile/info',
            headers={
                'User-Agent': WEIBO_HEADERS['User-Agent'],
                'Referer': 'https://weibo.com/',
                'Accept': '*/*',
            },
            timeout=10,
        ).json()
        if payload.get('ok') != 1:
            return None, None

        profile = payload.get('data', {}).get('user', {})
        uid = profile.get('idstr', '')
        screen_name = profile.get('screen_name', f'用户{uid}')
        if not uid:
            return None, None

        logger.info(f"weibo.com/ajax/profile/info: uid={uid}, name={screen_name}")
        return uid, screen_name
    except Exception as e:
        logger.warning(f"weibo.com/ajax/profile/info 失败: {e}")
        return None, None
@app.route('/api/weibo/qrcode/add-account', methods=['POST'])
@login_required
def add_account_from_qrcode():
    """Create a backend account from a successfully scanned QR code.

    Expects a JSON body with ``qrid`` and optional ``remark``. The cookie
    and user info cached in the session by ``check_weibo_qrcode`` are
    posted to the accounts API. After creation a cookie verification is
    triggered (best effort) and the QR entry is removed from the session.

    Returns:
        JSON with ``success``; 400 when the QR code is not authorized, the
        cookie is missing, or the backend rejects the account; 401 when the
        backend reports an expired login; 500 on unexpected errors.
    """
    try:
        # silent=True turns a missing or invalid JSON body into a clean 400
        # below instead of an exception surfacing as a 500.
        data = request.get_json(silent=True) or {}
        qrid = data.get('qrid')
        remark = data.get('remark', '')

        qrcodes = session.get('weibo_qrcodes', {})
        qr_info = qrcodes.get(qrid)

        # The cached entry holds the Weibo session cookie: strip it before
        # anything is written to the logs.
        loggable_info = (
            {k: v for k, v in qr_info.items() if k != 'cookie'}
            if qr_info else None
        )

        logger.info(f"add-account: qrid={qrid}, qrcodes keys={list(qrcodes.keys())}")
        logger.info(f"add-account: qr_info={json.dumps(loggable_info, default=str)[:500] if loggable_info else None}")

        if not qr_info or qr_info.get('status') != 'success':
            logger.error(f"add-account 失败: qr_info={loggable_info}, status={qr_info.get('status') if qr_info else 'N/A'}")
            return jsonify({'success': False, 'message': '二维码未完成授权'}), 400

        cookie = qr_info.get('cookie')
        weibo_uid = qr_info.get('weibo_uid')
        screen_name = qr_info.get('screen_name', '微博用户')

        if not cookie:
            logger.error(f"add-account: cookie 为空! qr_info keys={list(qr_info.keys())}")
            return jsonify({'success': False, 'message': 'Cookie 数据丢失,请重新扫码'}), 400

        if not remark:
            remark = f"{screen_name} (扫码添加)"

        response = api_request(
            'POST',
            f'{API_BASE_URL}/api/v1/accounts',
            json={
                'weibo_user_id': weibo_uid,
                'cookie': cookie,
                'remark': remark,
            },
        )
        result = response.json()

        if response.status_code in (200, 201) and result.get('success'):
            account_data = result.get('data', {})
            account_id = account_data.get('id')

            # Trigger cookie verification right away to activate the account.
            if account_id:
                try:
                    verify_resp = api_request('POST', f'{API_BASE_URL}/api/v1/accounts/{account_id}/verify')
                    verify_data = verify_resp.json()
                    if verify_data.get('success') and verify_data.get('data', {}).get('cookie_valid'):
                        logger.info(f"扫码添加后自动验证成功: account_id={account_id}")
                    else:
                        logger.warning(f"扫码添加后自动验证失败: {verify_data}")
                except Exception as e:
                    # Verification is best effort; the account already exists.
                    logger.warning(f"扫码添加后自动验证异常: {e}")

            session['weibo_qrcodes'].pop(qrid, None)
            session.modified = True
            return jsonify({
                'success': True,
                'message': '账号添加成功',
                'account': account_data,
            })
        elif response.status_code == 401:
            logger.error(f"add-account 后端返回 401: {response.text[:500]}")
            return jsonify({
                'success': False,
                'message': '登录已过期,请重新登录后再试',
                'need_login': True,
            }), 401
        else:
            logger.error(f"add-account 后端返回失败: status={response.status_code}, body={response.text[:500]}")
            return jsonify({
                'success': False,
                'message': result.get('message', result.get('detail', '添加账号失败')),
            }), 400
    except Exception as e:
        logger.exception("添加账号异常")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/accounts/<account_id>')
@login_required
def account_detail(account_id):
    """Render an account's detail page with its tasks and sign-in logs.

    The account is fetched first; if the API does not return it, the user
    is sent back to the dashboard before the task and log endpoints are
    queried. Logs are paginated through the ``page`` query argument
    (20 entries per page).
    """
    try:
        # Account details.
        response = api_request('GET', f'{API_BASE_URL}/api/v1/accounts/{account_id}')
        account_data = response.json()
        account = account_data.get('data') if account_data.get('success') else None

        # Bail out early: without an account the two calls below are wasted.
        if not account:
            flash('账号不存在', 'danger')
            return redirect(url_for('dashboard'))

        # Task list.
        tasks_response = api_request('GET', f'{API_BASE_URL}/api/v1/accounts/{account_id}/tasks')
        tasks_data = tasks_response.json()
        tasks = tasks_data.get('data', []) if tasks_data.get('success') else []

        # Sign-in logs.
        page = request.args.get('page', 1, type=int)
        logs_response = api_request(
            'GET',
            f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin-logs',
            params={'page': page, 'size': 20},
        )
        logs_data = logs_response.json()
        logs = logs_data.get('data', {}) if logs_data.get('success') else {}

        # Give logs a default structure so the template never fails on it.
        if not isinstance(logs, dict):
            logs = {}
        logs.setdefault('items', [])
        logs.setdefault('total', 0)
        logs.setdefault('page', page)
        logs.setdefault('size', 20)
        logs.setdefault('total_pages', 0)

        return render_template(
            'account_detail.html',
            account=account,
            tasks=tasks,
            logs=logs,
            user=session.get('user')
        )
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')
        return redirect(url_for('dashboard'))
@app.route('/accounts/<account_id>/verify', methods=['POST'])
@login_required
def verify_account(account_id):
    """Ask the API to verify the account's cookie and flash the outcome."""
    try:
        payload = api_request(
            'POST',
            f'{API_BASE_URL}/api/v1/accounts/{account_id}/verify',
        ).json()
        cookie_ok = payload.get('success') and payload.get('data', {}).get('cookie_valid')
        if not cookie_ok:
            flash(payload.get('message', 'Cookie 无效或已过期'), 'warning')
        else:
            flash('Cookie 验证通过,账号已激活', 'success')
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')
    return redirect(url_for('account_detail', account_id=account_id))
@app.route('/accounts/<account_id>/signin', methods=['POST'])
@login_required
def manual_signin(account_id):
    """Trigger a sign-in for the account and flash the resulting counts."""
    try:
        payload = api_request(
            'POST',
            f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin',
            json={},
        ).json()
        if not payload.get('success'):
            flash(payload.get('message', '签到失败'), 'danger')
        else:
            summary = payload.get('data', {})
            signed = summary.get('signed', 0)
            already = summary.get('already_signed', 0)
            failed = summary.get('failed', 0)
            flash(f'签到完成: {signed} 成功, {already} 已签, {failed} 失败', 'success')
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')
    return redirect(url_for('account_detail', account_id=account_id))
@app.route('/accounts/<account_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_account(account_id):
    """Show the edit form and, on POST, update the account via the API.

    The cookie is only sent when the form field is non-empty. After a
    failed update the form is rendered again with the current account
    data.
    """
    if request.method == 'POST':
        remark = request.form.get('remark')
        cookie = request.form.get('cookie')
        try:
            changes = {'remark': remark}
            if cookie:
                changes['cookie'] = cookie
            response = api_request(
                'PUT',
                f'{API_BASE_URL}/api/v1/accounts/{account_id}',
                json=changes,
            )
            result = response.json()
            updated = response.status_code == 200 and result.get('success')
            if updated:
                flash('账号更新成功', 'success')
                return redirect(url_for('account_detail', account_id=account_id))
            flash(result.get('message', '更新账号失败'), 'danger')
        except requests.RequestException as e:
            flash(f'连接错误: {str(e)}', 'danger')

    try:
        payload = api_request('GET', f'{API_BASE_URL}/api/v1/accounts/{account_id}').json()
        account = payload.get('data') if payload.get('success') else None
        if account:
            return render_template('edit_account.html', account=account)
        flash('账号不存在', 'danger')
        return redirect(url_for('dashboard'))
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')
        return redirect(url_for('dashboard'))
@app.route('/accounts/<account_id>/delete', methods=['POST'])
@login_required
def delete_account(account_id):
    """Delete the account through the API and return to the dashboard."""
    try:
        response = api_request('DELETE', f'{API_BASE_URL}/api/v1/accounts/{account_id}')
        payload = response.json()
        deleted = response.status_code == 200 and payload.get('success')
        if not deleted:
            flash(payload.get('message', '删除账号失败'), 'danger')
        else:
            flash('账号删除成功', 'success')
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')
    return redirect(url_for('dashboard'))
@app.route('/accounts/<account_id>/tasks/new', methods=['GET', 'POST'])
@login_required
def add_task(account_id):
    """Show the new-task form and, on POST, create the task via the API."""
    if request.method != 'POST':
        return render_template('add_task.html', account_id=account_id)

    cron_expression = request.form.get('cron_expression')
    try:
        response = api_request(
            'POST',
            f'{API_BASE_URL}/api/v1/accounts/{account_id}/tasks',
            json={'cron_expression': cron_expression},
        )
        payload = response.json()
        created = response.status_code in (200, 201) and payload.get('success')
        if created:
            flash('任务创建成功', 'success')
            return redirect(url_for('account_detail', account_id=account_id))
        flash(payload.get('message', '创建任务失败'), 'danger')
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')
    return render_template('add_task.html', account_id=account_id)
@app.route('/tasks/<task_id>/toggle', methods=['POST'])
@login_required
def toggle_task(task_id):
    """Flip a task's enabled flag through the API.

    The form's ``is_enabled`` field carries the task's current state
    (``'true'`` means enabled); the inverse is sent to the API. The user
    is then returned to the account page named by the ``account_id`` form
    field, or to the dashboard when that field is missing.
    """
    is_enabled = request.form.get('is_enabled') == 'true'
    account_id = request.form.get('account_id')
    try:
        response = api_request(
            'PUT',
            f'{API_BASE_URL}/api/v1/tasks/{task_id}',
            json={'is_enabled': not is_enabled},
        )
        data = response.json()
        if response.status_code == 200 and data.get('success'):
            flash('任务更新成功', 'success')
        else:
            flash(data.get('message', '更新任务失败'), 'danger')
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')

    # url_for cannot build the detail URL without an account id, so fall
    # back to the dashboard instead of failing with a 500.
    if not account_id:
        return redirect(url_for('dashboard'))
    return redirect(url_for('account_detail', account_id=account_id))
@app.route('/tasks/<task_id>/delete', methods=['POST'])
@login_required
def delete_task(task_id):
    """Delete a task through the API.

    The user is then returned to the account page named by the
    ``account_id`` form field, or to the dashboard when that field is
    missing.
    """
    account_id = request.form.get('account_id')
    try:
        response = api_request('DELETE', f'{API_BASE_URL}/api/v1/tasks/{task_id}')
        data = response.json()
        if response.status_code == 200 and data.get('success'):
            flash('任务删除成功', 'success')
        else:
            flash(data.get('message', '删除任务失败'), 'danger')
    except requests.RequestException as e:
        flash(f'连接错误: {str(e)}', 'danger')

    # url_for cannot build the detail URL without an account id, so fall
    # back to the dashboard instead of failing with a 500.
    if not account_id:
        return redirect(url_for('dashboard'))
    return redirect(url_for('account_detail', account_id=account_id))
@app.route('/api/batch/verify', methods=['POST'])
@login_required
def batch_verify():
    """Verify the cookie of every account and return the tallies as JSON.

    Each account counts as ``valid``, ``invalid`` or, when its verify call
    raises, ``errors``. ``total`` is the number of accounts listed.
    """
    try:
        listing = api_request('GET', f'{API_BASE_URL}/api/v1/accounts').json()
        accounts = listing.get('data', []) if listing.get('success') else []

        tally = {'valid': 0, 'invalid': 0, 'errors': 0}
        for account in accounts:
            try:
                outcome = api_request(
                    'POST',
                    f'{API_BASE_URL}/api/v1/accounts/{account["id"]}/verify',
                ).json()
                cookie_ok = outcome.get('success') and outcome.get('data', {}).get('cookie_valid')
                tally['valid' if cookie_ok else 'invalid'] += 1
            except Exception:
                tally['errors'] += 1

        return jsonify({
            'success': True,
            'data': {**tally, 'total': len(accounts)},
        })
    except Exception as e:
        logger.exception("批量验证异常")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/batch/signin', methods=['POST'])
@login_required
def batch_signin():
    """Sign in every account whose status is ``active``.

    Returns JSON totals: ``total_accounts`` counts accounts whose sign-in
    call reported success; ``total_signed`` / ``total_already`` /
    ``total_failed`` sum the per-account counters. An account whose call
    raises or reports no success adds one to ``total_failed``.
    """
    try:
        response = api_request('GET', f'{API_BASE_URL}/api/v1/accounts')
        data = response.json()
        accounts = data.get('data', []) if data.get('success') else []

        total_signed = total_already = total_failed = 0
        processed = 0
        for account in accounts:
            if account.get('status') != 'active':
                continue
            try:
                # Send the same empty JSON body as manual_signin: the
                # request declares Content-Type application/json.
                resp = api_request(
                    'POST',
                    f'{API_BASE_URL}/api/v1/accounts/{account["id"]}/signin',
                    json={},
                )
                result = resp.json()
                if result.get('success'):
                    d = result.get('data', {})
                    total_signed += d.get('signed', 0)
                    total_already += d.get('already_signed', 0)
                    total_failed += d.get('failed', 0)
                    processed += 1
                else:
                    # Do not drop rejected accounts from the totals.
                    logger.warning(f"批量签到账号 {account.get('id')} 失败: {result.get('message')}")
                    total_failed += 1
            except Exception as e:
                # .get keeps this log line from raising if "id" is missing.
                logger.warning(f"批量签到账号 {account.get('id')} 失败: {e}")
                total_failed += 1

        return jsonify({
            'success': True,
            'data': {
                'total_accounts': processed,
                'total_signed': total_signed,
                'total_already': total_already,
                'total_failed': total_failed,
            },
        })
    except Exception as e:
        logger.exception("批量签到异常")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.errorhandler(404)
def not_found(error):
    """Render the custom 404 page."""
    body = render_template('404.html')
    return body, 404
@app.errorhandler(500)
def server_error(error):
    """Render the custom 500 page."""
    body = render_template('500.html')
    return body, 500
# ===================== Admin Routes =====================
def admin_required(f):
    """Decorator: allow only logged-in users whose session user is an admin.

    Anonymous visitors are redirected to the login page; logged-in users
    without ``is_admin`` are redirected to the dashboard.
    """
    @wraps(f)
    def admin_only_view(*args, **kwargs):
        if 'user' not in session:
            flash('请先登录', 'warning')
            return redirect(url_for('login'))
        current_user = session.get('user', {})
        if current_user.get('is_admin'):
            return f(*args, **kwargs)
        flash('需要管理员权限', 'danger')
        return redirect(url_for('dashboard'))
    return admin_only_view
@app.route('/admin')
@admin_required
def admin_panel():
    """Render the admin panel with the user list and invite-code list.

    Each list is fetched from the auth service on its own; a non-200
    answer or any exception yields an empty list for that section.
    """
    def fetch_list(path):
        # One admin listing; failures degrade to an empty list.
        try:
            resp = api_request('GET', f'{AUTH_BASE_URL}{path}')
            return resp.json().get('data', []) if resp.status_code == 200 else []
        except Exception:
            return []

    users = fetch_list('/admin/users')
    codes = fetch_list('/admin/invite-codes')
    return render_template('admin.html', users=users, invite_codes=codes, user=session.get('user'))
@app.route('/admin/invite-codes/create', methods=['POST'])
@admin_required
def create_invite_code():
    """Generate an invite code via the auth service and flash the result."""
    try:
        resp = api_request('POST', f'{AUTH_BASE_URL}/admin/invite-codes')
        payload = resp.json()
        created = resp.status_code == 200 and payload.get('success')
        if not created:
            flash('生成邀请码失败', 'danger')
        else:
            code = payload['data']['code']
            flash(f'邀请码已生成: {code}', 'success')
    except Exception as e:
        flash(f'连接错误: {str(e)}', 'danger')
    return redirect(url_for('admin_panel'))
@app.route('/admin/invite-codes/<code_id>/delete', methods=['POST'])
@admin_required
def delete_invite_code(code_id):
    """Delete an invite code via the auth service and flash the result."""
    try:
        resp = api_request('DELETE', f'{AUTH_BASE_URL}/admin/invite-codes/{code_id}')
        payload = resp.json()
        deleted = resp.status_code == 200 and payload.get('success')
        if not deleted:
            flash(payload.get('detail', '删除失败'), 'danger')
        else:
            flash('邀请码已删除', 'success')
    except Exception as e:
        flash(f'连接错误: {str(e)}', 'danger')
    return redirect(url_for('admin_panel'))
@app.route('/admin/users/<user_id>/toggle', methods=['POST'])
@admin_required
def toggle_user(user_id):
    """Enable or disable a user via the auth service and flash the new state."""
    try:
        resp = api_request('PUT', f'{AUTH_BASE_URL}/admin/users/{user_id}/toggle')
        payload = resp.json()
        toggled = resp.status_code == 200 and payload.get('success')
        if not toggled:
            flash(payload.get('detail', '操作失败'), 'danger')
        else:
            if payload.get('is_active'):
                status_text = '已启用'
            else:
                status_text = '已禁用'
            flash(f'用户{status_text}', 'success')
    except Exception as e:
        flash(f'连接错误: {str(e)}', 'danger')
    return redirect(url_for('admin_panel'))
# ===================== Topic Selection Signin =====================
@app.route('/accounts/<account_id>/topics')
@login_required
def account_topics(account_id):
    """Render the topic list page where the user picks topics to sign in.

    Topics are fetched first, then the account. A missing account
    redirects to the dashboard; any exception redirects to the account
    detail page with an error flash.
    """
    try:
        topics_payload = api_request(
            'GET',
            f'{API_BASE_URL}/api/v1/accounts/{account_id}/topics',
        ).json()
        topics = []
        if topics_payload.get('success'):
            topics = topics_payload.get('data', {}).get('topics', [])

        # Account info for the page header.
        account_payload = api_request(
            'GET',
            f'{API_BASE_URL}/api/v1/accounts/{account_id}',
        ).json()
        account = account_payload.get('data') if account_payload.get('success') else None
        if account:
            return render_template('topics.html', account=account, topics=topics, user=session.get('user'))

        flash('账号不存在', 'danger')
        return redirect(url_for('dashboard'))
    except Exception as e:
        flash(f'获取超话列表失败: {str(e)}', 'danger')
        return redirect(url_for('account_detail', account_id=account_id))
@app.route('/accounts/<account_id>/signin-selected', methods=['POST'])
@login_required
def signin_selected(account_id):
    """Sign in only the topics listed in the JSON body's ``topic_indices``.

    Returns JSON with the API's result and a summary message on success,
    a 400 with the API's message when it reports failure, and a 500 on
    any exception.
    """
    try:
        indices = request.json.get('topic_indices', [])
        payload = api_request(
            'POST',
            f'{API_BASE_URL}/api/v1/accounts/{account_id}/signin',
            json={'topic_indices': indices},
        ).json()
        if not payload.get('success'):
            return jsonify({'success': False, 'message': payload.get('message', '签到失败')}), 400

        result = payload.get('data', {})
        signed = result.get('signed', 0)
        already = result.get('already_signed', 0)
        failed = result.get('failed', 0)
        return jsonify({
            'success': True,
            'data': result,
            'message': f"签到完成: {signed} 成功, {already} 已签, {failed} 失败",
        })
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
if __name__ == '__main__':
    # Debug mode enables the Werkzeug interactive debugger. The server binds
    # to all interfaces, so debug must be opted into with FLASK_DEBUG
    # rather than being on by default.
    debug_mode = os.getenv('FLASK_DEBUG', 'False').lower() in ('true', '1', 'yes')
    # use_reloader=False: per the original author's note, this avoids the
    # process hanging under the Windows terminal's QuickEdit mode.
    app.run(host='0.0.0.0', debug=debug_mode, port=5000, use_reloader=False)