""" Weibo-HotSign 定时签到任务。 核心逻辑: 1. check_and_run_due_tasks (Beat 每 5 分钟触发) → 从 DB 读取所有 enabled Task → 用 croniter 判断是否到期 → 对到期的 task 提交 execute_signin_task 2. execute_signin_task → 解密 cookie → 获取超话列表 → 逐个签到 → 写日志 """ import os import sys import re import asyncio import time as _time import logging import redis from datetime import datetime, timedelta from typing import Dict, Any, List, Optional from celery import current_task from croniter import croniter from sqlalchemy import select sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) from shared.models.base import Base from shared.models.task import Task from shared.models.account import Account from shared.models.signin_log import SigninLog from shared.crypto import decrypt_cookie, derive_key from shared.config import shared_settings from ..celery_app import celery_app from ..config import settings logger = logging.getLogger(__name__) def _make_session(): """ 每次创建新的 engine + session,避免 Celery worker fork 后 复用主进程 event loop 绑定的连接池导致 'Future attached to a different loop' 错误。 """ from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker engine_kwargs: dict = {"echo": False} if "sqlite" not in shared_settings.DATABASE_URL: engine_kwargs.update(pool_size=5, max_overflow=10, pool_pre_ping=True) eng = create_async_engine(shared_settings.DATABASE_URL, **engine_kwargs) factory = sessionmaker(eng, class_=AsyncSession, expire_on_commit=False) return factory, eng WEIBO_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ), "Referer": "https://weibo.com/", "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", } # --------------- Redis 分布式锁 --------------- _redis_client = None def _get_redis(): global _redis_client if not shared_settings.USE_REDIS: return None if _redis_client is None: try: _redis_client = redis.from_url(shared_settings.REDIS_URL, 
decode_responses=True) except Exception: return None return _redis_client def _acquire_lock(key: str, ttl: int = 600) -> bool: r = _get_redis() if r: return bool(r.set(f"lock:{key}", "1", nx=True, ex=ttl)) return True # 无 Redis 时不加锁 def _release_lock(key: str): r = _get_redis() if r: r.delete(f"lock:{key}") # --------------- 辅助函数 --------------- def _parse_cookies(cookie_str: str) -> Dict[str, str]: cookies: Dict[str, str] = {} for pair in cookie_str.split(";"): pair = pair.strip() if "=" in pair: k, v = pair.split("=", 1) cookies[k.strip()] = v.strip() return cookies def _run_async(coro): """在 Celery worker 线程中运行 async 函数""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() # =============== Celery Tasks =============== @celery_app.task def check_and_run_due_tasks(): """ Beat 每分钟调用。 对每个 enabled Task,用 croniter 算出下次触发时间, 如果在 5 分钟内即将到期,则计算精确延迟后提交执行。 已经过了触发时间但在 2 分钟内的也会立即执行(容错)。 """ return _run_async(_async_check_due()) async def _async_check_due(): now = datetime.now() submitted = 0 SessionFactory, eng = _make_session() try: async with SessionFactory() as session: stmt = ( select(Task, Account) .join(Account, Task.account_id == Account.id) .where(Task.is_enabled == True) .where(Account.status.in_(["pending", "active"])) ) result = await session.execute(stmt) pairs = result.all() finally: await eng.dispose() for task, account in pairs: try: if not croniter.is_valid(task.cron_expression): logger.warning(f"无效 cron: task={task.id}, expr={task.cron_expression}") continue cron = croniter(task.cron_expression, now) # 看下次触发时间 next_fire = cron.get_next(datetime) # 也看上次触发时间(处理刚过期的情况) cron2 = croniter(task.cron_expression, now) prev_fire = cron2.get_prev(datetime) seconds_until = (next_fire - now).total_seconds() seconds_since = (now - prev_fire).total_seconds() # 情况1: 下次触发在 5 分钟内 → 延迟到精确时间执行 # 情况2: 上次触发在 2 分钟内 → 立即执行(刚好到期或略过) fire_time = None delay_seconds = 0 if seconds_until <= 300: fire_time = 
next_fire delay_seconds = max(0, int(seconds_until)) elif seconds_since <= 120: fire_time = prev_fire delay_seconds = 0 if fire_time is not None: lock_key = f"due:{task.id}:{fire_time.strftime('%Y%m%d%H%M')}" if _acquire_lock(lock_key, ttl=3600): if delay_seconds > 0: execute_signin_task.apply_async( args=[str(task.id), str(account.id)], countdown=delay_seconds, ) logger.info( f"📤 预约任务: task={task.id}, account={account.weibo_user_id}, " f"{delay_seconds}秒后执行 (cron={task.cron_expression})" ) else: execute_signin_task.delay(str(task.id), str(account.id)) logger.info( f"📤 立即执行: task={task.id}, account={account.weibo_user_id}, " f"cron={task.cron_expression}" ) submitted += 1 except Exception as e: logger.error(f"检查任务 {task.id} 出错: {e}") if submitted > 0: logger.info(f"✅ 本轮提交了 {submitted} 个任务") return {"checked": len(pairs), "submitted": submitted} @celery_app.task(bind=True, max_retries=2, default_retry_delay=120) def execute_signin_task(self, task_id: str, account_id: str): """执行单个账号的全量超话签到""" lock_key = f"signin:{task_id}:{account_id}" if not _acquire_lock(lock_key, ttl=600): logger.warning(f"⏭️ 签到任务正在执行中: task={task_id}") return {"status": "skipped", "reason": "already running"} try: logger.info(f"🎯 开始签到: task={task_id}, account={account_id}") result = _run_async(_async_do_signin(account_id)) logger.info(f"✅ 签到完成: task={task_id}, result={result}") return result except Exception as exc: logger.error(f"❌ 签到失败: task={task_id}, error={exc}") _release_lock(lock_key) if self.request.retries < 2: raise self.retry(exc=exc) raise finally: _release_lock(lock_key) # =============== 签到核心逻辑 =============== async def _async_do_signin(account_id: str) -> Dict[str, Any]: """直接操作数据库和微博 API 完成签到""" import httpx SessionFactory, eng = _make_session() try: async with SessionFactory() as session: stmt = select(Account).where(Account.id == account_id) result = await session.execute(stmt) account = result.scalar_one_or_none() if not account: raise Exception(f"Account {account_id} not 
found") if account.status not in ("pending", "active"): return {"status": "skipped", "reason": f"status={account.status}"} # 解密 cookie key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY) try: cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) except Exception: account.status = "invalid_cookie" await session.commit() return {"status": "failed", "reason": "cookie decryption failed"} cookies = _parse_cookies(cookie_str) # 获取超话列表 topics = await _fetch_topics(cookies) if not topics: return {"status": "completed", "signed": 0, "message": "no topics"} # 逐个签到 signed = already = failed = 0 for topic in topics: await asyncio.sleep(1.5) r = await _do_single_signin(cookies, topic) if r["status"] == "success": s = "success" signed += 1 elif r["status"] == "already_signed": s = "failed_already_signed" already += 1 else: s = "failed_network" failed += 1 log = SigninLog( account_id=account.id, topic_title=topic["title"], status=s, reward_info={"message": r["message"]}, signed_at=datetime.now(), ) session.add(log) account.last_checked_at = datetime.now() if account.status != "active": account.status = "active" await session.commit() logger.info( f"签到结果: account={account_id}, " f"signed={signed}, already={already}, failed={failed}" ) return { "status": "completed", "signed": signed, "already_signed": already, "failed": failed, "total_topics": len(topics), } finally: await eng.dispose() async def _fetch_topics(cookies: Dict[str, str]) -> List[dict]: """获取关注的超话列表""" import httpx topics: List[dict] = [] async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) xsrf = client.cookies.get("XSRF-TOKEN", "") headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"} if xsrf: headers["X-XSRF-TOKEN"] = xsrf page = 1 while page <= 10: resp = await client.get( "https://weibo.com/ajax/profile/topicContent", params={"tabid": "231093_-_chaohua", "page": str(page)}, 
headers=headers, cookies=cookies, ) data = resp.json() if data.get("ok") != 1: break topic_list = data.get("data", {}).get("list", []) if not topic_list: break for item in topic_list: title = item.get("topic_name", "") or item.get("title", "") containerid = "" for field in ("oid", "scheme"): m = re.search(r"100808[0-9a-fA-F]+", item.get(field, "")) if m: containerid = m.group(0) break if title and containerid: topics.append({"title": title, "containerid": containerid}) if page >= data.get("data", {}).get("max_page", 1): break page += 1 return topics async def _do_single_signin(cookies: Dict[str, str], topic: dict) -> dict: """签到单个超话""" import httpx try: async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) xsrf = client.cookies.get("XSRF-TOKEN", "") h = { **WEIBO_HEADERS, "Referer": f"https://weibo.com/p/{topic['containerid']}/super_index", "X-Requested-With": "XMLHttpRequest", } if xsrf: h["X-XSRF-TOKEN"] = xsrf resp = await client.get( "https://weibo.com/p/aj/general/button", params={ "ajwvr": "6", "api": "http://i.huati.weibo.com/aj/super/checkin", "texta": "签到", "textb": "已签到", "status": "0", "id": topic["containerid"], "location": "page_100808_super_index", "timezone": "GMT+0800", "lang": "zh-cn", "plat": "Win32", "ua": WEIBO_HEADERS["User-Agent"], "screen": "1920*1080", "__rnd": str(int(_time.time() * 1000)), }, headers=h, cookies=cookies, ) data = resp.json() code = str(data.get("code", "")) msg = data.get("msg", "") if code == "100000": tip = "" if isinstance(data.get("data"), dict): tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "") return {"status": "success", "message": tip or "签到成功"} elif code == "382004": return {"status": "already_signed", "message": msg or "今日已签到"} else: return {"status": "failed", "message": f"code={code}, msg={msg}"} except Exception as e: return {"status": "failed", "message": str(e)} # =============== 
# =============== Log cleanup ===============

# Retention period in days; can be overridden through the environment.
SIGNIN_LOG_RETAIN_DAYS = int(os.getenv("SIGNIN_LOG_RETAIN_DAYS", "30"))
# Rows removed per batch; kept bounded to avoid locking the table for long.
CLEANUP_BATCH_SIZE = 1000


@celery_app.task
def cleanup_old_signin_logs():
    """Remove sign-in logs older than ``SIGNIN_LOG_RETAIN_DAYS`` days.

    Deletion happens in batches of at most ``CLEANUP_BATCH_SIZE`` rows.

    Returns:
        ``{"deleted": <row count>, "cutoff": <ISO timestamp>}``.
    """
    logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...")
    return _run_async(_async_cleanup())


async def _async_cleanup():
    """Delete expired SigninLog rows batch by batch and report the total."""
    from sqlalchemy import delete

    cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS)
    total_deleted = 0

    session_factory, engine = _make_session()
    try:
        async with session_factory() as session:
            batch_was_full = True
            while batch_was_full:
                # Two-step batch: collect a page of expired ids, then delete by id.
                expired_ids_query = (
                    select(SigninLog.id)
                    .where(SigninLog.signed_at < cutoff)
                    .limit(CLEANUP_BATCH_SIZE)
                )
                rows = await session.execute(expired_ids_query)
                batch_ids = [row[0] for row in rows.all()]
                if not batch_ids:
                    break

                await session.execute(
                    delete(SigninLog).where(SigninLog.id.in_(batch_ids))
                )
                await session.commit()

                total_deleted += len(batch_ids)
                logger.info(f"🧹 已删除 {total_deleted} 条...")

                # A short batch means nothing older than the cutoff remains.
                batch_was_full = len(batch_ids) >= CLEANUP_BATCH_SIZE
    finally:
        await engine.dispose()

    logger.info(f"✅ 日志清理完成,共删除 {total_deleted} 条(截止 {cutoff.strftime('%Y-%m-%d')})")
    return {"deleted": total_deleted, "cutoff": cutoff.isoformat()}