""" Weibo-HotSign 定时任务调度器 (APScheduler) 启动时从 DB 加载所有 enabled 的 Task,按 cron 精确注册。 每 5 分钟重新同步一次 DB(处理新增/删除/修改的任务)。 签到逻辑直接操作数据库和微博 API,不依赖其他服务。 """ import os import sys import re import asyncio import logging import signal import time as _time from datetime import datetime, timedelta from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from apscheduler.jobstores.memory import MemoryJobStore # 确保 shared 模块可导入 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) from shared.config import shared_settings logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger("scheduler") scheduler = BackgroundScheduler( jobstores={"default": MemoryJobStore()}, timezone="Asia/Shanghai", ) # 已注册的任务 ID 集合,用于同步时对比 _registered_tasks: dict = {} # task_id -> cron_expression # 日志保留天数 SIGNIN_LOG_RETAIN_DAYS = int(os.getenv("SIGNIN_LOG_RETAIN_DAYS", "30")) CLEANUP_BATCH_SIZE = 1000 # Webhook 推送地址和时间(从 DB 加载,env 作为 fallback) WEBHOOK_URL = os.getenv("WEBHOOK_URL", "") DAILY_REPORT_HOUR = int(os.getenv("DAILY_REPORT_HOUR", "23")) DAILY_REPORT_MINUTE = int(os.getenv("DAILY_REPORT_MINUTE", "30")) # Redis 订阅线程是否运行 _redis_listener_running = False WEIBO_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ), "Referer": "https://weibo.com/", "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", } def _make_session(): """每次创建独立的 engine + session,避免 event loop 冲突。""" from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker kwargs = {"echo": False} if "sqlite" not in shared_settings.DATABASE_URL: kwargs.update(pool_size=5, max_overflow=10, pool_pre_ping=True) eng = create_async_engine(shared_settings.DATABASE_URL, **kwargs) factory = sessionmaker(eng, class_=AsyncSession, expire_on_commit=False) return factory, eng def _run_async(coro): """在新 event loop 中运行 async 函数。""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() def _parse_cookies(cookie_str: str) -> dict: cookies = {} for pair in cookie_str.split(";"): pair = pair.strip() if "=" in pair: k, v = pair.split("=", 1) cookies[k.strip()] = v.strip() return cookies # =============== 任务同步 =============== def load_config_from_db(): """从 DB 加载系统配置,更新全局变量并重新调度日报任务。""" global WEBHOOK_URL, DAILY_REPORT_HOUR, DAILY_REPORT_MINUTE try: config = _run_async(_fetch_config()) new_url = config.get("webhook_url", WEBHOOK_URL) new_hour = int(config.get("daily_report_hour", DAILY_REPORT_HOUR)) new_minute = int(config.get("daily_report_minute", DAILY_REPORT_MINUTE)) changed = (new_url != WEBHOOK_URL or new_hour != DAILY_REPORT_HOUR or new_minute != DAILY_REPORT_MINUTE) WEBHOOK_URL = new_url DAILY_REPORT_HOUR = new_hour DAILY_REPORT_MINUTE = new_minute if changed and scheduler.running: # 重新调度日报任务 try: scheduler.remove_job("daily_report") except Exception: pass if WEBHOOK_URL: scheduler.add_job( send_daily_report, trigger=CronTrigger(hour=DAILY_REPORT_HOUR, minute=DAILY_REPORT_MINUTE, timezone="Asia/Shanghai"), id="daily_report", replace_existing=True, misfire_grace_time=3600, ) logger.info(f"📊 日报任务已更新: {DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d}") else: logger.info("📊 Webhook 为空,日报任务已移除") logger.info(f"⚙️ 配置加载完成: webhook={'已配置' if WEBHOOK_URL else '未配置'}, 推送时间={DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d}") except Exception as e: logger.warning(f"从 DB 加载配置失败,使用默认值: {e}") async def _fetch_config() -> dict: from sqlalchemy import select from shared.models.system_config import SystemConfig SessionFactory, eng = _make_session() try: async with SessionFactory() as session: result = await session.execute(select(SystemConfig)) return {r.key: r.value for r in result.scalars().all()} finally: await eng.dispose() def sync_db_tasks(): """从 DB 同步任务到 APScheduler。新增的加上,删除的移除,cron 变了的更新。""" try: tasks = _run_async(_load_tasks_from_db()) except Exception as e: logger.error(f"同步任务失败: {e}") return db_task_ids = set() for task_id, account_id, cron_expr in tasks: db_task_ids.add(task_id) job_id = f"signin_{task_id}" # 已存在且 cron 没变 → 跳过 if task_id in _registered_tasks and _registered_tasks[task_id] == cron_expr: continue # 新增或 cron 变了 → 添加/替换 try: parts = cron_expr.split() if len(parts) != 5: logger.warning(f"无效 cron: task={task_id}, expr={cron_expr}") continue # 提前 1 分钟触发,用于预加载超话列表,到整点再发签到请求 orig_minute = parts[0] orig_hour = parts[1] try: m = int(orig_minute) h = int(orig_hour) if orig_hour != "*" else None if m == 0: early_minute = "59" early_hour = str(h - 1) if h is not None and h > 0 else "23" if h == 0 else "*" else: early_minute = str(m - 1) early_hour = orig_hour except (ValueError, TypeError): # 复杂 cron 表达式(如 */5),不做提前 early_minute = orig_minute early_hour = orig_hour trigger = CronTrigger( minute=early_minute, hour=early_hour, day=parts[2], month=parts[3], day_of_week=parts[4], timezone="Asia/Shanghai", ) scheduler.add_job( run_signin, trigger=trigger, id=job_id, args=[task_id, account_id, cron_expr], replace_existing=True, misfire_grace_time=300, ) _registered_tasks[task_id] = cron_expr actual_cron = f"{early_minute} {early_hour} {parts[2]} {parts[3]} {parts[4]}" logger.info(f"✅ 注册任务: task={task_id}, account={account_id}, 用户cron={cron_expr}, 实际触发={actual_cron}") except Exception as e: logger.error(f"注册任务失败: task={task_id}, error={e}") # 移除 DB 中已删除/禁用的任务 removed = set(_registered_tasks.keys()) - db_task_ids for task_id in removed: job_id = f"signin_{task_id}" try: scheduler.remove_job(job_id) logger.info(f"🗑️ 移除任务: task={task_id}") except Exception: pass _registered_tasks.pop(task_id, None) logger.info(f"📋 任务同步完成: 活跃 {len(db_task_ids)} 个, 已注册 {len(_registered_tasks)} 个") async def _load_tasks_from_db(): """从 DB 读取所有 enabled 且账号 active/pending 的任务。""" from sqlalchemy import select from shared.models.task import Task from shared.models.account import Account SessionFactory, eng = _make_session() try: async with SessionFactory() as session: stmt = ( select(Task.id, Task.account_id, Task.cron_expression) .join(Account, Task.account_id == Account.id) .where(Task.is_enabled == True) .where(Account.status.in_(["pending", "active"])) ) result = await session.execute(stmt) return [(str(r[0]), str(r[1]), r[2]) for r in result.all()] finally: await eng.dispose() # =============== 签到入口 =============== def run_signin(task_id: str, account_id: str, cron_expr: str = ""): """APScheduler 调用的签到入口(同步函数,内部跑 async)。""" logger.info(f"🎯 开始签到: task={task_id}, account={account_id}, cron={cron_expr}") start = _time.time() try: result = _run_async(asyncio.wait_for(_async_do_signin(account_id, cron_expr), timeout=300)) elapsed = _time.time() - start result["elapsed_seconds"] = round(elapsed, 1) logger.info(f"✅ 签到完成: task={task_id}, 耗时={elapsed:.1f}s, result={result}") # 签到完成后立即推送通知 _push_signin_result(account_id, result, elapsed) except asyncio.TimeoutError: elapsed = _time.time() - start logger.error(f"⏰ 签到超时(5分钟): task={task_id}, account={account_id}") _push_signin_result(account_id, {"status": "timeout"}, elapsed) except Exception as e: elapsed = _time.time() - start logger.error(f"❌ 签到失败: task={task_id}, error={e}") _push_signin_result(account_id, {"status": "error", "reason": str(e)}, elapsed) def _push_signin_result(account_id: str, result: dict, elapsed: float): """签到完成后立即推送结果到 Webhook。""" if not WEBHOOK_URL: return try: # 获取账号备注名 remark = _run_async(_get_account_remark(account_id)) or account_id[:8] status = result.get("status", "") if status == "timeout": lines = [f"⏰ {remark} 签到超时 ({elapsed:.1f}s)"] elif status == "error": lines = [f"❌ {remark} 签到异常: {result.get('reason', '未知')}"] else: signed = result.get("signed", 0) already = result.get("already_signed", 0) failed = result.get("failed", 0) total = result.get("total", 0) details = result.get("details", []) lines = [ f"🎯 {remark} 签到完成", f"⏱ 耗时: {elapsed:.1f}s | 超话: {total} 个", f"✅ 成功: {signed} 📌 已签: {already} ❌ 失败: {failed}", ] # 签到名次明细 if details: lines.append("") for d in details: topic = d.get("topic", "") msg = d.get("message", "") st = d.get("status", "") icon = "✅" if st == "success" else "📌" if st == "already_signed" else "❌" lines.append(f" {icon} {topic}: {msg}") _send_webhook("\n".join(lines)) except Exception as e: logger.warning(f"推送签到结果失败: {e}") async def _get_account_remark(account_id: str) -> str: from sqlalchemy import select from shared.models.account import Account SessionFactory, eng = _make_session() try: async with SessionFactory() as session: result = await session.execute( select(Account.remark, Account.weibo_user_id).where(Account.id == account_id) ) row = result.first() return (row[0] or row[1]) if row else "" finally: await eng.dispose() async def _async_do_signin(account_id: str, cron_expr: str = ""): """ 执行单个账号的全量超话签到。 流程:提前 1 分钟触发 → 预加载超话列表 + XSRF → 等到目标整点 → 快速签到 """ import httpx from sqlalchemy import select from shared.models.account import Account from shared.models.signin_log import SigninLog from shared.crypto import decrypt_cookie, derive_key # 计算目标签到时间(用户设定的 cron 对应的时间点) target_time = None if cron_expr: try: from croniter import croniter # 从当前时间往后算下一个 cron 触发点(因为我们提前了 1 分钟) now = datetime.now() cron = croniter(cron_expr, now) target_time = cron.get_next(datetime) # 如果下次触发超过 2 分钟,说明计算有误,忽略 if (target_time - now).total_seconds() > 120: target_time = None except Exception: target_time = None # ---- 阶段 1: 短事务读取账号信息 ---- SessionFactory, eng = _make_session() try: async with SessionFactory() as session: result = await session.execute(select(Account).where(Account.id == account_id)) account = result.scalar_one_or_none() if not account: return {"status": "error", "reason": "account not found"} if account.status not in ("pending", "active"): return {"status": "skipped", "reason": f"status={account.status}"} key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY) try: cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) except Exception: account.status = "invalid_cookie" await session.commit() return {"status": "failed", "reason": "cookie decryption failed"} acc_id = str(account.id) acc_selected_topics = account.selected_topics # 用户选择的超话列表 except Exception as e: await eng.dispose() raise e cookies = _parse_cookies(cookie_str) # ---- 阶段 2: 预加载超话列表 + XSRF token ---- try: topics = await _fetch_topics(cookies) if not topics: async with SessionFactory() as session: result = await session.execute(select(Account).where(Account.id == acc_id)) acc = result.scalar_one_or_none() if acc: acc.last_checked_at = datetime.now() await session.commit() return {"status": "completed", "signed": 0, "message": "no topics"} # 如果用户选择了特定超话,只签选中的 if acc_selected_topics and isinstance(acc_selected_topics, list): selected_cids = {t.get("containerid") for t in acc_selected_topics if t.get("containerid")} if selected_cids: topics = [t for t in topics if t["containerid"] in selected_cids] logger.info(f"📌 按用户选择过滤: {len(topics)} 个超话 (共 {len(selected_cids)} 个已选)") if not topics: return {"status": "completed", "signed": 0, "message": "no selected topics"} signed = already = failed = 0 log_entries = [] async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: # 预获取 XSRF token await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) xsrf = client.cookies.get("XSRF-TOKEN", "") logger.info(f"📦 预加载完成: account={account_id}, topics={len(topics)}, xsrf={'有' if xsrf else '无'}") # ---- 等到目标时间再开始签到 ---- if target_time: wait_seconds = (target_time - datetime.now()).total_seconds() if 0 < wait_seconds <= 90: logger.info(f"⏳ 等待 {wait_seconds:.1f} 秒到目标时间 {target_time.strftime('%H:%M:%S')}") await asyncio.sleep(wait_seconds) # ---- 快速签到(间隔 0.5 秒) ---- for topic in topics: r = await _do_single_signin(client, cookies, topic, xsrf) if r["status"] == "success": s, signed = "success", signed + 1 elif r["status"] == "already_signed": s, already = "failed_already_signed", already + 1 else: s, failed = "failed_network", failed + 1 log_entries.append(SigninLog( account_id=acc_id, topic_title=topic["title"], status=s, reward_info={"message": r["message"]}, signed_at=datetime.now(), )) await asyncio.sleep(0.5) # ---- 阶段 3: 短事务写日志 + 更新状态 ---- async with SessionFactory() as session: for log in log_entries: session.add(log) result = await session.execute(select(Account).where(Account.id == acc_id)) acc = result.scalar_one_or_none() if acc: acc.last_checked_at = datetime.now() if acc.status != "active": acc.status = "active" await session.commit() logger.info( f"签到结果: account={account_id}, " f"signed={signed}, already={already}, failed={failed}" ) return { "signed": signed, "already_signed": already, "failed": failed, "total": len(topics), "details": [ {"topic": e.topic_title, "status": e.status, "message": (e.reward_info or {}).get("message", "")} for e in log_entries ], } except Exception as e: logger.error(f"签到过程异常: account={account_id}, error={e}") return {"status": "error", "reason": str(e)} finally: await eng.dispose() async def _fetch_topics(cookies: dict) -> list: """ 获取关注的超话列表。 带重试机制:微博凌晨可能触发风控(302 到通行证验证页),重试 2 次。 返回空列表表示获取失败(Cookie 失效或风控)。 """ import httpx max_retries = 3 for attempt in range(max_retries): topics = [] try: # 不自动跟随重定向,手动检测 302 async with httpx.AsyncClient(timeout=15, follow_redirects=False) as client: resp = await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) # 302 重定向到登录页 = Cookie 失效或风控 if resp.status_code in (301, 302): location = resp.headers.get("location", "") if "login.sina.com.cn" in location or "passport" in location: if attempt < max_retries - 1: wait = (attempt + 1) * 3 logger.warning(f"被重定向到登录页(尝试 {attempt+1}/{max_retries}),{wait}秒后重试...") await asyncio.sleep(wait) continue else: logger.warning(f"多次重试仍被重定向,Cookie 可能已失效") return [] # 其他 302(如 weibo.com → www.weibo.com),跟随一次 resp = await client.get(location, headers=WEIBO_HEADERS, cookies=cookies) xsrf = "" for c in resp.cookies.jar: if c.name == "XSRF-TOKEN": xsrf = c.value break headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"} if xsrf: headers["X-XSRF-TOKEN"] = xsrf page = 1 while page <= 10: resp = await client.get( "https://weibo.com/ajax/profile/topicContent", params={"tabid": "231093_-_chaohua", "page": str(page)}, headers=headers, cookies=cookies, ) # 超话接口也可能被 302 if resp.status_code in (301, 302): if attempt < max_retries - 1: wait = (attempt + 1) * 3 logger.warning(f"超话接口被重定向(尝试 {attempt+1}/{max_retries}),{wait}秒后重试...") await asyncio.sleep(wait) break # break inner loop, retry outer else: logger.warning("超话接口多次重定向,放弃") return [] try: data = resp.json() except Exception: logger.warning(f"超话列表响应非 JSON: {resp.text[:200]}") break if data.get("ok") != 1: break tlist = data.get("data", {}).get("list", []) if not tlist: break for item in tlist: title = item.get("topic_name", "") or item.get("title", "") cid = "" for field in ("oid", "scheme"): m = re.search(r"100808[0-9a-fA-F]+", item.get(field, "")) if m: cid = m.group(0) break if title and cid: topics.append({"title": title, "containerid": cid}) if page >= data.get("data", {}).get("max_page", 1): break page += 1 else: # while 正常结束(没有 break),说明成功 if topics: return topics # 如果有 topics 说明成功了 if topics: return topics except Exception as e: logger.error(f"获取超话列表失败(尝试 {attempt+1}/{max_retries}): {e}") if attempt < max_retries - 1: await asyncio.sleep(3) return topics async def _do_single_signin(client, cookies: dict, topic: dict, xsrf: str) -> dict: """签到单个超话(复用已有的 httpx client 和 xsrf token)。""" try: h = { **WEIBO_HEADERS, "Referer": f"https://weibo.com/p/{topic['containerid']}/super_index", "X-Requested-With": "XMLHttpRequest", } if xsrf: h["X-XSRF-TOKEN"] = xsrf resp = await client.get( "https://weibo.com/p/aj/general/button", params={ "ajwvr": "6", "api": "http://i.huati.weibo.com/aj/super/checkin", "texta": "签到", "textb": "已签到", "status": "0", "id": topic["containerid"], "location": "page_100808_super_index", "timezone": "GMT+0800", "lang": "zh-cn", "plat": "Win32", "ua": WEIBO_HEADERS["User-Agent"], "screen": "1920*1080", "__rnd": str(int(_time.time() * 1000)), }, headers=h, cookies=cookies, ) try: data = resp.json() except Exception: return {"status": "failed", "message": f"非JSON响应: {resp.text[:100]}"} code = str(data.get("code", "")) msg = data.get("msg", "") if code == "100000": tip = "" if isinstance(data.get("data"), dict): tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "") return {"status": "success", "message": tip or "签到成功"} elif code == "382004": return {"status": "already_signed", "message": msg or f"今天已签到({code})"} else: return {"status": "failed", "message": f"code={code}, msg={msg}"} except Exception as e: return {"status": "failed", "message": str(e)} # =============== Webhook 每日报告 =============== def send_daily_report(): """每日签到结果 + 账号状态汇总,推送到 Webhook。""" if not WEBHOOK_URL: logger.info("⚠️ WEBHOOK_URL 未配置,跳过每日报告推送") return logger.info("📊 开始生成每日报告...") try: report = _run_async(_build_daily_report()) _send_webhook(report) logger.info("✅ 每日报告推送成功") except Exception as e: logger.error(f"❌ 每日报告推送失败: {e}") async def _build_daily_report() -> str: """从 DB 汇总今日签到数据和账号状态。""" from sqlalchemy import select, func as sa_func from shared.models.account import Account from shared.models.signin_log import SigninLog from shared.models.user import User today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) now = datetime.now() SessionFactory, eng = _make_session() try: async with SessionFactory() as session: # 1. 账号总览 acc_status = await session.execute( select(Account.status, sa_func.count()) .group_by(Account.status) ) status_map = {row[0]: row[1] for row in acc_status.all()} total_accounts = sum(status_map.values()) active = status_map.get("active", 0) pending = status_map.get("pending", 0) invalid = status_map.get("invalid_cookie", 0) # 2. 今日签到统计 log_stats = await session.execute( select(SigninLog.status, sa_func.count()) .where(SigninLog.signed_at >= today_start) .group_by(SigninLog.status) ) log_map = {row[0]: row[1] for row in log_stats.all()} success = log_map.get("success", 0) already = log_map.get("failed_already_signed", 0) failed_net = log_map.get("failed_network", 0) total_logs = sum(log_map.values()) # 3. 今日各账号签到明细(含名次) detail_rows = await session.execute( select( Account.remark, Account.weibo_user_id, SigninLog.status, sa_func.count(), ) .join(Account, SigninLog.account_id == Account.id) .where(SigninLog.signed_at >= today_start) .group_by(Account.id, Account.remark, Account.weibo_user_id, SigninLog.status) .order_by(Account.remark) ) # 按账号聚合 account_details = {} for remark, uid, st, cnt in detail_rows.all(): name = remark or uid if name not in account_details: account_details[name] = {"success": 0, "already": 0, "failed": 0} if st == "success": account_details[name]["success"] += cnt elif st == "failed_already_signed": account_details[name]["already"] += cnt else: account_details[name]["failed"] += cnt # 5. 今日签到名次明细(从 reward_info 提取) rank_rows = await session.execute( select( Account.remark, Account.weibo_user_id, SigninLog.topic_title, SigninLog.reward_info, ) .join(Account, SigninLog.account_id == Account.id) .where(SigninLog.signed_at >= today_start) .where(SigninLog.status == "success") .order_by(Account.remark, SigninLog.signed_at) ) rank_details = [] for remark, uid, topic, reward in rank_rows.all(): name = remark or uid msg = "" if isinstance(reward, dict): msg = reward.get("message", "") elif isinstance(reward, str): msg = reward if msg: rank_details.append({"name": name, "topic": topic, "message": msg}) # 4. Cookie 即将失效的账号(超过 3 天未检查) stale_cutoff = now - timedelta(days=3) stale_result = await session.execute( select(Account.remark, Account.weibo_user_id, Account.last_checked_at) .where(Account.status == "active") .where( (Account.last_checked_at < stale_cutoff) | (Account.last_checked_at == None) ) ) stale_accounts = [ (row[0] or row[1], row[2].strftime("%m-%d %H:%M") if row[2] else "从未") for row in stale_result.all() ] finally: await eng.dispose() # 组装报告 lines = [ "📊 微博超话签到日报", f"⏰ {now.strftime('%Y-%m-%d %H:%M')}", "━━━━━━━━━━━━━━━━━━", "", "📱 账号状态", f" 总计: {total_accounts} 个", f" ✅ 正常: {active} ⏳ 待验证: {pending} ❌ 失效: {invalid}", ] if invalid > 0: lines.append(f" ⚠️ 有 {invalid} 个账号 Cookie 已失效,请及时更新") lines += [ "", "🎯 今日签到", f" 总计: {total_logs} 条记录", f" ✅ 成功: {success} 📌 已签: {already} ❌ 失败: {failed_net}", ] if account_details: lines += ["", "📋 账号明细"] for name, d in account_details.items(): lines.append(f" {name}: ✅{d['success']} 📌{d['already']} ❌{d['failed']}") if rank_details: lines += ["", "🏆 签到名次"] for r in rank_details: lines.append(f" {r['name']} - {r['topic']}: {r['message']}") if stale_accounts: lines += ["", "⚠️ 需要关注"] for name, last in stale_accounts: lines.append(f" {name} (上次检查: {last})") if total_logs == 0: lines += ["", "💤 今日暂无签到记录"] return "\n".join(lines) def _send_webhook(content: str): """发送消息到 Webhook(自动适配企业微信/钉钉/飞书格式)。""" import httpx url = WEBHOOK_URL if not url: return # 根据 URL 特征判断平台 if "qyapi.weixin.qq.com" in url: # 企业微信 payload = {"msgtype": "markdown", "markdown": {"content": content}} elif "oapi.dingtalk.com" in url: # 钉钉 payload = {"msgtype": "markdown", "markdown": {"title": "签到日报", "text": content}} elif "open.feishu.cn" in url: # 飞书自定义机器人 - 使用富文本消息 # 将 Markdown 转为纯文本(飞书 text 类型不支持 Markdown) plain = content.replace("## ", "").replace("### ", "\n").replace("**", "").replace("> ", "") payload = {"msg_type": "text", "content": {"text": plain}} else: # 通用 JSON(兼容自定义 Webhook) payload = {"text": content, "markdown": content} resp = httpx.post(url, json=payload, timeout=15) if resp.status_code != 200: logger.warning(f"Webhook 响应异常: status={resp.status_code}, body={resp.text[:200]}") else: resp_data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {} if resp_data.get("code", 0) != 0 and resp_data.get("StatusCode", 0) != 0: logger.warning(f"Webhook 业务异常: {resp.text[:300]}") # =============== 日志清理 =============== def cleanup_old_signin_logs(): """清理超过 N 天的签到日志,分批删除避免锁表。""" logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...") try: result = _run_async(_async_cleanup()) logger.info(f"✅ 日志清理完成: {result}") except Exception as e: logger.error(f"❌ 日志清理失败: {e}") async def _async_cleanup(): from sqlalchemy import select, delete from shared.models.signin_log import SigninLog cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS) total_deleted = 0 SessionFactory, eng = _make_session() try: async with SessionFactory() as session: while True: stmt = ( select(SigninLog.id) .where(SigninLog.signed_at < cutoff) .limit(CLEANUP_BATCH_SIZE) ) result = await session.execute(stmt) ids = [row[0] for row in result.all()] if not ids: break del_stmt = delete(SigninLog).where(SigninLog.id.in_(ids)) await session.execute(del_stmt) await session.commit() total_deleted += len(ids) logger.info(f"🧹 已删除 {total_deleted} 条...") if len(ids) < CLEANUP_BATCH_SIZE: break finally: await eng.dispose() return {"deleted": total_deleted, "cutoff": cutoff.isoformat()} # =============== Redis 订阅(实时同步任务变更) =============== def _start_redis_listener(): """启动 Redis pub/sub 监听线程,收到 task_updates 消息后立即同步。""" global _redis_listener_running if not shared_settings.USE_REDIS or not shared_settings.REDIS_URL: logger.info("⚠️ Redis 未启用,任务变更仅靠 5 分钟轮询同步") return import threading import redis def _listen(): global _redis_listener_running _redis_listener_running = True logger.info("📡 Redis 订阅线程启动,监听 task_updates / config_updates 频道") while _redis_listener_running: try: r = redis.from_url(shared_settings.REDIS_URL, decode_responses=True) pubsub = r.pubsub() pubsub.subscribe("task_updates", "config_updates") for message in pubsub.listen(): if not _redis_listener_running: break if message["type"] == "message": channel = message.get("channel", "") if channel == "config_updates": logger.info("📡 收到配置变更通知,重新加载...") load_config_from_db() else: logger.info(f"📡 收到任务变更通知: {message['data'][:200]}") sync_db_tasks() pubsub.close() r.close() except Exception as e: logger.warning(f"Redis 订阅异常,5 秒后重连: {e}") _time.sleep(5) t = threading.Thread(target=_listen, daemon=True, name="redis-listener") t.start() # =============== 启动入口 =============== def _shutdown(signum, frame): global _redis_listener_running logger.info(f"收到信号 {signum},正在关闭调度器...") _redis_listener_running = False scheduler.shutdown(wait=False) sys.exit(0) if __name__ == "__main__": logger.info("=" * 60) logger.info("🚀 Weibo-HotSign 定时调度器启动 (APScheduler)") logger.info("=" * 60) # 注册信号处理 signal.signal(signal.SIGTERM, _shutdown) signal.signal(signal.SIGINT, _shutdown) # 启动调度器 scheduler.start() # 从 DB 加载配置(Webhook 地址、推送时间等) load_config_from_db() # 首次同步 DB 任务 sync_db_tasks() # 启动 Redis 订阅,实时接收任务变更通知 _start_redis_listener() # 每 50 分钟重新同步 DB(兜底,正常靠 Redis 实时通知) scheduler.add_job( sync_db_tasks, trigger=IntervalTrigger(minutes=50), id="sync_db_tasks", replace_existing=True, misfire_grace_time=60, ) # 每天凌晨 3 点清理过期签到日志 scheduler.add_job( cleanup_old_signin_logs, trigger=CronTrigger(hour=3, minute=0, timezone="Asia/Shanghai"), id="cleanup_old_signin_logs", replace_existing=True, misfire_grace_time=3600, ) # 每天定时推送签到日报到 Webhook(如果 load_config_from_db 已调度则跳过) if WEBHOOK_URL and not scheduler.get_job("daily_report"): scheduler.add_job( send_daily_report, trigger=CronTrigger(hour=DAILY_REPORT_HOUR, minute=DAILY_REPORT_MINUTE, timezone="Asia/Shanghai"), id="daily_report", replace_existing=True, misfire_grace_time=3600, ) if WEBHOOK_URL: logger.info(f"📊 每日报告: 每天 {DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d} 推送到 Webhook") else: logger.info("⚠️ Webhook 未配置,每日报告推送已禁用(可在管理面板设置)") logger.info("📋 调度器已启动,等待任务触发...") logger.info(f"📋 日志清理: 每天 03:00, 保留 {SIGNIN_LOG_RETAIN_DAYS} 天") # 打印当前所有已注册的 job jobs = scheduler.get_jobs() for job in jobs: logger.info(f" 📌 {job.id}: next_run={job.next_run_time}") # 主线程保持运行 try: while True: _time.sleep(1) except (KeyboardInterrupt, SystemExit): logger.info("调度器关闭中...") scheduler.shutdown(wait=False)