定时任务逻辑优化

This commit is contained in:
2026-03-18 13:52:47 +08:00
parent fecb42838c
commit 07097091fe
6 changed files with 450 additions and 516 deletions

View File

@@ -23,5 +23,5 @@ RUN groupadd -r appuser && useradd -r -g appuser appuser
ENV PYTHONPATH=/app
USER appuser
# 同时启动 Celery Worker + Beat
CMD ["celery", "-A", "task_scheduler.app.celery_app:celery_app", "worker", "--beat", "--loglevel=info", "--concurrency=4"]
# APScheduler 调度器
CMD ["python", "-m", "task_scheduler.app.main"]

View File

@@ -1,54 +1 @@
"""
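Weibo-HotSign Task Scheduler Service
Celery app + Beat configuration.
Strategy: Beat runs check_and_run_due_tasks every 60 seconds.
That task reads every enabled Task from the database and uses croniter to
decide whether it is due (next fire time within 5 minutes, or previous fire
time no more than 2 minutes ago); due tasks are submitted for sign-in.
This avoids modifying beat_schedule dynamically, which keeps things simple and reliable.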
"""
import os
import sys
import logging
from celery import Celery
from celery.schedules import crontab
# 确保 shared 模块可导入
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
from .config import settings
logger = logging.getLogger(__name__)
# Periodic jobs executed by Celery Beat.
_BEAT_SCHEDULE = {
    # Due-task check, every 60 seconds.
    "check-due-tasks": {
        "task": "task_scheduler.app.tasks.signin_tasks.check_and_run_due_tasks",
        "schedule": 60.0,
    },
    # Sign-in log cleanup, every day at 03:00.
    "cleanup-old-logs": {
        "task": "task_scheduler.app.tasks.signin_tasks.cleanup_old_signin_logs",
        "schedule": crontab(hour=3, minute=0),
    },
}

# Celery application; broker and result backend come from the service settings.
celery_app = Celery(
    "weibo_scheduler",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    include=["task_scheduler.app.tasks.signin_tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=False,
    beat_schedule_filename="/tmp/celerybeat-schedule",
    beat_max_loop_interval=60,
    beat_schedule=_BEAT_SCHEDULE,
)
# 导入 task 模块以注册
from .tasks import signin_tasks # noqa: F401, E402
# 此文件已废弃,调度器改用 APScheduler入口见 main.py

View File

@@ -0,0 +1,442 @@
"""
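Weibo-HotSign scheduled-task scheduler (APScheduler).
On startup, loads every enabled Task from the DB and registers each one under its own cron trigger.
Re-syncs with the DB every 5 minutes (to pick up added, removed, or modified tasks).
The sign-in logic talks to the database and the Weibo API directly and does not depend on other services.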
"""
import os
import sys
import re
import asyncio
import logging
import signal
import time as _time
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.jobstores.memory import MemoryJobStore
# 确保 shared 模块可导入
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
from shared.config import shared_settings
# Number of days of sign-in logs to keep; overridable through the environment.
SIGNIN_LOG_RETAIN_DAYS = int(os.environ.get("SIGNIN_LOG_RETAIN_DAYS", "30"))
# Maximum number of log rows deleted per batch by the cleanup job.
CLEANUP_BATCH_SIZE = 1000

_BROWSER_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
# Base headers sent with every Weibo request.
WEIBO_HEADERS = {
    "User-Agent": _BROWSER_USER_AGENT,
    "Referer": "https://weibo.com/",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("scheduler")

# Jobs are held in memory only; sync_db_tasks() rebuilds them from the database.
scheduler = BackgroundScheduler(
    jobstores={"default": MemoryJobStore()},
    timezone="Asia/Shanghai",
)

# task_id -> cron expression of every sign-in job currently registered;
# sync_db_tasks() compares the database against this mapping.
_registered_tasks: dict = {}
def _make_session():
    """Create a brand-new async engine together with a session factory.

    A separate engine is built on every call to avoid event-loop conflicts.

    Returns:
        A ``(session_factory, engine)`` tuple. The callers in this module
        dispose the engine once they are done with it.
    """
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker

    database_url = shared_settings.DATABASE_URL
    # Connection-pool options are only passed for non-SQLite URLs.
    pool_options = (
        {}
        if "sqlite" in database_url
        else {"pool_size": 5, "max_overflow": 10, "pool_pre_ping": True}
    )
    engine = create_async_engine(database_url, echo=False, **pool_options)
    session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    return session_factory, engine
def _run_async(coro):
"""在新 event loop 中运行 async 函数。"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
def _parse_cookies(cookie_str: str) -> dict:
cookies = {}
for pair in cookie_str.split(";"):
pair = pair.strip()
if "=" in pair:
k, v = pair.split("=", 1)
cookies[k.strip()] = v.strip()
return cookies
# =============== 任务同步 ===============
def sync_db_tasks():
"""从 DB 同步任务到 APScheduler。新增的加上删除的移除cron 变了的更新。"""
try:
tasks = _run_async(_load_tasks_from_db())
except Exception as e:
logger.error(f"同步任务失败: {e}")
return
db_task_ids = set()
for task_id, account_id, cron_expr in tasks:
db_task_ids.add(task_id)
job_id = f"signin_{task_id}"
# 已存在且 cron 没变 → 跳过
if task_id in _registered_tasks and _registered_tasks[task_id] == cron_expr:
continue
# 新增或 cron 变了 → 添加/替换
try:
parts = cron_expr.split()
if len(parts) != 5:
logger.warning(f"无效 cron: task={task_id}, expr={cron_expr}")
continue
trigger = CronTrigger(
minute=parts[0], hour=parts[1],
day=parts[2], month=parts[3], day_of_week=parts[4],
timezone="Asia/Shanghai",
)
scheduler.add_job(
run_signin,
trigger=trigger,
id=job_id,
args=[task_id, account_id],
replace_existing=True,
misfire_grace_time=300, # 5 分钟内的 misfire 仍然执行
)
_registered_tasks[task_id] = cron_expr
logger.info(f"✅ 注册任务: task={task_id}, account={account_id}, cron={cron_expr}")
except Exception as e:
logger.error(f"注册任务失败: task={task_id}, error={e}")
# 移除 DB 中已删除/禁用的任务
removed = set(_registered_tasks.keys()) - db_task_ids
for task_id in removed:
job_id = f"signin_{task_id}"
try:
scheduler.remove_job(job_id)
logger.info(f"🗑️ 移除任务: task={task_id}")
except Exception:
pass
_registered_tasks.pop(task_id, None)
logger.info(f"📋 任务同步完成: 活跃 {len(db_task_ids)} 个, 已注册 {len(_registered_tasks)}")
async def _load_tasks_from_db():
    """Fetch the tasks that should currently be scheduled.

    Selects tasks whose ``is_enabled`` flag is true and whose account status
    is ``pending`` or ``active``.

    Returns:
        A list of ``(task_id, account_id, cron_expression)`` tuples; both ids
        are converted with ``str()``.
    """
    from sqlalchemy import select
    from shared.models.account import Account
    from shared.models.task import Task

    session_factory, engine = _make_session()
    try:
        async with session_factory() as session:
            query = (
                select(Task.id, Task.account_id, Task.cron_expression)
                .join(Account, Task.account_id == Account.id)
                .where(
                    Task.is_enabled == True,  # noqa: E712 - builds a SQL expression
                    Account.status.in_(["pending", "active"]),
                )
            )
            rows = (await session.execute(query)).all()
            return [(str(row[0]), str(row[1]), row[2]) for row in rows]
    finally:
        await engine.dispose()
# =============== 签到入口 ===============
def run_signin(task_id: str, account_id: str):
    """Synchronous job entry point called by APScheduler.

    Runs the async sign-in for *account_id* and logs either the result or
    the error; exceptions from the sign-in are not propagated.
    """
    logger.info(f"🎯 开始签到: task={task_id}, account={account_id}")
    try:
        outcome = _run_async(_async_do_signin(account_id))
    except Exception as e:
        logger.error(f"❌ 签到失败: task={task_id}, error={e}")
    else:
        logger.info(f"✅ 签到完成: task={task_id}, result={outcome}")
async def _async_do_signin(account_id: str):
    """Sign one account in to every super-topic it follows.

    Loads the account, decrypts its cookies, fetches the followed topics and
    signs in to them one by one. One ``SigninLog`` row is added per topic and
    everything is committed at the end, together with the account's
    ``last_checked_at`` and an ``active`` status.

    Args:
        account_id: Primary key of the account to sign in.

    Returns:
        A dict that always carries a ``"status"`` key:
        ``error`` (account not found), ``skipped`` (account neither pending
        nor active), ``failed`` (cookie decryption failed; the account is
        marked ``invalid_cookie``) or ``completed`` (with the ``signed``,
        ``already_signed``, ``failed`` and ``total`` counters, or
        ``signed=0`` and a message when no topics were found).
    """
    from sqlalchemy import select
    from shared.crypto import decrypt_cookie, derive_key
    from shared.models.account import Account
    from shared.models.signin_log import SigninLog

    # Result status of _do_single_signin -> status stored in SigninLog;
    # anything not listed is recorded as "failed_network".
    log_status_for = {"success": "success", "already_signed": "failed_already_signed"}

    SessionFactory, eng = _make_session()
    try:
        async with SessionFactory() as session:
            result = await session.execute(select(Account).where(Account.id == account_id))
            account = result.scalar_one_or_none()
            if not account:
                return {"status": "error", "reason": "account not found"}
            if account.status not in ("pending", "active"):
                return {"status": "skipped", "reason": f"status={account.status}"}

            key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY)
            try:
                cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key)
            except Exception:
                account.status = "invalid_cookie"
                await session.commit()
                return {"status": "failed", "reason": "cookie decryption failed"}
            cookies = _parse_cookies(cookie_str)

            # Fetch the followed super-topics.
            topics = await _fetch_topics(cookies)
            if not topics:
                return {"status": "completed", "signed": 0, "message": "no topics"}

            # Sign in topic by topic, pausing 1.5 s before each request.
            tally = {"success": 0, "failed_already_signed": 0, "failed_network": 0}
            for topic in topics:
                await asyncio.sleep(1.5)
                r = await _do_single_signin(cookies, topic)
                log_status = log_status_for.get(r["status"], "failed_network")
                tally[log_status] += 1
                session.add(SigninLog(
                    account_id=account.id,
                    topic_title=topic["title"],
                    status=log_status,
                    reward_info={"message": r["message"]},
                    signed_at=datetime.now(),
                ))

            account.last_checked_at = datetime.now()
            if account.status != "active":
                account.status = "active"
            await session.commit()

            signed = tally["success"]
            already = tally["failed_already_signed"]
            failed = tally["failed_network"]
            logger.info(
                f"签到结果: account={account_id}, "
                f"signed={signed}, already={already}, failed={failed}"
            )
            # "status" is included so this result has the same shape as the
            # other return paths.
            return {
                "status": "completed",
                "signed": signed,
                "already_signed": already,
                "failed": failed,
                "total": len(topics),
            }
    finally:
        await eng.dispose()
async def _fetch_topics(cookies: dict) -> list:
    """Fetch the list of super-topics the account follows.

    Requests the Weibo home page first (to obtain an ``XSRF-TOKEN`` cookie if
    one is set), then pages through ``/ajax/profile/topicContent`` for at
    most 10 pages.

    Args:
        cookies: Cookie name -> value mapping sent with every request.

    Returns:
        A list of ``{"title": ..., "containerid": ...}`` dicts. Items without
        a title or without a ``100808...`` container id are skipped.

    Raises:
        Exception: Network and JSON-decoding errors are not caught here.
    """
    import httpx

    containerid_pattern = re.compile(r"100808[0-9a-fA-F]+")
    topics = []
    async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
        await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
        xsrf = client.cookies.get("XSRF-TOKEN", "")
        headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"}
        if xsrf:
            headers["X-XSRF-TOKEN"] = xsrf

        for page in range(1, 11):
            resp = await client.get(
                "https://weibo.com/ajax/profile/topicContent",
                params={"tabid": "231093_-_chaohua", "page": str(page)},
                headers=headers,
                cookies=cookies,
            )
            data = resp.json()
            if data.get("ok") != 1:
                break
            # "data", "list" and "max_page" may be present but null.
            payload = data.get("data") or {}
            tlist = payload.get("list") or []
            if not tlist:
                break
            for item in tlist:
                title = item.get("topic_name", "") or item.get("title", "")
                cid = ""
                for field in ("oid", "scheme"):
                    # The field may be missing, null or non-string; normalise
                    # to str so the regex search cannot raise TypeError.
                    m = containerid_pattern.search(str(item.get(field) or ""))
                    if m:
                        cid = m.group(0)
                        break
                if title and cid:
                    topics.append({"title": title, "containerid": cid})
            if page >= (payload.get("max_page") or 1):
                break
    return topics
async def _do_single_signin(cookies: dict, topic: dict) -> dict:
    """Sign in to a single super-topic.

    Args:
        cookies: Cookie name -> value mapping sent with every request.
        topic: Dict carrying at least a ``containerid`` key.

    Returns:
        ``{"status": ..., "message": ...}`` where status is ``success``
        (response code 100000), ``already_signed`` (code 382004) or
        ``failed`` (any other code, or any exception, whose text becomes
        the message).
    """
    import httpx

    try:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
            xsrf_token = client.cookies.get("XSRF-TOKEN", "")

            request_headers = dict(WEIBO_HEADERS)
            request_headers["Referer"] = f"https://weibo.com/p/{topic['containerid']}/super_index"
            request_headers["X-Requested-With"] = "XMLHttpRequest"
            if xsrf_token:
                request_headers["X-XSRF-TOKEN"] = xsrf_token

            query = {
                "ajwvr": "6",
                "api": "http://i.huati.weibo.com/aj/super/checkin",
                "texta": "签到",
                "textb": "已签到",
                "status": "0",
                "id": topic["containerid"],
                "location": "page_100808_super_index",
                "timezone": "GMT+0800",
                "lang": "zh-cn",
                "plat": "Win32",
                "ua": WEIBO_HEADERS["User-Agent"],
                "screen": "1920*1080",
                "__rnd": str(int(_time.time() * 1000)),
            }
            response = await client.get(
                "https://weibo.com/p/aj/general/button",
                params=query,
                headers=request_headers,
                cookies=cookies,
            )
            body = response.json()
            code = str(body.get("code", ""))
            msg = body.get("msg", "")

            if code == "382004":
                return {"status": "already_signed", "message": msg or f"今天已签到({code})"}
            if code != "100000":
                return {"status": "failed", "message": f"code={code}, msg={msg}"}

            detail = body.get("data")
            tip = ""
            if isinstance(detail, dict):
                tip = detail.get("alert_title", "") or detail.get("tipMessage", "")
            return {"status": "success", "message": tip or "签到成功"}
    except Exception as e:
        return {"status": "failed", "message": str(e)}
# =============== 日志清理 ===============
def cleanup_old_signin_logs():
    """Delete sign-in logs older than ``SIGNIN_LOG_RETAIN_DAYS`` days.

    Synchronous job wrapper around ``_async_cleanup``; the outcome or the
    error is logged and exceptions are not propagated.
    """
    logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...")
    try:
        summary = _run_async(_async_cleanup())
    except Exception as e:
        logger.error(f"❌ 日志清理失败: {e}")
    else:
        logger.info(f"✅ 日志清理完成: {summary}")
async def _async_cleanup():
    """Delete expired sign-in logs in batches.

    Repeatedly selects up to ``CLEANUP_BATCH_SIZE`` ids of logs whose
    ``signed_at`` is older than the cutoff, deletes them and commits, until a
    batch comes back empty or short.

    Returns:
        ``{"deleted": <row count>, "cutoff": <ISO timestamp>}``.
    """
    from sqlalchemy import delete, select
    from shared.models.signin_log import SigninLog

    cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS)
    deleted_so_far = 0
    session_factory, engine = _make_session()
    try:
        async with session_factory() as session:
            batch_query = (
                select(SigninLog.id)
                .where(SigninLog.signed_at < cutoff)
                .limit(CLEANUP_BATCH_SIZE)
            )
            batch_len = CLEANUP_BATCH_SIZE
            # A short batch means nothing older than the cutoff is left.
            while batch_len == CLEANUP_BATCH_SIZE:
                rows = (await session.execute(batch_query)).all()
                stale_ids = [row[0] for row in rows]
                batch_len = len(stale_ids)
                if not stale_ids:
                    break
                await session.execute(delete(SigninLog).where(SigninLog.id.in_(stale_ids)))
                await session.commit()
                deleted_so_far += batch_len
                logger.info(f"🧹 已删除 {deleted_so_far} 条...")
    finally:
        await engine.dispose()
    return {"deleted": deleted_so_far, "cutoff": cutoff.isoformat()}
# =============== 启动入口 ===============
def _shutdown(signum, frame):
    """Signal handler: stop the scheduler and exit with status 0.

    Args:
        signum: Number of the received signal.
        frame: Current stack frame (unused).

    Raises:
        SystemExit: Always, via ``sys.exit(0)``.
    """
    logger.info(f"收到信号 {signum},正在关闭调度器...")
    # shutdown() raises SchedulerNotRunningError on a stopped scheduler, so a
    # second signal (or one arriving before start()) must not call it again.
    if scheduler.running:
        scheduler.shutdown(wait=False)
    sys.exit(0)
if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("🚀 Weibo-HotSign 定时调度器启动 (APScheduler)")
    logger.info("=" * 60)

    # Install the signal handlers.
    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)

    # Start the scheduler, then load the tasks from the database once.
    scheduler.start()
    sync_db_tasks()

    # Re-sync with the database every 5 minutes (added/removed/changed tasks).
    scheduler.add_job(
        sync_db_tasks,
        trigger=IntervalTrigger(minutes=5),
        id="sync_db_tasks",
        replace_existing=True,
        misfire_grace_time=60,
    )

    # Purge expired sign-in logs every day at 03:00.
    scheduler.add_job(
        cleanup_old_signin_logs,
        trigger=CronTrigger(hour=3, minute=0, timezone="Asia/Shanghai"),
        id="cleanup_old_signin_logs",
        replace_existing=True,
        misfire_grace_time=3600,
    )

    logger.info("📋 调度器已启动,等待任务触发...")
    logger.info(f"📋 日志清理: 每天 03:00, 保留 {SIGNIN_LOG_RETAIN_DAYS}")

    # Log every job that is currently registered.
    for job in scheduler.get_jobs():
        logger.info(f" 📌 {job.id}: next_run={job.next_run_time}")

    # Keep the main thread alive; the scheduler runs in the background.
    try:
        while True:
            _time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        logger.info("调度器关闭中...")
        # The signal handler normally stops the scheduler before raising
        # SystemExit; calling shutdown() again on a stopped scheduler raises
        # SchedulerNotRunningError, so only shut down if still running.
        if scheduler.running:
            scheduler.shutdown(wait=False)

View File

@@ -1,454 +1 @@
"""
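Weibo-HotSign scheduled sign-in tasks.
Core logic:
1. check_and_run_due_tasks (triggered by Beat every minute)
→ read all enabled Tasks from the DB
→ use croniter to decide whether each one is due
→ submit execute_signin_task for every due task
2. execute_signin_task
→ decrypt cookie → fetch the super-topic list → sign in to each topic → write logs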
"""
import os
import sys
import re
import asyncio
import time as _time
import logging
import redis
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from celery import current_task
from croniter import croniter
from sqlalchemy import select
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
from shared.models.base import Base
from shared.models.task import Task
from shared.models.account import Account
from shared.models.signin_log import SigninLog
from shared.crypto import decrypt_cookie, derive_key
from shared.config import shared_settings
from ..celery_app import celery_app
from ..config import settings
logger = logging.getLogger(__name__)
def _make_session():
    """Create a brand-new async engine together with a session factory.

    A new engine is built on every call so that, after a Celery worker fork,
    the connection pool bound to the parent process's event loop is never
    reused (which causes 'Future attached to a different loop' errors).

    Returns:
        A ``(session_factory, engine)`` tuple.
    """
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker

    database_url = shared_settings.DATABASE_URL
    # Connection-pool options are only passed for non-SQLite URLs.
    pool_options: dict = (
        {}
        if "sqlite" in database_url
        else {"pool_size": 5, "max_overflow": 10, "pool_pre_ping": True}
    )
    engine = create_async_engine(database_url, echo=False, **pool_options)
    session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    return session_factory, engine
_BROWSER_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
# Base headers sent with every Weibo request.
WEIBO_HEADERS = {
    "User-Agent": _BROWSER_USER_AGENT,
    "Referer": "https://weibo.com/",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
# --------------- Redis distributed lock ---------------
_redis_client = None


def _get_redis():
    """Return the module-wide Redis client, creating it on first use.

    Returns:
        The cached client, or ``None`` when ``shared_settings.USE_REDIS`` is
        false or the client could not be created.
    """
    global _redis_client
    if not shared_settings.USE_REDIS:
        return None
    if _redis_client is not None:
        return _redis_client
    try:
        _redis_client = redis.from_url(shared_settings.REDIS_URL, decode_responses=True)
    except Exception:
        return None
    return _redis_client
def _acquire_lock(key: str, ttl: int = 600) -> bool:
    """Try to take the lock ``lock:<key>`` for *ttl* seconds.

    Returns:
        ``True`` when the key was set (``SET NX EX``), ``False`` when it
        already exists. Without a Redis client no locking happens and the
        result is always ``True``.
    """
    client = _get_redis()
    if not client:
        return True  # no Redis available: run without locking
    return bool(client.set(f"lock:{key}", "1", nx=True, ex=ttl))
def _release_lock(key: str):
    """Delete the lock ``lock:<key>``; does nothing without a Redis client."""
    client = _get_redis()
    if not client:
        return
    client.delete(f"lock:{key}")
# --------------- 辅助函数 ---------------
def _parse_cookies(cookie_str: str) -> Dict[str, str]:
cookies: Dict[str, str] = {}
for pair in cookie_str.split(";"):
pair = pair.strip()
if "=" in pair:
k, v = pair.split("=", 1)
cookies[k.strip()] = v.strip()
return cookies
def _run_async(coro):
"""在 Celery worker 线程中运行 async 函数"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
# =============== Celery Tasks ===============
@celery_app.task
def check_and_run_due_tasks():
    """Celery task invoked by Beat every minute.

    For each enabled Task, croniter computes the next fire time; a task due
    within 5 minutes is submitted with the exact remaining delay, and a task
    whose fire time passed no more than 2 minutes ago is run immediately as
    a fallback.

    Returns:
        The summary dict produced by ``_async_check_due``.
    """
    summary = _run_async(_async_check_due())
    return summary
async def _async_check_due():
    """Find the tasks that are due and submit them to Celery.

    Loads every enabled task whose account is ``pending`` or ``active``. A
    task is submitted when its next fire time is at most 300 seconds away
    (delayed until that time) or, failing that, when its previous fire time
    is at most 120 seconds in the past (run immediately). A lock keyed on
    the task id and fire minute is acquired before each submission.

    Returns:
        ``{"checked": <tasks examined>, "submitted": <tasks submitted>}``.
    """
    now = datetime.now()
    submitted = 0

    session_factory, engine = _make_session()
    try:
        async with session_factory() as session:
            query = (
                select(Task, Account)
                .join(Account, Task.account_id == Account.id)
                .where(Task.is_enabled == True)  # noqa: E712 - builds a SQL expression
                .where(Account.status.in_(["pending", "active"]))
            )
            pairs = (await session.execute(query)).all()
    finally:
        await engine.dispose()

    for task, account in pairs:
        try:
            if not croniter.is_valid(task.cron_expression):
                logger.warning(f"无效 cron: task={task.id}, expr={task.cron_expression}")
                continue

            # Separate iterators: one for the next fire time, one for the
            # previous (to catch a task that has only just become due).
            upcoming = croniter(task.cron_expression, now).get_next(datetime)
            previous = croniter(task.cron_expression, now).get_prev(datetime)
            until_next = (upcoming - now).total_seconds()
            since_previous = (now - previous).total_seconds()

            if until_next <= 300:
                # Fires within 5 minutes: schedule for the exact time.
                fire_time, delay_seconds = upcoming, max(0, int(until_next))
            elif since_previous <= 120:
                # Fired within the last 2 minutes: run right away.
                fire_time, delay_seconds = previous, 0
            else:
                continue

            lock_key = f"due:{task.id}:{fire_time.strftime('%Y%m%d%H%M')}"
            if not _acquire_lock(lock_key, ttl=3600):
                continue

            if delay_seconds > 0:
                execute_signin_task.apply_async(
                    args=[str(task.id), str(account.id)],
                    countdown=delay_seconds,
                )
                logger.info(
                    f"📤 预约任务: task={task.id}, account={account.weibo_user_id}, "
                    f"{delay_seconds}秒后执行 (cron={task.cron_expression})"
                )
            else:
                execute_signin_task.delay(str(task.id), str(account.id))
                logger.info(
                    f"📤 立即执行: task={task.id}, account={account.weibo_user_id}, "
                    f"cron={task.cron_expression}"
                )
            submitted += 1
        except Exception as e:
            logger.error(f"检查任务 {task.id} 出错: {e}")

    if submitted > 0:
        logger.info(f"✅ 本轮提交了 {submitted} 个任务")
    return {"checked": len(pairs), "submitted": submitted}
@celery_app.task(bind=True, max_retries=2, default_retry_delay=120)
def execute_signin_task(self, task_id: str, account_id: str):
    """Run the full super-topic sign-in for one account.

    A lock keyed on the task and account prevents concurrent runs; when the
    lock is already held the task is skipped. On failure the lock is released
    and the task is retried while fewer than two retries have been made;
    after that the exception is re-raised.

    Returns:
        The sign-in result dict, or a ``skipped`` dict when already running.
    """
    lock_key = f"signin:{task_id}:{account_id}"
    if not _acquire_lock(lock_key, ttl=600):
        logger.warning(f"⏭️ 签到任务正在执行中: task={task_id}")
        return {"status": "skipped", "reason": "already running"}

    try:
        logger.info(f"🎯 开始签到: task={task_id}, account={account_id}")
        outcome = _run_async(_async_do_signin(account_id))
    except Exception as exc:
        logger.error(f"❌ 签到失败: task={task_id}, error={exc}")
        # Released here as well, before the retry is requested.
        _release_lock(lock_key)
        may_retry = self.request.retries < 2
        if may_retry:
            raise self.retry(exc=exc)
        raise
    else:
        logger.info(f"✅ 签到完成: task={task_id}, result={outcome}")
        return outcome
    finally:
        _release_lock(lock_key)
# =============== 签到核心逻辑 ===============
async def _async_do_signin(account_id: str) -> Dict[str, Any]:
    """Sign one account in, talking to the database and Weibo API directly.

    Loads the account, decrypts its cookies, fetches the followed topics and
    signs in to them one by one. One ``SigninLog`` row is added per topic and
    everything is committed at the end, together with the account's
    ``last_checked_at`` and an ``active`` status.

    Returns:
        A dict with a ``"status"`` of ``skipped``, ``failed`` (cookie
        decryption failed; the account is marked ``invalid_cookie``) or
        ``completed``.

    Raises:
        Exception: If the account does not exist.
    """
    import httpx  # noqa: F401 - not referenced in this function

    # Result status of _do_single_signin -> status stored in SigninLog;
    # anything not listed is recorded as "failed_network".
    log_status_for = {"success": "success", "already_signed": "failed_already_signed"}

    session_factory, engine = _make_session()
    try:
        async with session_factory() as session:
            lookup = await session.execute(select(Account).where(Account.id == account_id))
            account = lookup.scalar_one_or_none()
            if not account:
                raise Exception(f"Account {account_id} not found")
            if account.status not in ("pending", "active"):
                return {"status": "skipped", "reason": f"status={account.status}"}

            # Decrypt the stored cookie.
            key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY)
            try:
                cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key)
            except Exception:
                account.status = "invalid_cookie"
                await session.commit()
                return {"status": "failed", "reason": "cookie decryption failed"}
            cookies = _parse_cookies(cookie_str)

            # Fetch the followed super-topics.
            topics = await _fetch_topics(cookies)
            if not topics:
                return {"status": "completed", "signed": 0, "message": "no topics"}

            # Sign in topic by topic, pausing 1.5 s before each request.
            tally = {"success": 0, "failed_already_signed": 0, "failed_network": 0}
            for topic in topics:
                await asyncio.sleep(1.5)
                outcome = await _do_single_signin(cookies, topic)
                log_status = log_status_for.get(outcome["status"], "failed_network")
                tally[log_status] += 1
                session.add(
                    SigninLog(
                        account_id=account.id,
                        topic_title=topic["title"],
                        status=log_status,
                        reward_info={"message": outcome["message"]},
                        signed_at=datetime.now(),
                    )
                )

            account.last_checked_at = datetime.now()
            if account.status != "active":
                account.status = "active"
            await session.commit()

            signed = tally["success"]
            already = tally["failed_already_signed"]
            failed = tally["failed_network"]
            logger.info(
                f"签到结果: account={account_id}, "
                f"signed={signed}, already={already}, failed={failed}"
            )
            return {
                "status": "completed",
                "signed": signed,
                "already_signed": already,
                "failed": failed,
                "total_topics": len(topics),
            }
    finally:
        await engine.dispose()
async def _fetch_topics(cookies: Dict[str, str]) -> List[dict]:
    """Fetch the list of super-topics the account follows.

    Requests the Weibo home page first (to obtain an ``XSRF-TOKEN`` cookie if
    one is set), then pages through ``/ajax/profile/topicContent`` for at
    most 10 pages.

    Args:
        cookies: Cookie name -> value mapping sent with every request.

    Returns:
        A list of ``{"title": ..., "containerid": ...}`` dicts. Items without
        a title or without a ``100808...`` container id are skipped.

    Raises:
        Exception: Network and JSON-decoding errors are not caught here.
    """
    import httpx

    containerid_pattern = re.compile(r"100808[0-9a-fA-F]+")
    topics: List[dict] = []
    async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
        await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
        xsrf = client.cookies.get("XSRF-TOKEN", "")
        headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"}
        if xsrf:
            headers["X-XSRF-TOKEN"] = xsrf

        for page in range(1, 11):
            resp = await client.get(
                "https://weibo.com/ajax/profile/topicContent",
                params={"tabid": "231093_-_chaohua", "page": str(page)},
                headers=headers,
                cookies=cookies,
            )
            data = resp.json()
            if data.get("ok") != 1:
                break
            # "data", "list" and "max_page" may be present but null.
            payload = data.get("data") or {}
            topic_list = payload.get("list") or []
            if not topic_list:
                break
            for item in topic_list:
                title = item.get("topic_name", "") or item.get("title", "")
                containerid = ""
                for field in ("oid", "scheme"):
                    # The field may be missing, null or non-string; normalise
                    # to str so the regex search cannot raise TypeError.
                    m = containerid_pattern.search(str(item.get(field) or ""))
                    if m:
                        containerid = m.group(0)
                        break
                if title and containerid:
                    topics.append({"title": title, "containerid": containerid})
            if page >= (payload.get("max_page") or 1):
                break
    return topics
async def _do_single_signin(cookies: Dict[str, str], topic: dict) -> dict:
    """Sign in to a single super-topic.

    Args:
        cookies: Cookie name -> value mapping sent with every request.
        topic: Dict carrying at least a ``containerid`` key.

    Returns:
        ``{"status": ..., "message": ...}`` where status is ``success``
        (response code 100000), ``already_signed`` (code 382004) or
        ``failed`` (any other code, or any exception, whose text becomes
        the message).
    """
    import httpx

    try:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
            xsrf_token = client.cookies.get("XSRF-TOKEN", "")

            request_headers = dict(WEIBO_HEADERS)
            request_headers["Referer"] = f"https://weibo.com/p/{topic['containerid']}/super_index"
            request_headers["X-Requested-With"] = "XMLHttpRequest"
            if xsrf_token:
                request_headers["X-XSRF-TOKEN"] = xsrf_token

            query = {
                "ajwvr": "6",
                "api": "http://i.huati.weibo.com/aj/super/checkin",
                "texta": "签到",
                "textb": "已签到",
                "status": "0",
                "id": topic["containerid"],
                "location": "page_100808_super_index",
                "timezone": "GMT+0800",
                "lang": "zh-cn",
                "plat": "Win32",
                "ua": WEIBO_HEADERS["User-Agent"],
                "screen": "1920*1080",
                "__rnd": str(int(_time.time() * 1000)),
            }
            response = await client.get(
                "https://weibo.com/p/aj/general/button",
                params=query,
                headers=request_headers,
                cookies=cookies,
            )
            body = response.json()
            code = str(body.get("code", ""))
            msg = body.get("msg", "")

            if code == "382004":
                return {"status": "already_signed", "message": msg or "今日已签到"}
            if code != "100000":
                return {"status": "failed", "message": f"code={code}, msg={msg}"}

            detail = body.get("data")
            tip = ""
            if isinstance(detail, dict):
                tip = detail.get("alert_title", "") or detail.get("tipMessage", "")
            return {"status": "success", "message": tip or "签到成功"}
    except Exception as e:
        return {"status": "failed", "message": str(e)}
# =============== 日志清理 ===============
# Number of days of sign-in logs to keep; overridable through the environment.
SIGNIN_LOG_RETAIN_DAYS = int(os.environ.get("SIGNIN_LOG_RETAIN_DAYS", "30"))
# Rows deleted per batch, to avoid holding table locks for a long time.
CLEANUP_BATCH_SIZE = 1000
@celery_app.task
def cleanup_old_signin_logs():
    """Delete sign-in logs older than ``SIGNIN_LOG_RETAIN_DAYS`` days.

    Rows are deleted in batches of at most ``CLEANUP_BATCH_SIZE`` to avoid
    locking the table.

    Returns:
        The summary dict produced by ``_async_cleanup``.
    """
    logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...")
    summary = _run_async(_async_cleanup())
    return summary
async def _async_cleanup():
    """Delete expired sign-in logs in batches.

    Repeatedly selects up to ``CLEANUP_BATCH_SIZE`` ids of logs whose
    ``signed_at`` is older than the cutoff, deletes them by id and commits,
    until a batch comes back empty or short.

    Returns:
        ``{"deleted": <row count>, "cutoff": <ISO timestamp>}``.
    """
    from sqlalchemy import delete

    cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS)
    deleted_so_far = 0
    session_factory, engine = _make_session()
    try:
        async with session_factory() as session:
            # Batched delete: look up one batch of ids, then delete by id.
            batch_query = (
                select(SigninLog.id)
                .where(SigninLog.signed_at < cutoff)
                .limit(CLEANUP_BATCH_SIZE)
            )
            batch_len = CLEANUP_BATCH_SIZE
            # A short batch means everything older than the cutoff is gone.
            while batch_len == CLEANUP_BATCH_SIZE:
                rows = (await session.execute(batch_query)).all()
                stale_ids = [row[0] for row in rows]
                batch_len = len(stale_ids)
                if not stale_ids:
                    break
                await session.execute(delete(SigninLog).where(SigninLog.id.in_(stale_ids)))
                await session.commit()
                deleted_so_far += batch_len
                logger.info(f"🧹 已删除 {deleted_so_far} 条...")
    finally:
        await engine.dispose()
    logger.info(f"✅ 日志清理完成,共删除 {deleted_so_far} 条(截止 {cutoff.strftime('%Y-%m-%d')}")
    return {"deleted": deleted_so_far, "cutoff": cutoff.isoformat()}
# 此文件已废弃,签到逻辑已迁移到 task_scheduler.app.main (APScheduler)

View File

@@ -1,6 +1,5 @@
# Weibo-HotSign Task Scheduler Service Requirements
# Task Queue
celery==5.3.6
# Weibo-HotSign Task Scheduler
apscheduler==3.10.4
redis==5.0.1
# Database
@@ -14,11 +13,10 @@ pydantic-settings==2.0.3
# HTTP Client
httpx==0.25.2
# Cron parsing
# Cron parsing (APScheduler ships its own CronTrigger parser; croniter is still declared explicitly)
croniter==2.0.1
# Crypto (for cookie decryption)
cryptography==41.0.7
pycryptodome==3.19.0
# Utilities

View File

@@ -79,7 +79,7 @@ services:
networks:
- 1panel-network
# 定时任务调度器 (Celery Worker + Beat)
# 定时任务调度器 (APScheduler)
task-scheduler:
build:
context: ./backend