diff --git a/backend/task_scheduler/app/main.py b/backend/task_scheduler/app/main.py index cc1eecd..234f5f8 100644 --- a/backend/task_scheduler/app/main.py +++ b/backend/task_scheduler/app/main.py @@ -172,8 +172,25 @@ def sync_db_tasks(): logger.warning(f"无效 cron: task={task_id}, expr={cron_expr}") continue + # 提前 1 分钟触发,用于预加载超话列表,到整点再发签到请求 + orig_minute = parts[0] + orig_hour = parts[1] + try: + m = int(orig_minute) + h = int(orig_hour) if orig_hour != "*" else None + if m == 0: + early_minute = "59" + early_hour = str(h - 1) if h is not None and h > 0 else "23" if h == 0 else "*" + else: + early_minute = str(m - 1) + early_hour = orig_hour + except (ValueError, TypeError): + # 复杂 cron 表达式(如 */5),不做提前 + early_minute = orig_minute + early_hour = orig_hour + trigger = CronTrigger( - minute=parts[0], hour=parts[1], + minute=early_minute, hour=early_hour, day=parts[2], month=parts[3], day_of_week=parts[4], timezone="Asia/Shanghai", ) @@ -181,12 +198,13 @@ def sync_db_tasks(): run_signin, trigger=trigger, id=job_id, - args=[task_id, account_id], + args=[task_id, account_id, cron_expr], replace_existing=True, - misfire_grace_time=300, # 5 分钟内的 misfire 仍然执行 + misfire_grace_time=300, ) _registered_tasks[task_id] = cron_expr - logger.info(f"✅ 注册任务: task={task_id}, account={account_id}, cron={cron_expr}") + actual_cron = f"{early_minute} {early_hour} {parts[2]} {parts[3]} {parts[4]}" + logger.info(f"✅ 注册任务: task={task_id}, account={account_id}, 用户cron={cron_expr}, 实际触发={actual_cron}") except Exception as e: logger.error(f"注册任务失败: task={task_id}, error={e}") @@ -227,24 +245,111 @@ async def _load_tasks_from_db(): # =============== 签到入口 =============== -def run_signin(task_id: str, account_id: str): +def run_signin(task_id: str, account_id: str, cron_expr: str = ""): """APScheduler 调用的签到入口(同步函数,内部跑 async)。""" - logger.info(f"🎯 开始签到: task={task_id}, account={account_id}") + logger.info(f"🎯 开始签到: task={task_id}, account={account_id}, cron={cron_expr}") + start = _time.time() try: - result = _run_async(_async_do_signin(account_id)) - logger.info(f"✅ 签到完成: task={task_id}, result={result}") + result = _run_async(asyncio.wait_for(_async_do_signin(account_id, cron_expr), timeout=300)) + elapsed = _time.time() - start + result["elapsed_seconds"] = round(elapsed, 1) + logger.info(f"✅ 签到完成: task={task_id}, 耗时={elapsed:.1f}s, result={result}") + # 签到完成后立即推送通知 + _push_signin_result(account_id, result, elapsed) + except asyncio.TimeoutError: + elapsed = _time.time() - start + logger.error(f"⏰ 签到超时(5分钟): task={task_id}, account={account_id}") + _push_signin_result(account_id, {"status": "timeout"}, elapsed) except Exception as e: + elapsed = _time.time() - start logger.error(f"❌ 签到失败: task={task_id}, error={e}") + _push_signin_result(account_id, {"status": "error", "reason": str(e)}, elapsed) -async def _async_do_signin(account_id: str): - """执行单个账号的全量超话签到。""" +def _push_signin_result(account_id: str, result: dict, elapsed: float): + """签到完成后立即推送结果到 Webhook。""" + if not WEBHOOK_URL: + return + try: + # 获取账号备注名 + remark = _run_async(_get_account_remark(account_id)) or account_id[:8] + + status = result.get("status", "") + if status == "timeout": + lines = [f"⏰ {remark} 签到超时 ({elapsed:.1f}s)"] + elif status == "error": + lines = [f"❌ {remark} 签到异常: {result.get('reason', '未知')}"] + else: + signed = result.get("signed", 0) + already = result.get("already_signed", 0) + failed = result.get("failed", 0) + total = result.get("total", 0) + details = result.get("details", []) + + lines = [ + f"🎯 {remark} 签到完成", + f"⏱ 耗时: {elapsed:.1f}s | 超话: {total} 个", + f"✅ 成功: {signed} 📌 已签: {already} ❌ 失败: {failed}", + ] + + # 签到名次明细 + if details: + lines.append("") + for d in details: + topic = d.get("topic", "") + msg = d.get("message", "") + st = d.get("status", "") + icon = "✅" if st == "success" else "📌" if st == "already_signed" else "❌" + lines.append(f" {icon} {topic}: {msg}") + + _send_webhook("\n".join(lines)) + except Exception as e: + logger.warning(f"推送签到结果失败: {e}") + + +async def _get_account_remark(account_id: str) -> str: + from sqlalchemy import select + from shared.models.account import Account + + SessionFactory, eng = _make_session() + try: + async with SessionFactory() as session: + result = await session.execute( + select(Account.remark, Account.weibo_user_id).where(Account.id == account_id) + ) + row = result.first() + return (row[0] or row[1]) if row else "" + finally: + await eng.dispose() + + +async def _async_do_signin(account_id: str, cron_expr: str = ""): + """ + 执行单个账号的全量超话签到。 + 流程:提前 1 分钟触发 → 预加载超话列表 + XSRF → 等到目标整点 → 快速签到 + """ import httpx from sqlalchemy import select from shared.models.account import Account from shared.models.signin_log import SigninLog from shared.crypto import decrypt_cookie, derive_key + # 计算目标签到时间(用户设定的 cron 对应的时间点) + target_time = None + if cron_expr: + try: + from croniter import croniter + # 从当前时间往后算下一个 cron 触发点(因为我们提前了 1 分钟) + now = datetime.now() + cron = croniter(cron_expr, now) + target_time = cron.get_next(datetime) + # 如果下次触发超过 2 分钟,说明计算有误,忽略 + if (target_time - now).total_seconds() > 120: + target_time = None + except Exception: + target_time = None + + # ---- 阶段 1: 短事务读取账号信息 ---- SessionFactory, eng = _make_session() try: async with SessionFactory() as session: @@ -263,18 +368,45 @@ async def _async_do_signin(account_id: str): await session.commit() return {"status": "failed", "reason": "cookie decryption failed"} - cookies = _parse_cookies(cookie_str) + acc_id = str(account.id) + except Exception as e: + await eng.dispose() + raise e - # 获取超话列表 - topics = await _fetch_topics(cookies) - if not topics: - return {"status": "completed", "signed": 0, "message": "no topics"} + cookies = _parse_cookies(cookie_str) - # 逐个签到 - signed = already = failed = 0 + # ---- 阶段 2: 预加载超话列表 + XSRF token ---- + try: + topics = await _fetch_topics(cookies) + if not topics: + async with SessionFactory() as session: + result = await session.execute(select(Account).where(Account.id == acc_id)) + acc = result.scalar_one_or_none() + if acc: + acc.last_checked_at = datetime.now() + await session.commit() + return {"status": "completed", "signed": 0, "message": "no topics"} + + signed = already = failed = 0 + log_entries = [] + + async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: + # 预获取 XSRF token + await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) + xsrf = client.cookies.get("XSRF-TOKEN", "") + + logger.info(f"📦 预加载完成: account={account_id}, topics={len(topics)}, xsrf={'有' if xsrf else '无'}") + + # ---- 等到目标时间再开始签到 ---- + if target_time: + wait_seconds = (target_time - datetime.now()).total_seconds() + if 0 < wait_seconds <= 90: + logger.info(f"⏳ 等待 {wait_seconds:.1f} 秒到目标时间 {target_time.strftime('%H:%M:%S')}") + await asyncio.sleep(wait_seconds) + + # ---- 快速签到(间隔 0.5 秒) ---- for topic in topics: - await asyncio.sleep(1.5) - r = await _do_single_signin(cookies, topic) + r = await _do_single_signin(client, cookies, topic, xsrf) if r["status"] == "success": s, signed = "success", signed + 1 elif r["status"] == "already_signed": @@ -282,114 +414,145 @@ async def _async_do_signin(account_id: str): else: s, failed = "failed_network", failed + 1 - session.add(SigninLog( - account_id=account.id, topic_title=topic["title"], + log_entries.append(SigninLog( + account_id=acc_id, topic_title=topic["title"], status=s, reward_info={"message": r["message"]}, signed_at=datetime.now(), )) + await asyncio.sleep(0.5) - account.last_checked_at = datetime.now() - if account.status != "active": - account.status = "active" + # ---- 阶段 3: 短事务写日志 + 更新状态 ---- + async with SessionFactory() as session: + for log in log_entries: + session.add(log) + result = await session.execute(select(Account).where(Account.id == acc_id)) + acc = result.scalar_one_or_none() + if acc: + acc.last_checked_at = datetime.now() + if acc.status != "active": + acc.status = "active" await session.commit() - logger.info( - f"签到结果: account={account_id}, " - f"signed={signed}, already={already}, failed={failed}" - ) - return {"signed": signed, "already_signed": already, "failed": failed, "total": len(topics)} + logger.info( + f"签到结果: account={account_id}, " + f"signed={signed}, already={already}, failed={failed}" + ) + return { + "signed": signed, "already_signed": already, "failed": failed, + "total": len(topics), + "details": [ + {"topic": e.topic_title, "status": e.status, + "message": (e.reward_info or {}).get("message", "")} + for e in log_entries + ], + } + + except Exception as e: + logger.error(f"签到过程异常: account={account_id}, error={e}") + return {"status": "error", "reason": str(e)} finally: await eng.dispose() async def _fetch_topics(cookies: dict) -> list: - """获取关注的超话列表。""" + """获取关注的超话列表。Cookie 失效时返回空列表。""" import httpx topics = [] - async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: - await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) - xsrf = client.cookies.get("XSRF-TOKEN", "") - headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"} - if xsrf: - headers["X-XSRF-TOKEN"] = xsrf + try: + async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: + resp = await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) + # 检测是否被重定向到访客页(Cookie 失效) + if "passport.weibo.com" in str(resp.url) or "visitor" in str(resp.url).lower(): + logger.warning("Cookie 已失效,被重定向到登录/访客页") + return [] - page = 1 - while page <= 10: - resp = await client.get( - "https://weibo.com/ajax/profile/topicContent", - params={"tabid": "231093_-_chaohua", "page": str(page)}, - headers=headers, cookies=cookies, - ) - data = resp.json() - if data.get("ok") != 1: - break - tlist = data.get("data", {}).get("list", []) - if not tlist: - break - for item in tlist: - title = item.get("topic_name", "") or item.get("title", "") - cid = "" - for field in ("oid", "scheme"): - m = re.search(r"100808[0-9a-fA-F]+", item.get(field, "")) - if m: - cid = m.group(0) - break - if title and cid: - topics.append({"title": title, "containerid": cid}) - if page >= data.get("data", {}).get("max_page", 1): - break - page += 1 + xsrf = client.cookies.get("XSRF-TOKEN", "") + headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"} + if xsrf: + headers["X-XSRF-TOKEN"] = xsrf + + page = 1 + while page <= 10: + resp = await client.get( + "https://weibo.com/ajax/profile/topicContent", + params={"tabid": "231093_-_chaohua", "page": str(page)}, + headers=headers, cookies=cookies, + ) + try: + data = resp.json() + except Exception: + logger.warning(f"超话列表响应非 JSON: {resp.text[:200]}") + break + if data.get("ok") != 1: + break + tlist = data.get("data", {}).get("list", []) + if not tlist: + break + for item in tlist: + title = item.get("topic_name", "") or item.get("title", "") + cid = "" + for field in ("oid", "scheme"): + m = re.search(r"100808[0-9a-fA-F]+", item.get(field, "")) + if m: + cid = m.group(0) + break + if title and cid: + topics.append({"title": title, "containerid": cid}) + if page >= data.get("data", {}).get("max_page", 1): + break + page += 1 + except Exception as e: + logger.error(f"获取超话列表失败: {e}") return topics -async def _do_single_signin(cookies: dict, topic: dict) -> dict: - """签到单个超话。""" - import httpx - +async def _do_single_signin(client, cookies: dict, topic: dict, xsrf: str) -> dict: + """签到单个超话(复用已有的 httpx client 和 xsrf token)。""" try: - async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: - await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) - xsrf = client.cookies.get("XSRF-TOKEN", "") - h = { - **WEIBO_HEADERS, - "Referer": f"https://weibo.com/p/{topic['containerid']}/super_index", - "X-Requested-With": "XMLHttpRequest", - } - if xsrf: - h["X-XSRF-TOKEN"] = xsrf + h = { + **WEIBO_HEADERS, + "Referer": f"https://weibo.com/p/{topic['containerid']}/super_index", + "X-Requested-With": "XMLHttpRequest", + } + if xsrf: + h["X-XSRF-TOKEN"] = xsrf - resp = await client.get( - "https://weibo.com/p/aj/general/button", - params={ - "ajwvr": "6", - "api": "http://i.huati.weibo.com/aj/super/checkin", - "texta": "签到", "textb": "已签到", - "status": "0", - "id": topic["containerid"], - "location": "page_100808_super_index", - "timezone": "GMT+0800", - "lang": "zh-cn", - "plat": "Win32", - "ua": WEIBO_HEADERS["User-Agent"], - "screen": "1920*1080", - "__rnd": str(int(_time.time() * 1000)), - }, - headers=h, cookies=cookies, - ) + resp = await client.get( + "https://weibo.com/p/aj/general/button", + params={ + "ajwvr": "6", + "api": "http://i.huati.weibo.com/aj/super/checkin", + "texta": "签到", "textb": "已签到", + "status": "0", + "id": topic["containerid"], + "location": "page_100808_super_index", + "timezone": "GMT+0800", + "lang": "zh-cn", + "plat": "Win32", + "ua": WEIBO_HEADERS["User-Agent"], + "screen": "1920*1080", + "__rnd": str(int(_time.time() * 1000)), + }, + headers=h, cookies=cookies, + ) + try: data = resp.json() - code = str(data.get("code", "")) - msg = data.get("msg", "") + except Exception: + return {"status": "failed", "message": f"非JSON响应: {resp.text[:100]}"} - if code == "100000": - tip = "" - if isinstance(data.get("data"), dict): - tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "") - return {"status": "success", "message": tip or "签到成功"} - elif code == "382004": - return {"status": "already_signed", "message": msg or f"今天已签到({code})"} - else: - return {"status": "failed", "message": f"code={code}, msg={msg}"} + code = str(data.get("code", "")) + msg = data.get("msg", "") + + if code == "100000": + tip = "" + if isinstance(data.get("data"), dict): + tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "") + return {"status": "success", "message": tip or "签到成功"} + elif code == "382004": + return {"status": "already_signed", "message": msg or f"今天已签到({code})"} + else: + return {"status": "failed", "message": f"code={code}, msg={msg}"} except Exception as e: return {"status": "failed", "message": str(e)} @@ -446,7 +609,7 @@ async def _build_daily_report() -> str: failed_net = log_map.get("failed_network", 0) total_logs = sum(log_map.values()) - # 3. 今日各账号签到明细 + # 3. 今日各账号签到明细(含名次) detail_rows = await session.execute( select( Account.remark, @@ -472,6 +635,28 @@ async def _build_daily_report() -> str: else: account_details[name]["failed"] += cnt + # 5. 今日签到名次明细(从 reward_info 提取) + rank_rows = await session.execute( + select( + Account.remark, Account.weibo_user_id, + SigninLog.topic_title, SigninLog.reward_info, + ) + .join(Account, SigninLog.account_id == Account.id) + .where(SigninLog.signed_at >= today_start) + .where(SigninLog.status == "success") + .order_by(Account.remark, SigninLog.signed_at) + ) + rank_details = [] + for remark, uid, topic, reward in rank_rows.all(): + name = remark or uid + msg = "" + if isinstance(reward, dict): + msg = reward.get("message", "") + elif isinstance(reward, str): + msg = reward + if msg: + rank_details.append({"name": name, "topic": topic, "message": msg}) + # 4. Cookie 即将失效的账号(超过 3 天未检查) stale_cutoff = now - timedelta(days=3) stale_result = await session.execute( @@ -515,6 +700,11 @@ async def _build_daily_report() -> str: for name, d in account_details.items(): lines.append(f" {name}: ✅{d['success']} 📌{d['already']} ❌{d['failed']}") + if rank_details: + lines += ["", "🏆 签到名次"] + for r in rank_details: + lines.append(f" {r['name']} - {r['topic']}: {r['message']}") + if stale_accounts: lines += ["", "⚠️ 需要关注"] for name, last in stale_accounts: