"""
Weibo-HotSign 定时任务调度器 (APScheduler)

启动时从 DB 加载所有 enabled 的 Task,按 cron 精确注册。
每 5 分钟重新同步一次 DB(处理新增/删除/修改的任务)。
若启用 Redis,还会订阅 task_updates 频道,收到通知后立即同步。
每天 03:00 清理超过保留天数的签到日志。
签到逻辑直接操作数据库和微博 API,不依赖其他服务。
"""

import asyncio
import logging
import os
import re
import signal
import sys
import threading
import time as _time
from datetime import datetime, timedelta

from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

# Make the top-level `shared` package importable.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))

from shared.config import shared_settings  # noqa: E402  (needs the sys.path tweak above)

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger("scheduler")

# Jobs live only in memory; they are (re)built from the DB by sync_db_tasks().
scheduler = BackgroundScheduler(
    timezone="Asia/Shanghai",
    jobstores=dict(default=MemoryJobStore()),
)

# task_id -> cron expression of the sign-in job currently registered with the
# scheduler; sync_db_tasks() diffs the DB state against this mapping.
_registered_tasks: dict = dict()

# Sign-in logs older than this many days are purged by the daily cleanup job
# (overridable through the SIGNIN_LOG_RETAIN_DAYS environment variable).
SIGNIN_LOG_RETAIN_DAYS = int(os.environ.get("SIGNIN_LOG_RETAIN_DAYS", "30"))
# Rows deleted per batch (one commit per batch) during cleanup.
CLEANUP_BATCH_SIZE = 1000

# Run flag for the Redis pub/sub listener thread; cleared on shutdown.
_redis_listener_running = False

# Browser-like headers sent with every request to weibo.com.
WEIBO_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "Referer": "https://weibo.com/",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}


def _make_session():
    """Build a brand-new async engine plus a session factory bound to it.

    A fresh engine per call keeps pooled connections from being shared across
    the separate event loops that ``_run_async`` creates. Connection-pool
    options (size, overflow, pre-ping) are only applied to non-SQLite URLs.

    Returns:
        ``(session_factory, engine)``; callers in this module dispose of the
        engine (``await engine.dispose()``) when they are done.
    """
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker

    db_url = shared_settings.DATABASE_URL
    engine_options = {"echo": False}
    if "sqlite" not in db_url:
        engine_options.update(pool_size=5, max_overflow=10, pool_pre_ping=True)

    engine = create_async_engine(db_url, **engine_options)
    session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    return session_factory, engine


def _run_async(coro):
    """Run *coro* to completion on a fresh event loop and return its result.

    The synchronous entry points (APScheduler jobs, the Redis listener thread,
    startup) use this to drive the async DB/HTTP code. It delegates to
    ``asyncio.run``. Compared with a bare new_event_loop / run_until_complete /
    close sequence, ``asyncio.run`` also:

    * finalizes still-suspended async generators;
    * shuts down the loop's default executor (Python 3.9+);
    * resets the thread's current loop instead of leaving a closed loop
      installed.

    Args:
        coro: A coroutine object (``asyncio.run`` rejects other awaitables).

    Returns:
        Whatever the coroutine returns; its exceptions propagate unchanged.

    Raises:
        RuntimeError: if called from a thread whose event loop is running.
    """
    return asyncio.run(coro)


def _parse_cookies(cookie_str: str) -> dict:
    """Parse a ``name=value; name2=value2`` cookie header into a dict.

    Segments without an ``=`` are ignored. Only the first ``=`` separates the
    name from the value, and both are whitespace-stripped. A repeated name
    keeps its last value.
    """
    segments = (chunk.strip() for chunk in cookie_str.split(";"))
    return {
        name.strip(): value.strip()
        for name, _sep, value in (seg.partition("=") for seg in segments if "=" in seg)
    }


# =============== Task sync ===============

# Serializes sync_db_tasks(). It is invoked from the main thread at startup,
# from the APScheduler interval job and from the Redis listener thread.
# Without the lock, a sync working from an older DB snapshot could re-register
# a task that a concurrent, newer sync has just removed.
_sync_lock = threading.Lock()


def sync_db_tasks():
    """Reconcile the scheduler's sign-in jobs with the enabled tasks in the DB.

    * A task that is not yet registered, or whose cron expression changed, is
      (re)registered as job ``signin_<task_id>`` (``replace_existing=True``).
    * A registered task that the DB query no longer returns (deleted,
      disabled, or its account left the pending/active states) is removed.
    * If the DB query itself fails, the error is logged and nothing changes.

    Cron expressions must have exactly five space-separated fields
    (minute hour day month day_of_week); other expressions are logged and
    skipped. Concurrent calls are serialized by ``_sync_lock``.
    """
    from apscheduler.jobstores.base import JobLookupError

    with _sync_lock:
        try:
            tasks = _run_async(_load_tasks_from_db())
        except Exception as e:
            logger.error(f"同步任务失败: {e}")
            return

        db_task_ids = set()
        for task_id, account_id, cron_expr in tasks:
            db_task_ids.add(task_id)
            job_id = f"signin_{task_id}"

            # Already registered with an unchanged cron -> nothing to do.
            if task_id in _registered_tasks and _registered_tasks[task_id] == cron_expr:
                continue

            # New task, or its cron changed -> add / replace the job.
            try:
                parts = cron_expr.split()
                if len(parts) != 5:
                    logger.warning(f"无效 cron: task={task_id}, expr={cron_expr}")
                    continue

                trigger = CronTrigger(
                    minute=parts[0], hour=parts[1],
                    day=parts[2], month=parts[3], day_of_week=parts[4],
                    timezone="Asia/Shanghai",
                )
                scheduler.add_job(
                    run_signin,
                    trigger=trigger,
                    id=job_id,
                    args=[task_id, account_id],
                    replace_existing=True,
                    misfire_grace_time=300,  # still run if fired up to 5 minutes late
                )
                _registered_tasks[task_id] = cron_expr
                logger.info(f"✅ 注册任务: task={task_id}, account={account_id}, cron={cron_expr}")
            except Exception as e:
                logger.error(f"注册任务失败: task={task_id}, error={e}")

        # Remove jobs whose tasks were deleted / disabled in the DB.
        for task_id in set(_registered_tasks) - db_task_ids:
            try:
                scheduler.remove_job(f"signin_{task_id}")
                logger.info(f"🗑️ 移除任务: task={task_id}")
            except JobLookupError:
                # Job already absent from the scheduler; just stop tracking it.
                pass
            except Exception as e:
                # Keep the task tracked so the next sync retries the removal
                # instead of leaving an untracked job scheduled indefinitely.
                logger.warning(f"移除任务失败: task={task_id}, error={e}")
                continue
            _registered_tasks.pop(task_id, None)

        logger.info(f"📋 任务同步完成: 活跃 {len(db_task_ids)} 个, 已注册 {len(_registered_tasks)} 个")


async def _load_tasks_from_db():
    """Return ``(task_id, account_id, cron_expression)`` for every schedulable task.

    A task is schedulable when ``Task.is_enabled`` is true and its account's
    status is "pending" or "active". Both IDs are returned as ``str``. A
    dedicated engine is created for the query and disposed of afterwards.
    """
    from sqlalchemy import select

    from shared.models.task import Task
    from shared.models.account import Account

    session_factory, engine = _make_session()
    try:
        async with session_factory() as session:
            query = (
                select(Task.id, Task.account_id, Task.cron_expression)
                .join(Account, Task.account_id == Account.id)
                .where(Task.is_enabled == True)  # noqa: E712 - builds a SQL comparison
                .where(Account.status.in_(["pending", "active"]))
            )
            rows = (await session.execute(query)).all()
            return [(str(task_pk), str(account_pk), cron) for task_pk, account_pk, cron in rows]
    finally:
        await engine.dispose()


# =============== Sign-in entry point ===============


def run_signin(task_id: str, account_id: str):
    """Synchronous APScheduler job: sign in every followed topic of one account.

    Drives ``_async_do_signin`` to completion on a fresh event loop. Any
    exception is logged with its traceback and not re-raised.

    Args:
        task_id: ID of the scheduled task (used for logging).
        account_id: ID of the account to sign in.
    """
    logger.info(f"🎯 开始签到: task={task_id}, account={account_id}")
    try:
        result = _run_async(_async_do_signin(account_id))
    except Exception as e:
        # logger.exception keeps the traceback; str(e) alone may be empty or
        # uninformative for errors raised deep inside the HTTP/DB code.
        logger.exception(f"❌ 签到失败: task={task_id}, error={e}")
        return
    logger.info(f"✅ 签到完成: task={task_id}, result={result}")


async def _async_do_signin(account_id: str):
    """Sign in to every super-topic followed by one account.

    Steps:

    1. Load the account and skip it unless its status is "pending" or
       "active".
    2. Decrypt its cookies. On failure the account is marked
       "invalid_cookie".
    3. Fetch the followed topics.
    4. Sign each topic in turn, sleeping 1.5 s before each request and adding
       one ``SigninLog`` row per topic.
    5. Update ``last_checked_at``, set the status to "active", and commit.

    Args:
        account_id: Primary key of the ``Account`` to process.

    Returns:
        Either an early-exit dict (``{"status": ..., "reason"/"message": ...}``)
        or ``{"signed", "already_signed", "failed", "total"}`` counts.

    Raises:
        Errors from ``_fetch_topics`` (network / JSON decoding) propagate to the
        caller.
    """
    from sqlalchemy import select

    from shared.models.account import Account
    from shared.models.signin_log import SigninLog
    from shared.crypto import decrypt_cookie, derive_key

    SessionFactory, eng = _make_session()
    try:
        async with SessionFactory() as session:
            result = await session.execute(select(Account).where(Account.id == account_id))
            account = result.scalar_one_or_none()
            if not account:
                return {"status": "error", "reason": "account not found"}
            if account.status not in ("pending", "active"):
                return {"status": "skipped", "reason": f"status={account.status}"}

            key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY)
            try:
                cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key)
            except Exception:
                # Flag the account. "invalid_cookie" is outside the
                # pending/active filter used when loading tasks, so the
                # account stops being scheduled.
                account.status = "invalid_cookie"
                await session.commit()
                return {"status": "failed", "reason": "cookie decryption failed"}

            cookies = _parse_cookies(cookie_str)

            # Fetch the followed super-topics.
            topics = await _fetch_topics(cookies)
            if not topics:
                return {"status": "completed", "signed": 0, "message": "no topics"}

            # Sign each topic in turn, pausing before every request.
            signed = already = failed = 0
            for topic in topics:
                await asyncio.sleep(1.5)
                r = await _do_single_signin(cookies, topic)
                if r["status"] == "success":
                    s, signed = "success", signed + 1
                elif r["status"] == "already_signed":
                    s, already = "failed_already_signed", already + 1
                else:
                    # NOTE(review): every other outcome, including API error
                    # codes, is recorded as "failed_network".
                    s, failed = "failed_network", failed + 1

                session.add(SigninLog(
                    account_id=account.id, topic_title=topic["title"],
                    status=s, reward_info={"message": r["message"]},
                    signed_at=datetime.now(),
                ))

            account.last_checked_at = datetime.now()
            if account.status != "active":
                account.status = "active"
            await session.commit()

            logger.info(
                f"签到结果: account={account_id}, "
                f"signed={signed}, already={already}, failed={failed}"
            )
            return {"signed": signed, "already_signed": already, "failed": failed, "total": len(topics)}
    finally:
        await eng.dispose()


async def _fetch_topics(cookies: dict) -> list:
    """Return the super-topics (超话) followed by the cookie owner.

    First GETs https://weibo.com/ so that an ``XSRF-TOKEN`` cookie, if the
    site sets one, can be echoed back in ``X-XSRF-TOKEN``. It then pages
    through ``/ajax/profile/topicContent`` (at most 10 pages). Paging stops
    early when any of these happens:

    * the response is not ``ok == 1``;
    * a page is empty;
    * ``max_page`` is reached.

    Args:
        cookies: Weibo login cookies as a name -> value mapping.

    Returns:
        A list of ``{"title": str, "containerid": str}`` dicts. Items lacking
        a title or a ``100808…`` container id (searched in ``oid``, then
        ``scheme``) are skipped.

    Raises:
        httpx transport errors and JSON decoding errors propagate.
    """
    import httpx

    cid_pattern = re.compile(r"100808[0-9a-fA-F]+")
    topics = []
    async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
        await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
        xsrf = client.cookies.get("XSRF-TOKEN", "")
        headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"}
        if xsrf:
            headers["X-XSRF-TOKEN"] = xsrf

        for page in range(1, 11):
            resp = await client.get(
                "https://weibo.com/ajax/profile/topicContent",
                params={"tabid": "231093_-_chaohua", "page": str(page)},
                headers=headers, cookies=cookies,
            )
            data = resp.json()
            if data.get("ok") != 1:
                break

            # Tolerate null/missing fields (e.g. "data": null) instead of crashing.
            payload = data.get("data") or {}
            tlist = payload.get("list") or []
            if not tlist:
                break

            for item in tlist:
                title = item.get("topic_name", "") or item.get("title", "")
                cid = ""
                for field in ("oid", "scheme"):
                    # Values may be missing, None or non-str; normalize first.
                    m = cid_pattern.search(str(item.get(field) or ""))
                    if m:
                        cid = m.group(0)
                        break
                if title and cid:
                    topics.append({"title": title, "containerid": cid})

            if page >= (payload.get("max_page") or 1):
                break
    return topics


async def _do_single_signin(cookies: dict, topic: dict) -> dict:
    """Sign in to one super-topic via Weibo's ``/p/aj/general/button`` endpoint.

    Args:
        cookies: Weibo login cookies (name -> value).
        topic: Dict with at least a ``"containerid"`` key.

    Returns:
        ``{"status": "success" | "already_signed" | "failed", "message": str}``.
        Response code 100000 means success and 382004 means already signed
        today. Any other code, and any exception, yields ``"failed"``.
    """
    import httpx

    try:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            # Visit the home page first to pick up an XSRF-TOKEN cookie, if any.
            await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
            token = client.cookies.get("XSRF-TOKEN", "")

            container_id = topic["containerid"]
            request_headers = dict(WEIBO_HEADERS)
            request_headers.update({
                "Referer": f"https://weibo.com/p/{container_id}/super_index",
                "X-Requested-With": "XMLHttpRequest",
            })
            if token:
                request_headers["X-XSRF-TOKEN"] = token

            query = {
                "ajwvr": "6",
                "api": "http://i.huati.weibo.com/aj/super/checkin",
                "texta": "签到",
                "textb": "已签到",
                "status": "0",
                "id": container_id,
                "location": "page_100808_super_index",
                "timezone": "GMT+0800",
                "lang": "zh-cn",
                "plat": "Win32",
                "ua": WEIBO_HEADERS["User-Agent"],
                "screen": "1920*1080",
                "__rnd": str(int(_time.time() * 1000)),
            }
            response = await client.get(
                "https://weibo.com/p/aj/general/button",
                params=query,
                headers=request_headers,
                cookies=cookies,
            )

            body = response.json()
            code = str(body.get("code", ""))
            msg = body.get("msg", "")

            if code == "100000":
                detail = body.get("data")
                tip = ""
                if isinstance(detail, dict):
                    tip = detail.get("alert_title", "") or detail.get("tipMessage", "")
                return {"status": "success", "message": tip or "签到成功"}
            if code == "382004":
                return {"status": "already_signed", "message": msg or f"今天已签到({code})"}
            return {"status": "failed", "message": f"code={code}, msg={msg}"}
    except Exception as exc:
        return {"status": "failed", "message": str(exc)}


# =============== Log cleanup ===============


def cleanup_old_signin_logs():
    """Scheduled job: delete sign-in logs older than ``SIGNIN_LOG_RETAIN_DAYS``.

    Drives ``_async_cleanup`` (batched deletes) on a fresh event loop. The
    outcome, or the error, is logged; nothing is raised.
    """
    logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...")
    try:
        outcome = _run_async(_async_cleanup())
    except Exception as err:
        logger.error(f"❌ 日志清理失败: {err}")
    else:
        logger.info(f"✅ 日志清理完成: {outcome}")


async def _async_cleanup():
    """Delete ``SigninLog`` rows with ``signed_at`` older than the retention cutoff.

    Each batch selects up to ``CLEANUP_BATCH_SIZE`` ids, deletes them and
    commits. The loop ends on an empty or short batch.

    Returns:
        ``{"deleted": <rows removed>, "cutoff": <cutoff as ISO-8601 string>}``.
    """
    from sqlalchemy import select, delete
    from shared.models.signin_log import SigninLog

    cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS)
    deleted_so_far = 0

    session_factory, engine = _make_session()
    try:
        async with session_factory() as session:
            more_to_do = True
            while more_to_do:
                id_query = (
                    select(SigninLog.id)
                    .where(SigninLog.signed_at < cutoff)
                    .limit(CLEANUP_BATCH_SIZE)
                )
                id_rows = await session.execute(id_query)
                batch_ids = [row[0] for row in id_rows.all()]
                if not batch_ids:
                    break

                await session.execute(delete(SigninLog).where(SigninLog.id.in_(batch_ids)))
                await session.commit()
                deleted_so_far += len(batch_ids)
                logger.info(f"🧹 已删除 {deleted_so_far} 条...")

                # A short batch means nothing older than the cutoff remains.
                more_to_do = len(batch_ids) >= CLEANUP_BATCH_SIZE
    finally:
        await engine.dispose()

    return {"deleted": deleted_so_far, "cutoff": cutoff.isoformat()}


# =============== Redis pub/sub (real-time task-change sync) ===============


def _start_redis_listener():
    """Start a daemon thread that re-syncs tasks on ``task_updates`` messages.

    When Redis is disabled (``USE_REDIS`` false or ``REDIS_URL`` empty), it
    only logs; task changes are then picked up by the periodic DB sync alone.
    Otherwise every message on the ``task_updates`` channel triggers
    ``sync_db_tasks()``. On any error the thread closes its connections, waits
    5 seconds and reconnects. It stops once ``_redis_listener_running`` is
    cleared; the flag is checked when a message arrives and before each
    reconnect.
    """
    global _redis_listener_running

    if not shared_settings.USE_REDIS or not shared_settings.REDIS_URL:
        logger.info("⚠️ Redis 未启用,任务变更仅靠 5 分钟轮询同步")
        return

    import threading

    import redis

    def _close_quietly(resource):
        """Best-effort close of a redis client/pubsub; failures are only logged at debug level."""
        if resource is None:
            return
        try:
            resource.close()
        except Exception as exc:
            logger.debug(f"关闭 Redis 连接失败: {exc}")

    def _listen():
        logger.info("📡 Redis 订阅线程启动,监听 task_updates 频道")
        while _redis_listener_running:
            client = None
            pubsub = None
            failed = False
            try:
                client = redis.from_url(shared_settings.REDIS_URL, decode_responses=True)
                pubsub = client.pubsub()
                pubsub.subscribe("task_updates")
                for message in pubsub.listen():
                    if not _redis_listener_running:
                        break
                    if message["type"] == "message":
                        logger.info(f"📡 收到任务变更通知: {message['data'][:200]}")
                        sync_db_tasks()
            except Exception as e:
                logger.warning(f"Redis 订阅异常,5 秒后重连: {e}")
                failed = True
            finally:
                # Release connections on every exit path, including errors,
                # so repeated reconnects don't accumulate open clients.
                _close_quietly(pubsub)
                _close_quietly(client)
            if failed:
                _time.sleep(5)

    # Set the flag before the thread starts, so the thread can never re-set it
    # after a shutdown has already cleared it.
    _redis_listener_running = True
    t = threading.Thread(target=_listen, daemon=True, name="redis-listener")
    t.start()


# =============== Startup entry point ===============


def _shutdown(signum, frame):
    """SIGTERM/SIGINT handler: stop the Redis listener loop and the scheduler, then exit.

    ``sys.exit(0)`` raises ``SystemExit``, which unwinds the main thread's
    keep-alive loop.

    Args:
        signum: The received signal number (logged).
        frame: Current stack frame (unused; required by ``signal.signal``).
    """
    global _redis_listener_running
    logger.info(f"收到信号 {signum},正在关闭调度器...")
    _redis_listener_running = False
    # A repeated signal (e.g. Ctrl+C pressed twice) would otherwise call
    # shutdown() on an already-stopped scheduler, which raises
    # SchedulerNotRunningError instead of exiting cleanly.
    if scheduler.running:
        scheduler.shutdown(wait=False)
    sys.exit(0)


if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("🚀 Weibo-HotSign 定时调度器启动 (APScheduler)")
    logger.info("=" * 60)

    # Graceful stop on SIGTERM / SIGINT.
    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)

    # Start the scheduler.
    scheduler.start()

    # Initial sync of tasks from the DB.
    sync_db_tasks()

    # Redis subscription for near-real-time task change notifications.
    _start_redis_listener()

    # Re-sync from the DB every 5 minutes (new / deleted / modified tasks).
    scheduler.add_job(
        sync_db_tasks,
        trigger=IntervalTrigger(minutes=5),
        id="sync_db_tasks",
        replace_existing=True,
        misfire_grace_time=60,
    )

    # Purge expired sign-in logs every day at 03:00.
    scheduler.add_job(
        cleanup_old_signin_logs,
        trigger=CronTrigger(hour=3, minute=0, timezone="Asia/Shanghai"),
        id="cleanup_old_signin_logs",
        replace_existing=True,
        misfire_grace_time=3600,
    )

    logger.info("📋 调度器已启动,等待任务触发...")
    logger.info(f"📋 日志清理: 每天 03:00, 保留 {SIGNIN_LOG_RETAIN_DAYS} 天")

    # Log every registered job with its next fire time.
    jobs = scheduler.get_jobs()
    for job in jobs:
        logger.info(f" 📌 {job.id}: next_run={job.next_run_time}")

    # Keep the main thread alive; the BackgroundScheduler runs in its own thread.
    try:
        while True:
            _time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        logger.info("调度器关闭中...")
        # The signal handler shuts the scheduler down before raising
        # SystemExit. Calling shutdown() on a stopped scheduler raises
        # SchedulerNotRunningError, so only shut down if still running.
        if scheduler.running:
            scheduler.shutdown(wait=False)