From 07097091fe854915134af57992178a4159142e4b Mon Sep 17 00:00:00 2001 From: Jeason <1710884619@qq.com> Date: Wed, 18 Mar 2026 13:52:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/task_scheduler/Dockerfile | 4 +- backend/task_scheduler/app/celery_app.py | 55 +-- backend/task_scheduler/app/main.py | 442 +++++++++++++++++ .../task_scheduler/app/tasks/signin_tasks.py | 455 +----------------- backend/task_scheduler/requirements.txt | 8 +- docker-compose.yml | 2 +- 6 files changed, 450 insertions(+), 516 deletions(-) create mode 100644 backend/task_scheduler/app/main.py diff --git a/backend/task_scheduler/Dockerfile b/backend/task_scheduler/Dockerfile index c6e3d1e..13a463f 100644 --- a/backend/task_scheduler/Dockerfile +++ b/backend/task_scheduler/Dockerfile @@ -23,5 +23,5 @@ RUN groupadd -r appuser && useradd -r -g appuser appuser ENV PYTHONPATH=/app USER appuser -# 同时启动 Celery Worker + Beat -CMD ["celery", "-A", "task_scheduler.app.celery_app:celery_app", "worker", "--beat", "--loglevel=info", "--concurrency=4"] +# APScheduler 调度器 +CMD ["python", "-m", "task_scheduler.app.main"] diff --git a/backend/task_scheduler/app/celery_app.py b/backend/task_scheduler/app/celery_app.py index 47f0cab..3f5fcf9 100644 --- a/backend/task_scheduler/app/celery_app.py +++ b/backend/task_scheduler/app/celery_app.py @@ -1,54 +1 @@ -""" -Weibo-HotSign Task Scheduler Service -Celery app + Beat configuration. 
- -策略:每 5 分钟运行一次 check_and_run_due_tasks, -该任务从数据库读取所有 enabled 的 Task,用 croniter 判断 -当前时间是否在 cron 窗口内(±3 分钟),如果是则提交签到。 -这样无需动态修改 beat_schedule,简单可靠。 -""" - -import os -import sys -import logging - -from celery import Celery -from celery.schedules import crontab - -# 确保 shared 模块可导入 -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) - -from .config import settings - -logger = logging.getLogger(__name__) - -# Create Celery app -celery_app = Celery( - "weibo_scheduler", - broker=settings.CELERY_BROKER_URL, - backend=settings.CELERY_RESULT_BACKEND, - include=["task_scheduler.app.tasks.signin_tasks"], -) - -celery_app.conf.update( - task_serializer="json", - accept_content=["json"], - result_serializer="json", - timezone="Asia/Shanghai", - enable_utc=False, - beat_schedule_filename="/tmp/celerybeat-schedule", - beat_max_loop_interval=60, - beat_schedule={ - "check-due-tasks": { - "task": "task_scheduler.app.tasks.signin_tasks.check_and_run_due_tasks", - "schedule": 60.0, # 每分钟检查一次(轻量查询,只在到期前 5 分钟才真正提交) - }, - "cleanup-old-logs": { - "task": "task_scheduler.app.tasks.signin_tasks.cleanup_old_signin_logs", - "schedule": crontab(hour=3, minute=0), # 每天凌晨 3 点 - }, - }, -) - -# 导入 task 模块以注册 -from .tasks import signin_tasks # noqa: F401, E402 +# 此文件已废弃,调度器改用 APScheduler,入口见 main.py diff --git a/backend/task_scheduler/app/main.py b/backend/task_scheduler/app/main.py new file mode 100644 index 0000000..cb2e269 --- /dev/null +++ b/backend/task_scheduler/app/main.py @@ -0,0 +1,442 @@ +""" +Weibo-HotSign 定时任务调度器 (APScheduler) + +启动时从 DB 加载所有 enabled 的 Task,按 cron 精确注册。 +每 5 分钟重新同步一次 DB(处理新增/删除/修改的任务)。 +签到逻辑直接操作数据库和微博 API,不依赖其他服务。 +""" + +import os +import sys +import re +import asyncio +import logging +import signal +import time as _time +from datetime import datetime, timedelta + +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger +from 
apscheduler.jobstores.memory import MemoryJobStore + +# 确保 shared 模块可导入 +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) + +from shared.config import shared_settings + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger("scheduler") + +scheduler = BackgroundScheduler( + jobstores={"default": MemoryJobStore()}, + timezone="Asia/Shanghai", +) + +# 已注册的任务 ID 集合,用于同步时对比 +_registered_tasks: dict = {} # task_id -> cron_expression + +# 日志保留天数 +SIGNIN_LOG_RETAIN_DAYS = int(os.getenv("SIGNIN_LOG_RETAIN_DAYS", "30")) +CLEANUP_BATCH_SIZE = 1000 + +WEIBO_HEADERS = { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + ), + "Referer": "https://weibo.com/", + "Accept": "*/*", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", +} + + +def _make_session(): + """每次创建独立的 engine + session,避免 event loop 冲突。""" + from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine + from sqlalchemy.orm import sessionmaker + + kwargs = {"echo": False} + if "sqlite" not in shared_settings.DATABASE_URL: + kwargs.update(pool_size=5, max_overflow=10, pool_pre_ping=True) + eng = create_async_engine(shared_settings.DATABASE_URL, **kwargs) + factory = sessionmaker(eng, class_=AsyncSession, expire_on_commit=False) + return factory, eng + + +def _run_async(coro): + """在新 event loop 中运行 async 函数。""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +def _parse_cookies(cookie_str: str) -> dict: + cookies = {} + for pair in cookie_str.split(";"): + pair = pair.strip() + if "=" in pair: + k, v = pair.split("=", 1) + cookies[k.strip()] = v.strip() + return cookies + + +# =============== 任务同步 =============== + +def sync_db_tasks(): + """从 DB 同步任务到 APScheduler。新增的加上,删除的移除,cron 变了的更新。""" + try: + 
tasks = _run_async(_load_tasks_from_db()) + except Exception as e: + logger.error(f"同步任务失败: {e}") + return + + db_task_ids = set() + for task_id, account_id, cron_expr in tasks: + db_task_ids.add(task_id) + job_id = f"signin_{task_id}" + + # 已存在且 cron 没变 → 跳过 + if task_id in _registered_tasks and _registered_tasks[task_id] == cron_expr: + continue + + # 新增或 cron 变了 → 添加/替换 + try: + parts = cron_expr.split() + if len(parts) != 5: + logger.warning(f"无效 cron: task={task_id}, expr={cron_expr}") + continue + + trigger = CronTrigger( + minute=parts[0], hour=parts[1], + day=parts[2], month=parts[3], day_of_week=parts[4], + timezone="Asia/Shanghai", + ) + scheduler.add_job( + run_signin, + trigger=trigger, + id=job_id, + args=[task_id, account_id], + replace_existing=True, + misfire_grace_time=300, # 5 分钟内的 misfire 仍然执行 + ) + _registered_tasks[task_id] = cron_expr + logger.info(f"✅ 注册任务: task={task_id}, account={account_id}, cron={cron_expr}") + except Exception as e: + logger.error(f"注册任务失败: task={task_id}, error={e}") + + # 移除 DB 中已删除/禁用的任务 + removed = set(_registered_tasks.keys()) - db_task_ids + for task_id in removed: + job_id = f"signin_{task_id}" + try: + scheduler.remove_job(job_id) + logger.info(f"🗑️ 移除任务: task={task_id}") + except Exception: + pass + _registered_tasks.pop(task_id, None) + + logger.info(f"📋 任务同步完成: 活跃 {len(db_task_ids)} 个, 已注册 {len(_registered_tasks)} 个") + + +async def _load_tasks_from_db(): + """从 DB 读取所有 enabled 且账号 active/pending 的任务。""" + from sqlalchemy import select + from shared.models.task import Task + from shared.models.account import Account + + SessionFactory, eng = _make_session() + try: + async with SessionFactory() as session: + stmt = ( + select(Task.id, Task.account_id, Task.cron_expression) + .join(Account, Task.account_id == Account.id) + .where(Task.is_enabled == True) + .where(Account.status.in_(["pending", "active"])) + ) + result = await session.execute(stmt) + return [(str(r[0]), str(r[1]), r[2]) for r in result.all()] + 
finally: + await eng.dispose() + + +# =============== 签到入口 =============== + +def run_signin(task_id: str, account_id: str): + """APScheduler 调用的签到入口(同步函数,内部跑 async)。""" + logger.info(f"🎯 开始签到: task={task_id}, account={account_id}") + try: + result = _run_async(_async_do_signin(account_id)) + logger.info(f"✅ 签到完成: task={task_id}, result={result}") + except Exception as e: + logger.error(f"❌ 签到失败: task={task_id}, error={e}") + + +async def _async_do_signin(account_id: str): + """执行单个账号的全量超话签到。""" + import httpx + from sqlalchemy import select + from shared.models.account import Account + from shared.models.signin_log import SigninLog + from shared.crypto import decrypt_cookie, derive_key + + SessionFactory, eng = _make_session() + try: + async with SessionFactory() as session: + result = await session.execute(select(Account).where(Account.id == account_id)) + account = result.scalar_one_or_none() + if not account: + return {"status": "error", "reason": "account not found"} + if account.status not in ("pending", "active"): + return {"status": "skipped", "reason": f"status={account.status}"} + + key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY) + try: + cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) + except Exception: + account.status = "invalid_cookie" + await session.commit() + return {"status": "failed", "reason": "cookie decryption failed"} + + cookies = _parse_cookies(cookie_str) + + # 获取超话列表 + topics = await _fetch_topics(cookies) + if not topics: + return {"status": "completed", "signed": 0, "message": "no topics"} + + # 逐个签到 + signed = already = failed = 0 + for topic in topics: + await asyncio.sleep(1.5) + r = await _do_single_signin(cookies, topic) + if r["status"] == "success": + s, signed = "success", signed + 1 + elif r["status"] == "already_signed": + s, already = "failed_already_signed", already + 1 + else: + s, failed = "failed_network", failed + 1 + + session.add(SigninLog( + account_id=account.id, 
topic_title=topic["title"], + status=s, reward_info={"message": r["message"]}, + signed_at=datetime.now(), + )) + + account.last_checked_at = datetime.now() + if account.status != "active": + account.status = "active" + await session.commit() + + logger.info( + f"签到结果: account={account_id}, " + f"signed={signed}, already={already}, failed={failed}" + ) + return {"signed": signed, "already_signed": already, "failed": failed, "total": len(topics)} + finally: + await eng.dispose() + + +async def _fetch_topics(cookies: dict) -> list: + """获取关注的超话列表。""" + import httpx + + topics = [] + async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: + await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) + xsrf = client.cookies.get("XSRF-TOKEN", "") + headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"} + if xsrf: + headers["X-XSRF-TOKEN"] = xsrf + + page = 1 + while page <= 10: + resp = await client.get( + "https://weibo.com/ajax/profile/topicContent", + params={"tabid": "231093_-_chaohua", "page": str(page)}, + headers=headers, cookies=cookies, + ) + data = resp.json() + if data.get("ok") != 1: + break + tlist = data.get("data", {}).get("list", []) + if not tlist: + break + for item in tlist: + title = item.get("topic_name", "") or item.get("title", "") + cid = "" + for field in ("oid", "scheme"): + m = re.search(r"100808[0-9a-fA-F]+", item.get(field, "")) + if m: + cid = m.group(0) + break + if title and cid: + topics.append({"title": title, "containerid": cid}) + if page >= data.get("data", {}).get("max_page", 1): + break + page += 1 + return topics + + +async def _do_single_signin(cookies: dict, topic: dict) -> dict: + """签到单个超话。""" + import httpx + + try: + async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: + await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) + xsrf = client.cookies.get("XSRF-TOKEN", "") + h = { + **WEIBO_HEADERS, + "Referer": 
f"https://weibo.com/p/{topic['containerid']}/super_index", + "X-Requested-With": "XMLHttpRequest", + } + if xsrf: + h["X-XSRF-TOKEN"] = xsrf + + resp = await client.get( + "https://weibo.com/p/aj/general/button", + params={ + "ajwvr": "6", + "api": "http://i.huati.weibo.com/aj/super/checkin", + "texta": "签到", "textb": "已签到", + "status": "0", + "id": topic["containerid"], + "location": "page_100808_super_index", + "timezone": "GMT+0800", + "lang": "zh-cn", + "plat": "Win32", + "ua": WEIBO_HEADERS["User-Agent"], + "screen": "1920*1080", + "__rnd": str(int(_time.time() * 1000)), + }, + headers=h, cookies=cookies, + ) + data = resp.json() + code = str(data.get("code", "")) + msg = data.get("msg", "") + + if code == "100000": + tip = "" + if isinstance(data.get("data"), dict): + tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "") + return {"status": "success", "message": tip or "签到成功"} + elif code == "382004": + return {"status": "already_signed", "message": msg or f"今天已签到({code})"} + else: + return {"status": "failed", "message": f"code={code}, msg={msg}"} + except Exception as e: + return {"status": "failed", "message": str(e)} + + +# =============== 日志清理 =============== + +def cleanup_old_signin_logs(): + """清理超过 N 天的签到日志,分批删除避免锁表。""" + logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...") + try: + result = _run_async(_async_cleanup()) + logger.info(f"✅ 日志清理完成: {result}") + except Exception as e: + logger.error(f"❌ 日志清理失败: {e}") + + +async def _async_cleanup(): + from sqlalchemy import select, delete + from shared.models.signin_log import SigninLog + + cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS) + total_deleted = 0 + + SessionFactory, eng = _make_session() + try: + async with SessionFactory() as session: + while True: + stmt = ( + select(SigninLog.id) + .where(SigninLog.signed_at < cutoff) + .limit(CLEANUP_BATCH_SIZE) + ) + result = await session.execute(stmt) + ids = [row[0] for row in result.all()] + + if not 
ids: + break + + del_stmt = delete(SigninLog).where(SigninLog.id.in_(ids)) + await session.execute(del_stmt) + await session.commit() + total_deleted += len(ids) + logger.info(f"🧹 已删除 {total_deleted} 条...") + + if len(ids) < CLEANUP_BATCH_SIZE: + break + finally: + await eng.dispose() + + return {"deleted": total_deleted, "cutoff": cutoff.isoformat()} + + +# =============== Entry point =============== + +def _shutdown(signum, frame): + logger.info(f"收到信号 {signum},正在关闭调度器...") + # Only raise SystemExit here; the main loop's except branch performs the single scheduler.shutdown() (a second call raises SchedulerNotRunningError). + sys.exit(0) + + +if __name__ == "__main__": + logger.info("=" * 60) + logger.info("🚀 Weibo-HotSign 定时调度器启动 (APScheduler)") + logger.info("=" * 60) + + # Register signal handlers + signal.signal(signal.SIGTERM, _shutdown) + signal.signal(signal.SIGINT, _shutdown) + + # Start the scheduler + scheduler.start() + + # Initial sync of tasks from the DB + sync_db_tasks() + + # Re-sync with the DB every 5 minutes (picks up added/removed/modified tasks) + scheduler.add_job( + sync_db_tasks, + trigger=IntervalTrigger(minutes=5), + id="sync_db_tasks", + replace_existing=True, + misfire_grace_time=60, + ) + + # Clean up expired sign-in logs every day at 03:00 + scheduler.add_job( + cleanup_old_signin_logs, + trigger=CronTrigger(hour=3, minute=0, timezone="Asia/Shanghai"), + id="cleanup_old_signin_logs", + replace_existing=True, + misfire_grace_time=3600, + ) + + logger.info("📋 调度器已启动,等待任务触发...") + logger.info(f"📋 日志清理: 每天 03:00, 保留 {SIGNIN_LOG_RETAIN_DAYS} 天") + + # Log all currently registered jobs + jobs = scheduler.get_jobs() + for job in jobs: + logger.info(f" 📌 {job.id}: next_run={job.next_run_time}") + + # Keep the main thread alive + try: + while True: + _time.sleep(1) + except (KeyboardInterrupt, SystemExit): + logger.info("调度器关闭中...") + scheduler.shutdown(wait=False) diff --git a/backend/task_scheduler/app/tasks/signin_tasks.py b/backend/task_scheduler/app/tasks/signin_tasks.py index 4a4618d..609ab0d 100644 --- a/backend/task_scheduler/app/tasks/signin_tasks.py +++ b/backend/task_scheduler/app/tasks/signin_tasks.py @@ -1,454 +1 @@ -""" -Weibo-HotSign 定时签到任务。 - -核心逻辑: -1. 
check_and_run_due_tasks (Beat 每 5 分钟触发) - → 从 DB 读取所有 enabled Task - → 用 croniter 判断是否到期 - → 对到期的 task 提交 execute_signin_task -2. execute_signin_task - → 解密 cookie → 获取超话列表 → 逐个签到 → 写日志 -""" - -import os -import sys -import re -import asyncio -import time as _time -import logging -import redis -from datetime import datetime, timedelta -from typing import Dict, Any, List, Optional - -from celery import current_task -from croniter import croniter -from sqlalchemy import select - -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) - -from shared.models.base import Base -from shared.models.task import Task -from shared.models.account import Account -from shared.models.signin_log import SigninLog -from shared.crypto import decrypt_cookie, derive_key -from shared.config import shared_settings - -from ..celery_app import celery_app -from ..config import settings - -logger = logging.getLogger(__name__) - - -def _make_session(): - """ - 每次创建新的 engine + session,避免 Celery worker fork 后 - 复用主进程 event loop 绑定的连接池导致 'Future attached to a different loop' 错误。 - """ - from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine - from sqlalchemy.orm import sessionmaker - - engine_kwargs: dict = {"echo": False} - if "sqlite" not in shared_settings.DATABASE_URL: - engine_kwargs.update(pool_size=5, max_overflow=10, pool_pre_ping=True) - - eng = create_async_engine(shared_settings.DATABASE_URL, **engine_kwargs) - factory = sessionmaker(eng, class_=AsyncSession, expire_on_commit=False) - return factory, eng - -WEIBO_HEADERS = { - "User-Agent": ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " - "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" - ), - "Referer": "https://weibo.com/", - "Accept": "*/*", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", -} - -# --------------- Redis 分布式锁 --------------- - -_redis_client = None - - -def _get_redis(): - global _redis_client - if not shared_settings.USE_REDIS: - return None - if _redis_client is 
None: - try: - _redis_client = redis.from_url(shared_settings.REDIS_URL, decode_responses=True) - except Exception: - return None - return _redis_client - - -def _acquire_lock(key: str, ttl: int = 600) -> bool: - r = _get_redis() - if r: - return bool(r.set(f"lock:{key}", "1", nx=True, ex=ttl)) - return True # 无 Redis 时不加锁 - - -def _release_lock(key: str): - r = _get_redis() - if r: - r.delete(f"lock:{key}") - - -# --------------- 辅助函数 --------------- - -def _parse_cookies(cookie_str: str) -> Dict[str, str]: - cookies: Dict[str, str] = {} - for pair in cookie_str.split(";"): - pair = pair.strip() - if "=" in pair: - k, v = pair.split("=", 1) - cookies[k.strip()] = v.strip() - return cookies - - -def _run_async(coro): - """在 Celery worker 线程中运行 async 函数""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(coro) - finally: - loop.close() - - -# =============== Celery Tasks =============== - -@celery_app.task -def check_and_run_due_tasks(): - """ - Beat 每分钟调用。 - 对每个 enabled Task,用 croniter 算出下次触发时间, - 如果在 5 分钟内即将到期,则计算精确延迟后提交执行。 - 已经过了触发时间但在 2 分钟内的也会立即执行(容错)。 - """ - return _run_async(_async_check_due()) - - -async def _async_check_due(): - now = datetime.now() - submitted = 0 - - SessionFactory, eng = _make_session() - try: - async with SessionFactory() as session: - stmt = ( - select(Task, Account) - .join(Account, Task.account_id == Account.id) - .where(Task.is_enabled == True) - .where(Account.status.in_(["pending", "active"])) - ) - result = await session.execute(stmt) - pairs = result.all() - finally: - await eng.dispose() - - for task, account in pairs: - try: - if not croniter.is_valid(task.cron_expression): - logger.warning(f"无效 cron: task={task.id}, expr={task.cron_expression}") - continue - - cron = croniter(task.cron_expression, now) - - # 看下次触发时间 - next_fire = cron.get_next(datetime) - # 也看上次触发时间(处理刚过期的情况) - cron2 = croniter(task.cron_expression, now) - prev_fire = cron2.get_prev(datetime) - - 
seconds_until = (next_fire - now).total_seconds() - seconds_since = (now - prev_fire).total_seconds() - - # 情况1: 下次触发在 5 分钟内 → 延迟到精确时间执行 - # 情况2: 上次触发在 2 分钟内 → 立即执行(刚好到期或略过) - fire_time = None - delay_seconds = 0 - - if seconds_until <= 300: - fire_time = next_fire - delay_seconds = max(0, int(seconds_until)) - elif seconds_since <= 120: - fire_time = prev_fire - delay_seconds = 0 - - if fire_time is not None: - lock_key = f"due:{task.id}:{fire_time.strftime('%Y%m%d%H%M')}" - if _acquire_lock(lock_key, ttl=3600): - if delay_seconds > 0: - execute_signin_task.apply_async( - args=[str(task.id), str(account.id)], - countdown=delay_seconds, - ) - logger.info( - f"📤 预约任务: task={task.id}, account={account.weibo_user_id}, " - f"{delay_seconds}秒后执行 (cron={task.cron_expression})" - ) - else: - execute_signin_task.delay(str(task.id), str(account.id)) - logger.info( - f"📤 立即执行: task={task.id}, account={account.weibo_user_id}, " - f"cron={task.cron_expression}" - ) - submitted += 1 - except Exception as e: - logger.error(f"检查任务 {task.id} 出错: {e}") - - if submitted > 0: - logger.info(f"✅ 本轮提交了 {submitted} 个任务") - return {"checked": len(pairs), "submitted": submitted} - - -@celery_app.task(bind=True, max_retries=2, default_retry_delay=120) -def execute_signin_task(self, task_id: str, account_id: str): - """执行单个账号的全量超话签到""" - lock_key = f"signin:{task_id}:{account_id}" - if not _acquire_lock(lock_key, ttl=600): - logger.warning(f"⏭️ 签到任务正在执行中: task={task_id}") - return {"status": "skipped", "reason": "already running"} - - try: - logger.info(f"🎯 开始签到: task={task_id}, account={account_id}") - result = _run_async(_async_do_signin(account_id)) - logger.info(f"✅ 签到完成: task={task_id}, result={result}") - return result - except Exception as exc: - logger.error(f"❌ 签到失败: task={task_id}, error={exc}") - _release_lock(lock_key) - if self.request.retries < 2: - raise self.retry(exc=exc) - raise - finally: - _release_lock(lock_key) - - -# =============== 签到核心逻辑 =============== - -async def 
_async_do_signin(account_id: str) -> Dict[str, Any]: - """直接操作数据库和微博 API 完成签到""" - import httpx - - SessionFactory, eng = _make_session() - try: - async with SessionFactory() as session: - stmt = select(Account).where(Account.id == account_id) - result = await session.execute(stmt) - account = result.scalar_one_or_none() - if not account: - raise Exception(f"Account {account_id} not found") - - if account.status not in ("pending", "active"): - return {"status": "skipped", "reason": f"status={account.status}"} - - # 解密 cookie - key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY) - try: - cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) - except Exception: - account.status = "invalid_cookie" - await session.commit() - return {"status": "failed", "reason": "cookie decryption failed"} - - cookies = _parse_cookies(cookie_str) - - # 获取超话列表 - topics = await _fetch_topics(cookies) - if not topics: - return {"status": "completed", "signed": 0, "message": "no topics"} - - # 逐个签到 - signed = already = failed = 0 - for topic in topics: - await asyncio.sleep(1.5) - r = await _do_single_signin(cookies, topic) - - if r["status"] == "success": - s = "success" - signed += 1 - elif r["status"] == "already_signed": - s = "failed_already_signed" - already += 1 - else: - s = "failed_network" - failed += 1 - - log = SigninLog( - account_id=account.id, - topic_title=topic["title"], - status=s, - reward_info={"message": r["message"]}, - signed_at=datetime.now(), - ) - session.add(log) - - account.last_checked_at = datetime.now() - if account.status != "active": - account.status = "active" - await session.commit() - - logger.info( - f"签到结果: account={account_id}, " - f"signed={signed}, already={already}, failed={failed}" - ) - return { - "status": "completed", - "signed": signed, - "already_signed": already, - "failed": failed, - "total_topics": len(topics), - } - finally: - await eng.dispose() - - -async def _fetch_topics(cookies: Dict[str, str]) -> List[dict]: - 
"""获取关注的超话列表""" - import httpx - - topics: List[dict] = [] - async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: - await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) - xsrf = client.cookies.get("XSRF-TOKEN", "") - headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"} - if xsrf: - headers["X-XSRF-TOKEN"] = xsrf - - page = 1 - while page <= 10: - resp = await client.get( - "https://weibo.com/ajax/profile/topicContent", - params={"tabid": "231093_-_chaohua", "page": str(page)}, - headers=headers, cookies=cookies, - ) - data = resp.json() - if data.get("ok") != 1: - break - topic_list = data.get("data", {}).get("list", []) - if not topic_list: - break - for item in topic_list: - title = item.get("topic_name", "") or item.get("title", "") - containerid = "" - for field in ("oid", "scheme"): - m = re.search(r"100808[0-9a-fA-F]+", item.get(field, "")) - if m: - containerid = m.group(0) - break - if title and containerid: - topics.append({"title": title, "containerid": containerid}) - if page >= data.get("data", {}).get("max_page", 1): - break - page += 1 - return topics - - -async def _do_single_signin(cookies: Dict[str, str], topic: dict) -> dict: - """签到单个超话""" - import httpx - - try: - async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: - await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) - xsrf = client.cookies.get("XSRF-TOKEN", "") - h = { - **WEIBO_HEADERS, - "Referer": f"https://weibo.com/p/{topic['containerid']}/super_index", - "X-Requested-With": "XMLHttpRequest", - } - if xsrf: - h["X-XSRF-TOKEN"] = xsrf - - resp = await client.get( - "https://weibo.com/p/aj/general/button", - params={ - "ajwvr": "6", - "api": "http://i.huati.weibo.com/aj/super/checkin", - "texta": "签到", "textb": "已签到", - "status": "0", - "id": topic["containerid"], - "location": "page_100808_super_index", - "timezone": "GMT+0800", - "lang": "zh-cn", - "plat": "Win32", - "ua": 
WEIBO_HEADERS["User-Agent"], - "screen": "1920*1080", - "__rnd": str(int(_time.time() * 1000)), - }, - headers=h, cookies=cookies, - ) - data = resp.json() - code = str(data.get("code", "")) - msg = data.get("msg", "") - - if code == "100000": - tip = "" - if isinstance(data.get("data"), dict): - tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "") - return {"status": "success", "message": tip or "签到成功"} - elif code == "382004": - return {"status": "already_signed", "message": msg or "今日已签到"} - else: - return {"status": "failed", "message": f"code={code}, msg={msg}"} - except Exception as e: - return {"status": "failed", "message": str(e)} - - -# =============== 日志清理 =============== - -# 保留天数,可通过环境变量覆盖 -SIGNIN_LOG_RETAIN_DAYS = int(os.getenv("SIGNIN_LOG_RETAIN_DAYS", "30")) -# 每批删除条数,避免长时间锁表 -CLEANUP_BATCH_SIZE = 1000 - - -@celery_app.task -def cleanup_old_signin_logs(): - """ - 清理超过 N 天的签到日志。 - 分批删除,每批最多 CLEANUP_BATCH_SIZE 条,避免锁表。 - """ - logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...") - return _run_async(_async_cleanup()) - - -async def _async_cleanup(): - from sqlalchemy import delete, func - - cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS) - total_deleted = 0 - - SessionFactory, eng = _make_session() - try: - async with SessionFactory() as session: - while True: - # 分批删除:先查出一批 id,再按 id 删除 - stmt = ( - select(SigninLog.id) - .where(SigninLog.signed_at < cutoff) - .limit(CLEANUP_BATCH_SIZE) - ) - result = await session.execute(stmt) - ids = [row[0] for row in result.all()] - - if not ids: - break - - del_stmt = delete(SigninLog).where(SigninLog.id.in_(ids)) - await session.execute(del_stmt) - await session.commit() - total_deleted += len(ids) - logger.info(f"🧹 已删除 {total_deleted} 条...") - - # 如果本批不满,说明已经删完 - if len(ids) < CLEANUP_BATCH_SIZE: - break - finally: - await eng.dispose() - - logger.info(f"✅ 日志清理完成,共删除 {total_deleted} 条(截止 {cutoff.strftime('%Y-%m-%d')})") - return {"deleted": total_deleted, 
"cutoff": cutoff.isoformat()} +# 此文件已废弃,签到逻辑已迁移到 task_scheduler.app.main (APScheduler) diff --git a/backend/task_scheduler/requirements.txt b/backend/task_scheduler/requirements.txt index b0b3f8b..d761282 100644 --- a/backend/task_scheduler/requirements.txt +++ b/backend/task_scheduler/requirements.txt @@ -1,6 +1,5 @@ -# Weibo-HotSign Task Scheduler Service Requirements -# Task Queue -celery==5.3.6 +# Weibo-HotSign Task Scheduler +apscheduler==3.10.4 redis==5.0.1 # Database @@ -14,11 +13,10 @@ pydantic-settings==2.0.3 # HTTP Client httpx==0.25.2 -# Cron parsing +# Cron parsing (APScheduler 内置,但显式声明) croniter==2.0.1 # Crypto (for cookie decryption) -cryptography==41.0.7 pycryptodome==3.19.0 # Utilities diff --git a/docker-compose.yml b/docker-compose.yml index 25f312e..1f3f3a2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -79,7 +79,7 @@ services: networks: - 1panel-network - # 定时任务调度器 (Celery Worker + Beat) + # 定时任务调度器 (APScheduler) task-scheduler: build: context: ./backend