From c25c7662239f71342f8f8e375015e640f7cb838a Mon Sep 17 00:00:00 2001 From: Jeason <1710884619@qq.com> Date: Thu, 19 Mar 2026 10:45:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E9=83=A8=E5=88=86=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E9=80=9A=E7=9F=A5=E5=85=A5=E5=8F=A3=EF=BC=8C=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=89=8D=E7=AB=AF=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/auth_service/app/main.py | 48 +++++ backend/shared/models/__init__.py | 2 + backend/shared/models/system_config.py | 17 ++ backend/task_scheduler/app/main.py | 250 ++++++++++++++++++++++++- docker-compose.yml | 3 + frontend/app.py | 59 +++++- frontend/requirements.txt | 1 + frontend/templates/admin.html | 53 ++++++ init-db.sql | 13 ++ migrate_add_system_config.sql | 13 ++ 10 files changed, 454 insertions(+), 5 deletions(-) create mode 100644 backend/shared/models/system_config.py create mode 100644 migrate_add_system_config.sql diff --git a/backend/auth_service/app/main.py b/backend/auth_service/app/main.py index dd00dd4..529af97 100644 --- a/backend/auth_service/app/main.py +++ b/backend/auth_service/app/main.py @@ -490,3 +490,51 @@ async def admin_delete_invite_code( await db.delete(invite) await db.commit() return {"success": True} + + +# ===================== System Config Endpoints ===================== + +from shared.models.system_config import SystemConfig + + +@app.get("/admin/config") +async def admin_get_config( + admin: User = Depends(require_admin), + db: AsyncSession = Depends(get_db), +): + """获取系统配置""" + result = await db.execute(select(SystemConfig)) + rows = result.scalars().all() + config = {r.key: r.value for r in rows} + return {"success": True, "data": config} + + +@app.put("/admin/config") +async def admin_update_config( + body: dict, + admin: User = Depends(require_admin), + db: AsyncSession = Depends(get_db), +): + """更新系统配置,同时通知调度器重新加载""" + allowed_keys = {"webhook_url", "daily_report_hour", "daily_report_minute"} + for key, value in body.items(): + if key not in allowed_keys: + continue + result = await db.execute(select(SystemConfig).where(SystemConfig.key == key)) + row = result.scalar_one_or_none() + if row: + row.value = str(value) + else: + db.add(SystemConfig(key=key, value=str(value))) + await db.commit() + + # 通过 Redis 通知调度器重新加载配置 + try: + import redis.asyncio as aioredis + r = aioredis.from_url(shared_settings.REDIS_URL, decode_responses=True) + await r.publish("config_updates", "reload") + await r.close() + except Exception as e: + logger.warning(f"通知调度器失败(不影响保存): {e}") + + return {"success": True, "message": "配置已保存"} diff --git a/backend/shared/models/__init__.py b/backend/shared/models/__init__.py index cb955d1..69ae1f4 100644 --- a/backend/shared/models/__init__.py +++ b/backend/shared/models/__init__.py @@ -5,6 +5,7 @@ from .user import User, InviteCode from .account import Account from .task import Task from .signin_log import SigninLog +from .system_config import SystemConfig __all__ = [ "Base", @@ -16,4 +17,5 @@ __all__ = [ "Account", "Task", "SigninLog", + "SystemConfig", ] diff --git a/backend/shared/models/system_config.py b/backend/shared/models/system_config.py new file mode 100644 index 0000000..093fca8 --- /dev/null +++ b/backend/shared/models/system_config.py @@ -0,0 +1,17 @@ +"""SystemConfig ORM model - 系统配置键值表。""" + +from sqlalchemy import Column, DateTime, String +from sqlalchemy.sql import func + +from .base import Base + + +class SystemConfig(Base): + __tablename__ = "system_config" + + key = Column(String(64), primary_key=True) + value = Column(String(500), nullable=False, default="") + updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now()) + + def __repr__(self): + return f"" diff --git a/backend/task_scheduler/app/main.py b/backend/task_scheduler/app/main.py index 151e7a2..cc1eecd 100644 --- a/backend/task_scheduler/app/main.py +++ b/backend/task_scheduler/app/main.py @@ -44,6 +44,11 @@ _registered_tasks: dict = {} # task_id -> cron_expression SIGNIN_LOG_RETAIN_DAYS = int(os.getenv("SIGNIN_LOG_RETAIN_DAYS", "30")) CLEANUP_BATCH_SIZE = 1000 +# Webhook 推送地址和时间(从 DB 加载,env 作为 fallback) +WEBHOOK_URL = os.getenv("WEBHOOK_URL", "") +DAILY_REPORT_HOUR = int(os.getenv("DAILY_REPORT_HOUR", "23")) +DAILY_REPORT_MINUTE = int(os.getenv("DAILY_REPORT_MINUTE", "30")) + # Redis 订阅线程是否运行 _redis_listener_running = False @@ -93,6 +98,56 @@ def _parse_cookies(cookie_str: str) -> dict: # =============== 任务同步 =============== +def load_config_from_db(): + """从 DB 加载系统配置,更新全局变量并重新调度日报任务。""" + global WEBHOOK_URL, DAILY_REPORT_HOUR, DAILY_REPORT_MINUTE + try: + config = _run_async(_fetch_config()) + new_url = config.get("webhook_url", WEBHOOK_URL) + new_hour = int(config.get("daily_report_hour", DAILY_REPORT_HOUR)) + new_minute = int(config.get("daily_report_minute", DAILY_REPORT_MINUTE)) + + changed = (new_url != WEBHOOK_URL or new_hour != DAILY_REPORT_HOUR or new_minute != DAILY_REPORT_MINUTE) + WEBHOOK_URL = new_url + DAILY_REPORT_HOUR = new_hour + DAILY_REPORT_MINUTE = new_minute + + if changed and scheduler.running: + # 重新调度日报任务 + try: + scheduler.remove_job("daily_report") + except Exception: + pass + if WEBHOOK_URL: + scheduler.add_job( + send_daily_report, + trigger=CronTrigger(hour=DAILY_REPORT_HOUR, minute=DAILY_REPORT_MINUTE, timezone="Asia/Shanghai"), + id="daily_report", + replace_existing=True, + misfire_grace_time=3600, + ) + logger.info(f"📊 日报任务已更新: {DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d}") + else: + logger.info("📊 Webhook 为空,日报任务已移除") + + logger.info(f"⚙️ 配置加载完成: webhook={'已配置' if WEBHOOK_URL else '未配置'}, 推送时间={DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d}") + except Exception as e: + logger.warning(f"从 DB 加载配置失败,使用默认值: {e}") + + +async def _fetch_config() -> dict: + from sqlalchemy import select + from shared.models.system_config import SystemConfig + + SessionFactory, eng = _make_session() + try: + async with SessionFactory() as session: + result = await session.execute(select(SystemConfig)) + return {r.key: r.value for r in result.scalars().all()} + finally: + await eng.dispose() + + def sync_db_tasks(): """从 DB 同步任务到 APScheduler。新增的加上,删除的移除,cron 变了的更新。""" try: @@ -339,6 +394,171 @@ async def _do_single_signin(cookies: dict, topic: dict) -> dict: return {"status": "failed", "message": str(e)} +# =============== Webhook 每日报告 =============== + +def send_daily_report(): + """每日签到结果 + 账号状态汇总,推送到 Webhook。""" + if not WEBHOOK_URL: + logger.info("⚠️ WEBHOOK_URL 未配置,跳过每日报告推送") + return + logger.info("📊 开始生成每日报告...") + try: + report = _run_async(_build_daily_report()) + _send_webhook(report) + logger.info("✅ 每日报告推送成功") + except Exception as e: + logger.error(f"❌ 每日报告推送失败: {e}") + + +async def _build_daily_report() -> str: + """从 DB 汇总今日签到数据和账号状态。""" + from sqlalchemy import select, func as sa_func + from shared.models.account import Account + from shared.models.signin_log import SigninLog + from shared.models.user import User + + today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + now = datetime.now() + + SessionFactory, eng = _make_session() + try: + async with SessionFactory() as session: + # 1. 账号总览 + acc_status = await session.execute( + select(Account.status, sa_func.count()) + .group_by(Account.status) + ) + status_map = {row[0]: row[1] for row in acc_status.all()} + total_accounts = sum(status_map.values()) + active = status_map.get("active", 0) + pending = status_map.get("pending", 0) + invalid = status_map.get("invalid_cookie", 0) + + # 2. 今日签到统计 + log_stats = await session.execute( + select(SigninLog.status, sa_func.count()) + .where(SigninLog.signed_at >= today_start) + .group_by(SigninLog.status) + ) + log_map = {row[0]: row[1] for row in log_stats.all()} + success = log_map.get("success", 0) + already = log_map.get("failed_already_signed", 0) + failed_net = log_map.get("failed_network", 0) + total_logs = sum(log_map.values()) + + # 3. 今日各账号签到明细 + detail_rows = await session.execute( + select( + Account.remark, + Account.weibo_user_id, + SigninLog.status, + sa_func.count(), + ) + .join(Account, SigninLog.account_id == Account.id) + .where(SigninLog.signed_at >= today_start) + .group_by(Account.id, Account.remark, Account.weibo_user_id, SigninLog.status) + .order_by(Account.remark) + ) + # 按账号聚合 + account_details = {} + for remark, uid, st, cnt in detail_rows.all(): + name = remark or uid + if name not in account_details: + account_details[name] = {"success": 0, "already": 0, "failed": 0} + if st == "success": + account_details[name]["success"] += cnt + elif st == "failed_already_signed": + account_details[name]["already"] += cnt + else: + account_details[name]["failed"] += cnt + + # 4. Cookie 即将失效的账号(超过 3 天未检查) + stale_cutoff = now - timedelta(days=3) + stale_result = await session.execute( + select(Account.remark, Account.weibo_user_id, Account.last_checked_at) + .where(Account.status == "active") + .where( + (Account.last_checked_at < stale_cutoff) | (Account.last_checked_at == None) + ) + ) + stale_accounts = [ + (row[0] or row[1], row[2].strftime("%m-%d %H:%M") if row[2] else "从未") + for row in stale_result.all() + ] + + finally: + await eng.dispose() + + # 组装报告 + lines = [ + "📊 微博超话签到日报", + f"⏰ {now.strftime('%Y-%m-%d %H:%M')}", + "━━━━━━━━━━━━━━━━━━", + "", + "📱 账号状态", + f" 总计: {total_accounts} 个", + f" ✅ 正常: {active} ⏳ 待验证: {pending} ❌ 失效: {invalid}", + ] + + if invalid > 0: + lines.append(f" ⚠️ 有 {invalid} 个账号 Cookie 已失效,请及时更新") + + lines += [ + "", + "🎯 今日签到", + f" 总计: {total_logs} 条记录", + f" ✅ 成功: {success} 📌 已签: {already} ❌ 失败: {failed_net}", + ] + + if account_details: + lines += ["", "📋 账号明细"] + for name, d in account_details.items(): + lines.append(f" {name}: ✅{d['success']} 📌{d['already']} ❌{d['failed']}") + + if stale_accounts: + lines += ["", "⚠️ 需要关注"] + for name, last in stale_accounts: + lines.append(f" {name} (上次检查: {last})") + + if total_logs == 0: + lines += ["", "💤 今日暂无签到记录"] + + return "\n".join(lines) + + +def _send_webhook(content: str): + """发送消息到 Webhook(自动适配企业微信/钉钉/飞书格式)。""" + import httpx + + url = WEBHOOK_URL + if not url: + return + + # 根据 URL 特征判断平台 + if "qyapi.weixin.qq.com" in url: + # 企业微信 + payload = {"msgtype": "markdown", "markdown": {"content": content}} + elif "oapi.dingtalk.com" in url: + # 钉钉 + payload = {"msgtype": "markdown", "markdown": {"title": "签到日报", "text": content}} + elif "open.feishu.cn" in url: + # 飞书自定义机器人 - 使用富文本消息 + # 将 Markdown 转为纯文本(飞书 text 类型不支持 Markdown) + plain = content.replace("## ", "").replace("### ", "\n").replace("**", "").replace("> ", "") + payload = {"msg_type": "text", "content": {"text": plain}} + else: + # 通用 JSON(兼容自定义 Webhook) + payload = {"text": content, "markdown": content} + + resp = httpx.post(url, json=payload, timeout=15) + if resp.status_code != 200: + logger.warning(f"Webhook 响应异常: status={resp.status_code}, body={resp.text[:200]}") + else: + resp_data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {} + if resp_data.get("code", 0) != 0 and resp_data.get("StatusCode", 0) != 0: + logger.warning(f"Webhook 业务异常: {resp.text[:300]}") + + # =============== 日志清理 =============== def cleanup_old_signin_logs(): @@ -402,18 +622,23 @@ def _start_redis_listener(): def _listen(): global _redis_listener_running _redis_listener_running = True - logger.info("📡 Redis 订阅线程启动,监听 task_updates 频道") + logger.info("📡 Redis 订阅线程启动,监听 task_updates / config_updates 频道") while _redis_listener_running: try: r = redis.from_url(shared_settings.REDIS_URL, decode_responses=True) pubsub = r.pubsub() - pubsub.subscribe("task_updates") + pubsub.subscribe("task_updates", "config_updates") for message in pubsub.listen(): if not _redis_listener_running: break if message["type"] == "message": - logger.info(f"📡 收到任务变更通知: {message['data'][:200]}") - sync_db_tasks() + channel = message.get("channel", "") + if channel == "config_updates": + logger.info("📡 收到配置变更通知,重新加载...") + load_config_from_db() + else: + logger.info(f"📡 收到任务变更通知: {message['data'][:200]}") + sync_db_tasks() pubsub.close() r.close() except Exception as e: @@ -446,6 +671,9 @@ if __name__ == "__main__": # 启动调度器 scheduler.start() + # 从 DB 加载配置(Webhook 地址、推送时间等) + load_config_from_db() + # 首次同步 DB 任务 sync_db_tasks() @@ -470,6 +698,20 @@ if __name__ == "__main__": misfire_grace_time=3600, ) + # 每天定时推送签到日报到 Webhook(如果 load_config_from_db 已调度则跳过) + if WEBHOOK_URL and not scheduler.get_job("daily_report"): + scheduler.add_job( + send_daily_report, + trigger=CronTrigger(hour=DAILY_REPORT_HOUR, minute=DAILY_REPORT_MINUTE, timezone="Asia/Shanghai"), + id="daily_report", + replace_existing=True, + misfire_grace_time=3600, + ) + if WEBHOOK_URL: + logger.info(f"📊 每日报告: 每天 {DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d} 推送到 Webhook") + else: + logger.info("⚠️ Webhook 未配置,每日报告推送已禁用(可在管理面板设置)") + logger.info("📋 调度器已启动,等待任务触发...") logger.info(f"📋 日志清理: 每天 03:00, 保留 {SIGNIN_LOG_RETAIN_DAYS} 天") diff --git a/docker-compose.yml b/docker-compose.yml index 1f3f3a2..296f544 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -88,6 +88,9 @@ services: restart: unless-stopped environment: <<: *db-env + WEBHOOK_URL: "https://open.feishu.cn/open-apis/bot/v2/hook/ba78bd75-baa3-4f14-990c-ae5a2b2d272a" + DAILY_REPORT_HOUR: "23" # 每日报告推送时间(小时),默认 23 点 + SIGNIN_LOG_RETAIN_DAYS: "30" depends_on: - api-service networks: diff --git a/frontend/app.py b/frontend/app.py index 8733131..752d79c 100644 --- a/frontend/app.py +++ b/frontend/app.py @@ -966,7 +966,64 @@ def admin_panel(): except Exception: codes = [] - return render_template('admin.html', users=users, invite_codes=codes, user=session.get('user')) + # 获取系统配置 + try: + resp = api_request('GET', f'{AUTH_BASE_URL}/admin/config') + config = resp.json().get('data', {}) if resp.status_code == 200 else {} + except Exception: + config = {} + + return render_template('admin.html', users=users, invite_codes=codes, config=config, user=session.get('user')) + + +@app.route('/admin/config/save', methods=['POST']) +@admin_required +def save_config(): + """保存系统配置""" + try: + config_data = { + 'webhook_url': request.form.get('webhook_url', '').strip(), + 'daily_report_hour': request.form.get('daily_report_hour', '23').strip(), + 'daily_report_minute': request.form.get('daily_report_minute', '30').strip(), + } + resp = api_request('PUT', f'{AUTH_BASE_URL}/admin/config', json=config_data) + data = resp.json() + if resp.status_code == 200 and data.get('success'): + flash('配置已保存,调度器将自动重新加载', 'success') + else: + flash(data.get('message', '保存失败'), 'danger') + except Exception as e: + flash(f'连接错误: {str(e)}', 'danger') + return redirect(url_for('admin_panel')) + + +@app.route('/admin/webhook/test', methods=['POST']) +@admin_required +def test_webhook(): + """测试 Webhook 推送""" + try: + webhook_url = request.form.get('webhook_url', '').strip() + if not webhook_url: + return jsonify({'success': False, 'message': 'Webhook 地址为空'}), 400 + + import httpx + # 飞书格式 + if 'open.feishu.cn' in webhook_url: + payload = {"msg_type": "text", "content": {"text": "🔔 微博超话签到系统 Webhook 测试\n如果你看到这条消息,说明推送配置正确。"}} + elif 'qyapi.weixin.qq.com' in webhook_url: + payload = {"msgtype": "text", "text": {"content": "🔔 微博超话签到系统 Webhook 测试\n如果你看到这条消息,说明推送配置正确。"}} + elif 'oapi.dingtalk.com' in webhook_url: + payload = {"msgtype": "text", "text": {"content": "🔔 微博超话签到系统 Webhook 测试\n如果你看到这条消息,说明推送配置正确。"}} + else: + payload = {"text": "🔔 微博超话签到系统 Webhook 测试"} + + resp = httpx.post(webhook_url, json=payload, timeout=10) + if resp.status_code == 200: + return jsonify({'success': True, 'message': '测试消息已发送'}) + else: + return jsonify({'success': False, 'message': f'推送失败: HTTP {resp.status_code}'}), 400 + except Exception as e: + return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/admin/invite-codes/create', methods=['POST']) diff --git a/frontend/requirements.txt b/frontend/requirements.txt index 690a077..7f98fe2 100644 --- a/frontend/requirements.txt +++ b/frontend/requirements.txt @@ -4,3 +4,4 @@ requests==2.31.0 python-dotenv==1.0.0 Werkzeug==3.0.1 qrcode[pil]==7.4.2 +httpx==0.25.2 diff --git a/frontend/templates/admin.html b/frontend/templates/admin.html index 3960f8b..ab179ce 100644 --- a/frontend/templates/admin.html +++ b/frontend/templates/admin.html @@ -111,5 +111,58 @@ {% endfor %} + + +
+
🔔 消息推送设置
+
+
+ + +
支持飞书、企业微信、钉钉自定义机器人
+
+
+
+ + +
+
+ + +
+
+
+ + +
+
+
+ + + {% endblock %} diff --git a/init-db.sql b/init-db.sql index aae3446..5f7d5bc 100644 --- a/init-db.sql +++ b/init-db.sql @@ -74,3 +74,16 @@ CREATE TABLE IF NOT EXISTS invite_codes ( INDEX idx_invite_codes_code (code), INDEX idx_invite_codes_is_used (is_used) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- System config table (key-value) +CREATE TABLE IF NOT EXISTS system_config ( + `key` VARCHAR(64) PRIMARY KEY, + `value` VARCHAR(500) NOT NULL DEFAULT '', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- 默认配置 +INSERT IGNORE INTO system_config (`key`, `value`) VALUES + ('webhook_url', ''), + ('daily_report_hour', '23'), + ('daily_report_minute', '30'); diff --git a/migrate_add_system_config.sql b/migrate_add_system_config.sql new file mode 100644 index 0000000..1b9ed35 --- /dev/null +++ b/migrate_add_system_config.sql @@ -0,0 +1,13 @@ +-- 添加 system_config 表(已有数据库执行此脚本) +-- 用法: mysql -u weibo -p weibo_hotsign < migrate_add_system_config.sql + +CREATE TABLE IF NOT EXISTS system_config ( + `key` VARCHAR(64) PRIMARY KEY, + `value` VARCHAR(500) NOT NULL DEFAULT '', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +INSERT IGNORE INTO system_config (`key`, `value`) VALUES + ('webhook_url', 'https://open.feishu.cn/open-apis/bot/v2/hook/ba78bd75-baa3-4f14-990c-ae5a2b2d272a'), + ('daily_report_hour', '23'), + ('daily_report_minute', '30');