From 633e4249decdb126545a1c8ecb80366ffc091f82 Mon Sep 17 00:00:00 2001 From: Jeason <1710884619@qq.com> Date: Wed, 18 Mar 2026 09:19:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E7=BB=86?= =?UTF-8?q?=E5=8C=96=EF=BC=8C=E7=AD=BE=E5=88=B0=E6=88=90=E5=8A=9F=E9=A1=B5?= =?UTF-8?q?=E7=BB=93=E6=9E=9C=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/api_service/app/main.py | 11 + backend/auth_service/app/main.py | 7 + backend/task_scheduler/Dockerfile | 35 +- backend/task_scheduler/app/celery_app.py | 204 +----- backend/task_scheduler/app/config.py | 7 +- .../task_scheduler/app/tasks/signin_tasks.py | 692 ++++++++---------- backend/task_scheduler/requirements.txt | 7 + docker-compose.yml | 14 + frontend/templates/account_detail.html | 8 +- 9 files changed, 408 insertions(+), 577 deletions(-) diff --git a/backend/api_service/app/main.py b/backend/api_service/app/main.py index 489a5b3..08b595d 100644 --- a/backend/api_service/app/main.py +++ b/backend/api_service/app/main.py @@ -3,6 +3,8 @@ Weibo-HotSign API Service Main FastAPI application entry point — account management, task config, signin logs. """ +import logging + from fastapi import FastAPI, Request from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware @@ -11,6 +13,15 @@ from starlette.exceptions import HTTPException as StarletteHTTPException from shared.response import success_response, error_response from api_service.app.routers import accounts, tasks, signin_logs + +# 过滤 /health 的 access log,避免日志刷屏 +class _HealthFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + msg = record.getMessage() + return "/health" not in msg + +logging.getLogger("uvicorn.access").addFilter(_HealthFilter()) + app = FastAPI( title="Weibo-HotSign API Service", version="1.0.0", diff --git a/backend/auth_service/app/main.py b/backend/auth_service/app/main.py index 95c8078..dd00dd4 100644 --- a/backend/auth_service/app/main.py +++ b/backend/auth_service/app/main.py @@ -14,6 +14,13 @@ import logging import secrets from datetime import datetime +# 过滤 /health 的 access log,避免日志刷屏 +class _HealthFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + return "/health" not in record.getMessage() + +logging.getLogger("uvicorn.access").addFilter(_HealthFilter()) + from shared.models import get_db, User, InviteCode from shared.config import shared_settings from auth_service.app.models.database import create_tables diff --git a/backend/task_scheduler/Dockerfile b/backend/task_scheduler/Dockerfile index 9404bf1..c6e3d1e 100644 --- a/backend/task_scheduler/Dockerfile +++ b/backend/task_scheduler/Dockerfile @@ -1,30 +1,27 @@ # Weibo-HotSign Task Scheduler Service Dockerfile -FROM python:3.11-slim +FROM python:3.10-slim -# Set working directory WORKDIR /app -# Install system dependencies -RUN apt-get update && apt-get install -y \ - gcc \ - default-libmysqlclient-dev \ +# 使用阿里云镜像加速 +RUN sed -i 's|deb.debian.org|mirrors.aliyun.com|g' /etc/apt/sources.list.d/debian.sources 2>/dev/null || \ + sed -i 's|deb.debian.org|mirrors.aliyun.com|g' /etc/apt/sources.list 2>/dev/null || true + +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc default-libmysqlclient-dev curl \ && rm -rf /var/lib/apt/lists/* -# Copy requirements first for better caching -COPY requirements.txt . +COPY task_scheduler/requirements.txt . +RUN pip install --no-cache-dir -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com -r requirements.txt -# Install Python dependencies -RUN pip install --no-cache-dir -r requirements.txt +# 复制共享模块和调度器代码 +COPY shared/ ./shared/ +COPY task_scheduler/ ./task_scheduler/ -# Copy application code -COPY app/ ./app/ - -# Create non-root user for security RUN groupadd -r appuser && useradd -r -g appuser appuser + +ENV PYTHONPATH=/app USER appuser -# Expose port (optional, as scheduler doesn't need external access) -# EXPOSE 8000 - -# Start Celery Beat scheduler -CMD ["celery", "-A", "app.celery_app", "beat", "--loglevel=info"] +# 同时启动 Celery Worker + Beat +CMD ["celery", "-A", "task_scheduler.app.celery_app:celery_app", "worker", "--beat", "--loglevel=info", "--concurrency=4"] diff --git a/backend/task_scheduler/app/celery_app.py b/backend/task_scheduler/app/celery_app.py index e892d48..d0e8971 100644 --- a/backend/task_scheduler/app/celery_app.py +++ b/backend/task_scheduler/app/celery_app.py @@ -1,213 +1,49 @@ """ Weibo-HotSign Task Scheduler Service -Celery Beat configuration for scheduled sign-in tasks +Celery app + Beat configuration. + +策略:每 5 分钟运行一次 check_and_run_due_tasks, +该任务从数据库读取所有 enabled 的 Task,用 croniter 判断 +当前时间是否在 cron 窗口内(±3 分钟),如果是则提交签到。 +这样无需动态修改 beat_schedule,简单可靠。 """ import os import sys -import asyncio import logging -from typing import Dict, List -from datetime import datetime from celery import Celery -from celery.schedules import crontab -from croniter import croniter -from sqlalchemy import select -# Add parent directory to path for imports +# 确保 shared 模块可导入 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) -from shared.models.base import AsyncSessionLocal -from shared.models.task import Task -from shared.models.account import Account -from shared.config import shared_settings - from .config import settings logger = logging.getLogger(__name__) # Create Celery app celery_app = Celery( - "weibo_hot_sign_scheduler", + "weibo_scheduler", broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_RESULT_BACKEND, - include=["task_scheduler.app.tasks.signin_tasks"] + include=["task_scheduler.app.tasks.signin_tasks"], ) -# Celery configuration celery_app.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", timezone="Asia/Shanghai", - enable_utc=True, - beat_schedule_filename="celerybeat-schedule", - beat_max_loop_interval=5, + enable_utc=False, + beat_schedule_filename="/tmp/celerybeat-schedule", + beat_max_loop_interval=60, + beat_schedule={ + "check-due-tasks": { + "task": "task_scheduler.app.tasks.signin_tasks.check_and_run_due_tasks", + "schedule": 60.0, # 每分钟检查一次(轻量查询,只在到期前 5 分钟才真正提交) + }, + }, ) - -class TaskSchedulerService: - """Service to manage scheduled tasks from database""" - - def __init__(self): - self.scheduled_tasks: Dict[str, dict] = {} - - async def load_scheduled_tasks(self) -> List[Task]: - """ - Load enabled tasks from database and register them to Celery Beat. - Returns list of loaded tasks. - """ - try: - async with AsyncSessionLocal() as session: - # Query all enabled tasks with their accounts - stmt = ( - select(Task, Account) - .join(Account, Task.account_id == Account.id) - .where(Task.is_enabled == True) - ) - result = await session.execute(stmt) - task_account_pairs = result.all() - - logger.info(f"📅 Loaded {len(task_account_pairs)} enabled tasks from database") - - # Register tasks to Celery Beat dynamically - beat_schedule = {} - for task, account in task_account_pairs: - try: - # Validate cron expression - if not croniter.is_valid(task.cron_expression): - logger.warning(f"Invalid cron expression for task {task.id}: {task.cron_expression}") - continue - - # Create schedule entry - schedule_name = f"task_{task.id}" - beat_schedule[schedule_name] = { - "task": "task_scheduler.app.tasks.signin_tasks.execute_signin_task", - "schedule": self._parse_cron_to_celery(task.cron_expression), - "args": (task.id, task.account_id, task.cron_expression), - } - - self.scheduled_tasks[task.id] = { - "task_id": task.id, - "account_id": task.account_id, - "cron_expression": task.cron_expression, - "account_status": account.status, - } - - logger.info(f"✅ Registered task {task.id} for account {account.weibo_user_id} with cron: {task.cron_expression}") - - except Exception as e: - logger.error(f"Failed to register task {task.id}: {e}") - continue - - # Update Celery Beat schedule - celery_app.conf.beat_schedule.update(beat_schedule) - - return [task for task, _ in task_account_pairs] - - except Exception as e: - logger.error(f"❌ Error loading tasks from database: {e}") - return [] - - def _parse_cron_to_celery(self, cron_expression: str) -> crontab: - """ - Parse cron expression string to Celery crontab schedule. - Format: minute hour day month day_of_week - """ - parts = cron_expression.split() - if len(parts) != 5: - raise ValueError(f"Invalid cron expression: {cron_expression}") - - return crontab( - minute=parts[0], - hour=parts[1], - day_of_month=parts[2], - month_of_year=parts[3], - day_of_week=parts[4], - ) - - async def add_task(self, task_id: str, account_id: str, cron_expression: str): - """Dynamically add a new task to the schedule""" - try: - if not croniter.is_valid(cron_expression): - raise ValueError(f"Invalid cron expression: {cron_expression}") - - schedule_name = f"task_{task_id}" - celery_app.conf.beat_schedule[schedule_name] = { - "task": "task_scheduler.app.tasks.signin_tasks.execute_signin_task", - "schedule": self._parse_cron_to_celery(cron_expression), - "args": (task_id, account_id, cron_expression), - } - - self.scheduled_tasks[task_id] = { - "task_id": task_id, - "account_id": account_id, - "cron_expression": cron_expression, - } - - logger.info(f"✅ Added task {task_id} to schedule") - - except Exception as e: - logger.error(f"Failed to add task {task_id}: {e}") - raise - - async def remove_task(self, task_id: str): - """Dynamically remove a task from the schedule""" - try: - schedule_name = f"task_{task_id}" - if schedule_name in celery_app.conf.beat_schedule: - del celery_app.conf.beat_schedule[schedule_name] - logger.info(f"✅ Removed task {task_id} from schedule") - - if task_id in self.scheduled_tasks: - del self.scheduled_tasks[task_id] - - except Exception as e: - logger.error(f"Failed to remove task {task_id}: {e}") - raise - - async def update_task(self, task_id: str, is_enabled: bool, cron_expression: str = None): - """Update an existing task in the schedule""" - try: - if is_enabled: - # Re-add or update the task - async with AsyncSessionLocal() as session: - stmt = select(Task).where(Task.id == task_id) - result = await session.execute(stmt) - task = result.scalar_one_or_none() - - if task: - await self.add_task( - task_id, - task.account_id, - cron_expression or task.cron_expression - ) - else: - # Remove the task - await self.remove_task(task_id) - - logger.info(f"✅ Updated task {task_id}, enabled={is_enabled}") - - except Exception as e: - logger.error(f"Failed to update task {task_id}: {e}") - raise - - -# Global scheduler service instance -scheduler_service = TaskSchedulerService() - - -# Synchronous wrapper for async function -def sync_load_tasks(): - """Synchronous wrapper to load tasks on startup""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(scheduler_service.load_scheduled_tasks()) - finally: - loop.close() - - -# Import task modules to register them -from .tasks import signin_tasks +# 导入 task 模块以注册 +from .tasks import signin_tasks # noqa: F401, E402 diff --git a/backend/task_scheduler/app/config.py b/backend/task_scheduler/app/config.py index 8961d05..d2512f3 100644 --- a/backend/task_scheduler/app/config.py +++ b/backend/task_scheduler/app/config.py @@ -9,9 +9,9 @@ from typing import List class Settings(BaseSettings): """Task Scheduler settings""" - # Celery settings - CELERY_BROKER_URL: str = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") - CELERY_RESULT_BACKEND: str = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0") + # Celery settings — 从环境变量 REDIS_URL 派生 + CELERY_BROKER_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379/0") + CELERY_RESULT_BACKEND: str = os.getenv("REDIS_URL", "redis://localhost:6379/0") # Task execution settings MAX_CONCURRENT_TASKS: int = int(os.getenv("MAX_CONCURRENT_TASKS", "10")) @@ -40,5 +40,6 @@ class Settings(BaseSettings): class Config: case_sensitive = True env_file = ".env" + extra = "ignore" settings = Settings() diff --git a/backend/task_scheduler/app/tasks/signin_tasks.py b/backend/task_scheduler/app/tasks/signin_tasks.py index c8a0838..22d0727 100644 --- a/backend/task_scheduler/app/tasks/signin_tasks.py +++ b/backend/task_scheduler/app/tasks/signin_tasks.py @@ -1,421 +1,373 @@ """ -Weibo-HotSign Sign-in Task Definitions -Celery tasks for scheduled sign-in operations +Weibo-HotSign 定时签到任务。 + +核心逻辑: +1. check_and_run_due_tasks (Beat 每 5 分钟触发) + → 从 DB 读取所有 enabled Task + → 用 croniter 判断是否到期 + → 对到期的 task 提交 execute_signin_task +2. execute_signin_task + → 解密 cookie → 获取超话列表 → 逐个签到 → 写日志 """ import os import sys +import re import asyncio -import httpx -import json +import time as _time import logging import redis -from datetime import datetime -from typing import Dict, Any, Optional +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional from celery import current_task +from croniter import croniter from sqlalchemy import select -# Add parent directory to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) from shared.models.base import AsyncSessionLocal from shared.models.task import Task from shared.models.account import Account +from shared.models.signin_log import SigninLog +from shared.crypto import decrypt_cookie, derive_key from shared.config import shared_settings from ..celery_app import celery_app from ..config import settings -# Configure logger logger = logging.getLogger(__name__) -# Redis client for distributed locks (可选) +WEIBO_HEADERS = { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + ), + "Referer": "https://weibo.com/", + "Accept": "*/*", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", +} + +# --------------- Redis 分布式锁 --------------- + _redis_client = None -def get_redis_client(): - """获取 Redis 客户端,如果未启用则返回 None""" + +def _get_redis(): global _redis_client if not shared_settings.USE_REDIS: return None - if _redis_client is None: try: _redis_client = redis.from_url(shared_settings.REDIS_URL, decode_responses=True) - except Exception as e: - logger.warning(f"Redis 连接失败: {e},分布式锁将被禁用") + except Exception: return None return _redis_client -# 内存锁(当 Redis 不可用时) -_memory_locks = {} +def _acquire_lock(key: str, ttl: int = 600) -> bool: + r = _get_redis() + if r: + return bool(r.set(f"lock:{key}", "1", nx=True, ex=ttl)) + return True # 无 Redis 时不加锁 -class DistributedLock: - """分布式锁(支持 Redis 或内存模式)""" - - def __init__(self, lock_key: str, timeout: int = 300): - """ - Initialize distributed lock - - Args: - lock_key: Unique key for the lock - timeout: Lock timeout in seconds (default 5 minutes) - """ - self.lock_key = f"lock:{lock_key}" - self.timeout = timeout - self.acquired = False - self.redis_client = get_redis_client() - - def acquire(self) -> bool: - """ - Acquire the lock using Redis SETNX or memory dict - Returns True if lock acquired, False otherwise - """ +def _release_lock(key: str): + r = _get_redis() + if r: + r.delete(f"lock:{key}") + + +# --------------- 辅助函数 --------------- + +def _parse_cookies(cookie_str: str) -> Dict[str, str]: + cookies: Dict[str, str] = {} + for pair in cookie_str.split(";"): + pair = pair.strip() + if "=" in pair: + k, v = pair.split("=", 1) + cookies[k.strip()] = v.strip() + return cookies + + +def _run_async(coro): + """在 Celery worker 线程中运行 async 函数""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +# =============== Celery Tasks =============== + +@celery_app.task +def check_and_run_due_tasks(): + """ + Beat 每分钟调用。 + 对每个 enabled Task,用 croniter 算出下次触发时间, + 如果在 5 分钟内即将到期,则计算精确延迟后提交执行。 + 已经过了触发时间但在 2 分钟内的也会立即执行(容错)。 + """ + return _run_async(_async_check_due()) + + +async def _async_check_due(): + now = datetime.now() + submitted = 0 + + async with AsyncSessionLocal() as session: + stmt = ( + select(Task, Account) + .join(Account, Task.account_id == Account.id) + .where(Task.is_enabled == True) + .where(Account.status.in_(["pending", "active"])) + ) + result = await session.execute(stmt) + pairs = result.all() + + for task, account in pairs: try: - if self.redis_client: - # 使用 Redis - result = self.redis_client.set(self.lock_key, "1", nx=True, ex=self.timeout) - self.acquired = bool(result) - else: - # 使用内存锁(本地开发) - if self.lock_key not in _memory_locks: - _memory_locks[self.lock_key] = True - self.acquired = True - else: - self.acquired = False - - return self.acquired + if not croniter.is_valid(task.cron_expression): + logger.warning(f"无效 cron: task={task.id}, expr={task.cron_expression}") + continue + + cron = croniter(task.cron_expression, now) + + # 看下次触发时间 + next_fire = cron.get_next(datetime) + # 也看上次触发时间(处理刚过期的情况) + cron2 = croniter(task.cron_expression, now) + prev_fire = cron2.get_prev(datetime) + + seconds_until = (next_fire - now).total_seconds() + seconds_since = (now - prev_fire).total_seconds() + + # 情况1: 下次触发在 5 分钟内 → 延迟到精确时间执行 + # 情况2: 上次触发在 2 分钟内 → 立即执行(刚好到期或略过) + fire_time = None + delay_seconds = 0 + + if seconds_until <= 300: + fire_time = next_fire + delay_seconds = max(0, int(seconds_until)) + elif seconds_since <= 120: + fire_time = prev_fire + delay_seconds = 0 + + if fire_time is not None: + lock_key = f"due:{task.id}:{fire_time.strftime('%Y%m%d%H%M')}" + if _acquire_lock(lock_key, ttl=3600): + if delay_seconds > 0: + execute_signin_task.apply_async( + args=[str(task.id), str(account.id)], + countdown=delay_seconds, + ) + logger.info( + f"📤 预约任务: task={task.id}, account={account.weibo_user_id}, " + f"{delay_seconds}秒后执行 (cron={task.cron_expression})" + ) + else: + execute_signin_task.delay(str(task.id), str(account.id)) + logger.info( + f"📤 立即执行: task={task.id}, account={account.weibo_user_id}, " + f"cron={task.cron_expression}" + ) + submitted += 1 except Exception as e: - logger.error(f"Failed to acquire lock {self.lock_key}: {e}") - return False - - def release(self): - """Release the lock""" - if self.acquired: - try: - if self.redis_client: - # 使用 Redis - self.redis_client.delete(self.lock_key) - else: - # 使用内存锁 - if self.lock_key in _memory_locks: - del _memory_locks[self.lock_key] - - self.acquired = False - except Exception as e: - logger.error(f"Failed to release lock {self.lock_key}: {e}") - - def __enter__(self): - """Context manager entry""" - if not self.acquire(): - raise Exception(f"Failed to acquire lock: {self.lock_key}") - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit""" - self.release() + logger.error(f"检查任务 {task.id} 出错: {e}") + + if submitted > 0: + logger.info(f"✅ 本轮提交了 {submitted} 个任务") + return {"checked": len(pairs), "submitted": submitted} + + +@celery_app.task(bind=True, max_retries=2, default_retry_delay=120) +def execute_signin_task(self, task_id: str, account_id: str): + """执行单个账号的全量超话签到""" + lock_key = f"signin:{task_id}:{account_id}" + if not _acquire_lock(lock_key, ttl=600): + logger.warning(f"⏭️ 签到任务正在执行中: task={task_id}") + return {"status": "skipped", "reason": "already running"} -@celery_app.task(bind=True, max_retries=3, default_retry_delay=60) -def execute_signin_task(self, task_id: str, account_id: str, cron_expression: str): - """ - Execute scheduled sign-in task for a specific account - This task is triggered by Celery Beat based on cron schedule - Uses distributed lock to prevent duplicate execution - """ - lock_key = f"signin_task:{task_id}:{account_id}" - lock = DistributedLock(lock_key, timeout=300) - - # Try to acquire lock - if not lock.acquire(): - logger.warning(f"⚠️ Task {task_id} for account {account_id} is already running, skipping") - return { - "status": "skipped", - "reason": "Task already running (distributed lock)", - "account_id": account_id, - "task_id": task_id - } - try: - logger.info(f"🎯 Starting sign-in task {task_id} for account {account_id}") - - # Update task status - current_task.update_state( - state="PROGRESS", - meta={ - "current": 10, - "total": 100, - "status": "Initializing sign-in process...", - "account_id": account_id - } - ) - - # Get account info from database - account_info = _get_account_from_db(account_id) - if not account_info: - raise Exception(f"Account {account_id} not found in database") - - # Check if account is active - if account_info["status"] not in ["pending", "active"]: - logger.warning(f"Account {account_id} status is {account_info['status']}, skipping sign-in") - return { - "status": "skipped", - "reason": f"Account status is {account_info['status']}", - "account_id": account_id - } - - # Call signin executor service - result = _call_signin_executor(account_id, task_id) - - # Update task status - current_task.update_state( - state="SUCCESS", - meta={ - "current": 100, - "total": 100, - "status": "Sign-in completed successfully", - "result": result, - "account_id": account_id - } - ) - - logger.info(f"✅ Sign-in task {task_id} completed successfully for account {account_id}") + logger.info(f"🎯 开始签到: task={task_id}, account={account_id}") + result = _run_async(_async_do_signin(account_id)) + logger.info(f"✅ 签到完成: task={task_id}, result={result}") return result - except Exception as exc: - logger.error(f"❌ Sign-in task {task_id} failed for account {account_id}: {exc}") - - # Retry logic with exponential backoff - if self.request.retries < settings.MAX_RETRY_ATTEMPTS: - retry_delay = settings.RETRY_DELAY_SECONDS * (2 ** self.request.retries) - logger.info(f"🔄 Retrying task {task_id} (attempt {self.request.retries + 1}) in {retry_delay}s") - - # Release lock before retry - lock.release() - - raise self.retry(exc=exc, countdown=retry_delay) - - # Final failure - current_task.update_state( - state="FAILURE", - meta={ - "current": 100, - "total": 100, - "status": f"Task failed after {settings.MAX_RETRY_ATTEMPTS} attempts", - "error": str(exc), - "account_id": account_id - } - ) - raise exc - - finally: - # Always release lock - lock.release() - - -def _get_account_from_db(account_id: str) -> Optional[Dict[str, Any]]: - """ - Query account information from database (replaces mock data). - Returns account dict or None if not found. - """ - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(_async_get_account(account_id)) - finally: - loop.close() - - -async def _async_get_account(account_id: str) -> Optional[Dict[str, Any]]: - """Async helper to query account from database""" - try: - async with AsyncSessionLocal() as session: - stmt = select(Account).where(Account.id == account_id) - result = await session.execute(stmt) - account = result.scalar_one_or_none() - - if not account: - return None - - return { - "id": account.id, - "user_id": account.user_id, - "weibo_user_id": account.weibo_user_id, - "remark": account.remark, - "status": account.status, - "encrypted_cookies": account.encrypted_cookies, - "iv": account.iv, - } - except Exception as e: - logger.error(f"Error querying account {account_id}: {e}") - return None - -@celery_app.task -def schedule_daily_signin(): - """ - Daily sign-in task - queries database for enabled tasks - """ - logger.info("📅 Executing daily sign-in schedule") - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(_async_schedule_daily_signin()) - finally: - loop.close() - - -async def _async_schedule_daily_signin(): - """Async helper to query and schedule tasks""" - try: - async with AsyncSessionLocal() as session: - # Query all enabled tasks - stmt = ( - select(Task, Account) - .join(Account, Task.account_id == Account.id) - .where(Task.is_enabled == True) - .where(Account.status.in_(["pending", "active"])) - ) - result = await session.execute(stmt) - task_account_pairs = result.all() - - results = [] - for task, account in task_account_pairs: - try: - # Submit individual sign-in task for each account - celery_task = execute_signin_task.delay( - task_id=task.id, - account_id=account.id, - cron_expression=task.cron_expression - ) - results.append({ - "account_id": account.id, - "task_id": celery_task.id, - "status": "submitted" - }) - except Exception as e: - logger.error(f"Failed to submit task for account {account.id}: {e}") - results.append({ - "account_id": account.id, - "status": "failed", - "error": str(e) - }) - - return { - "scheduled_date": datetime.now().isoformat(), - "accounts_processed": len(task_account_pairs), - "results": results - } - except Exception as e: - logger.error(f"Error in daily signin schedule: {e}") + logger.error(f"❌ 签到失败: task={task_id}, error={exc}") + _release_lock(lock_key) + if self.request.retries < 2: + raise self.retry(exc=exc) raise - -@celery_app.task -def process_pending_tasks(): - """ - Process pending sign-in tasks from database - Queries database for enabled tasks and submits them for execution - """ - logger.info("🔄 Processing pending sign-in tasks from database") - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(_async_process_pending_tasks()) finally: - loop.close() + _release_lock(lock_key) -async def _async_process_pending_tasks(): - """Async helper to process pending tasks""" - try: - async with AsyncSessionLocal() as session: - # Query enabled tasks that are due for execution - stmt = ( - select(Task, Account) - .join(Account, Task.account_id == Account.id) - .where(Task.is_enabled == True) - .where(Account.status.in_(["pending", "active"])) - ) - result = await session.execute(stmt) - task_account_pairs = result.all() - - tasks_submitted = 0 - tasks_skipped = 0 - - for task, account in task_account_pairs: - try: - # Submit task for execution - execute_signin_task.delay( - task_id=task.id, - account_id=account.id, - cron_expression=task.cron_expression - ) - tasks_submitted += 1 - except Exception as e: - logger.error(f"Failed to submit task {task.id}: {e}") - tasks_skipped += 1 - - result = { - "processed_at": datetime.now().isoformat(), - "tasks_found": len(task_account_pairs), - "tasks_submitted": tasks_submitted, - "tasks_skipped": tasks_skipped, - "status": "completed" - } - - logger.info(f"✅ Processed pending tasks: {result}") - return result - - except Exception as e: - logger.error(f"❌ Failed to process pending tasks: {e}") - raise +# =============== 签到核心逻辑 =============== -def _call_signin_executor(account_id: str, task_id: str) -> Dict[str, Any]: - """ - Call the signin executor service to perform actual sign-in - """ - try: - signin_data = { - "task_id": task_id, - "account_id": account_id, - "timestamp": datetime.now().isoformat(), - "requested_by": "task_scheduler" - } - - # Call signin executor service - with httpx.Client(timeout=30.0) as client: - response = client.post( - f"{settings.SIGNIN_EXECUTOR_URL}/api/v1/signin/execute", - json=signin_data - ) - - if response.status_code == 200: - result = response.json() - logger.info(f"Sign-in executor response: {result}") - return result +async def _async_do_signin(account_id: str) -> Dict[str, Any]: + """直接操作数据库和微博 API 完成签到""" + import httpx + + async with AsyncSessionLocal() as session: + stmt = select(Account).where(Account.id == account_id) + result = await session.execute(stmt) + account = result.scalar_one_or_none() + if not account: + raise Exception(f"Account {account_id} not found") + + if account.status not in ("pending", "active"): + return {"status": "skipped", "reason": f"status={account.status}"} + + # 解密 cookie + key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY) + try: + cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) + except Exception: + account.status = "invalid_cookie" + await session.commit() + return {"status": "failed", "reason": "cookie decryption failed"} + + cookies = _parse_cookies(cookie_str) + + # 获取超话列表 + topics = await _fetch_topics(cookies) + if not topics: + return {"status": "completed", "signed": 0, "message": "no topics"} + + # 逐个签到 + signed = already = failed = 0 + for topic in topics: + await asyncio.sleep(1.5) + r = await _do_single_signin(cookies, topic) + + if r["status"] == "success": + s = "success" + signed += 1 + elif r["status"] == "already_signed": + s = "failed_already_signed" + already += 1 else: - raise Exception(f"Sign-in executor returned error: {response.status_code} - {response.text}") - - except httpx.RequestError as e: - logger.error(f"Network error calling signin executor: {e}") - raise Exception(f"Failed to connect to signin executor: {e}") - except Exception as e: - logger.error(f"Error calling signin executor: {e}") - raise + s = "failed_network" + failed += 1 -# Periodic task definitions for Celery Beat -celery_app.conf.beat_schedule = { - "daily-signin-at-8am": { - "task": "app.tasks.signin_tasks.schedule_daily_signin", - "schedule": { - "hour": 8, - "minute": 0, - }, - }, - "process-pending-every-15-minutes": { - "task": "app.tasks.signin_tasks.process_pending_tasks", - "schedule": 900.0, # Every 15 minutes - }, -} + log = SigninLog( + account_id=account.id, + topic_title=topic["title"], + status=s, + reward_info={"message": r["message"]}, + signed_at=datetime.now(), + ) + session.add(log) + + account.last_checked_at = datetime.now() + if account.status != "active": + account.status = "active" + await session.commit() + + logger.info( + f"签到结果: account={account_id}, " + f"signed={signed}, already={already}, failed={failed}" + ) + return { + "status": "completed", + "signed": signed, + "already_signed": already, + "failed": failed, + "total_topics": len(topics), + } + + +async def _fetch_topics(cookies: Dict[str, str]) -> List[dict]: + """获取关注的超话列表""" + import httpx + + topics: List[dict] = [] + async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: + await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) + xsrf = client.cookies.get("XSRF-TOKEN", "") + headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"} + if xsrf: + headers["X-XSRF-TOKEN"] = xsrf + + page = 1 + while page <= 10: + resp = await client.get( + "https://weibo.com/ajax/profile/topicContent", + params={"tabid": "231093_-_chaohua", "page": str(page)}, + headers=headers, cookies=cookies, + ) + data = resp.json() + if data.get("ok") != 1: + break + topic_list = data.get("data", {}).get("list", []) + if not topic_list: + break + for item in topic_list: + title = item.get("topic_name", "") or item.get("title", "") + containerid = "" + for field in ("oid", "scheme"): + m = re.search(r"100808[0-9a-fA-F]+", item.get(field, "")) + if m: + containerid = m.group(0) + break + if title and containerid: + topics.append({"title": title, "containerid": containerid}) + if page >= data.get("data", {}).get("max_page", 1): + break + page += 1 + return topics + + +async def _do_single_signin(cookies: Dict[str, str], topic: dict) -> dict: + """签到单个超话""" + import httpx + + try: + async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client: + await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies) + xsrf = client.cookies.get("XSRF-TOKEN", "") + h = { + **WEIBO_HEADERS, + "Referer": f"https://weibo.com/p/{topic['containerid']}/super_index", + "X-Requested-With": "XMLHttpRequest", + } + if xsrf: + h["X-XSRF-TOKEN"] = xsrf + + resp = await client.get( + "https://weibo.com/p/aj/general/button", + params={ + "ajwvr": "6", + "api": "http://i.huati.weibo.com/aj/super/checkin", + "texta": "签到", "textb": "已签到", + "status": "0", + "id": topic["containerid"], + "location": "page_100808_super_index", + "timezone": "GMT+0800", + "lang": "zh-cn", + "plat": "Win32", + "ua": WEIBO_HEADERS["User-Agent"], + "screen": "1920*1080", + "__rnd": str(int(_time.time() * 1000)), + }, + headers=h, cookies=cookies, + ) + data = resp.json() + code = str(data.get("code", "")) + msg = data.get("msg", "") + + if code == "100000": + tip = "" + if isinstance(data.get("data"), dict): + tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "") + return {"status": "success", "message": tip or "签到成功"} + elif code == "382004": + return {"status": "already_signed", "message": msg or "今日已签到"} + else: + return {"status": "failed", "message": f"code={code}, msg={msg}"} + except Exception as e: + return {"status": "failed", "message": str(e)} diff --git a/backend/task_scheduler/requirements.txt b/backend/task_scheduler/requirements.txt index 03ac96a..b0b3f8b 100644 --- a/backend/task_scheduler/requirements.txt +++ b/backend/task_scheduler/requirements.txt @@ -14,5 +14,12 @@ pydantic-settings==2.0.3 # HTTP Client httpx==0.25.2 +# Cron parsing +croniter==2.0.1 + +# Crypto (for cookie decryption) +cryptography==41.0.7 +pycryptodome==3.19.0 + # Utilities python-dotenv==1.0.0 diff --git a/docker-compose.yml b/docker-compose.yml index 542887b..25f312e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -79,6 +79,20 @@ services: networks: - 1panel-network + # 定时任务调度器 (Celery Worker + Beat) + task-scheduler: + build: + context: ./backend + dockerfile: task_scheduler/Dockerfile + container_name: weibo-scheduler + restart: unless-stopped + environment: + <<: *db-env + depends_on: + - api-service + networks: + - 1panel-network + networks: 1panel-network: external: true diff --git a/frontend/templates/account_detail.html b/frontend/templates/account_detail.html index 8cbfdc0..fb9c4f0 100644 --- a/frontend/templates/account_detail.html +++ b/frontend/templates/account_detail.html @@ -136,7 +136,13 @@ {% endif %}