Files
weibo_signin/backend/task_scheduler/app/main.py

730 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Weibo-HotSign scheduled task runner (APScheduler).

On startup, loads every enabled Task from the DB and registers it under its
exact cron expression. The DB is re-synced periodically to pick up added,
removed, or modified tasks. Sign-in logic talks to the database and the
Weibo API directly, with no dependency on other services.
"""
import os
import sys
import re
import asyncio
import logging
import signal
import time as _time
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.jobstores.memory import MemoryJobStore
# Make the shared package importable (two directories above this file).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
from shared.config import shared_settings
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("scheduler")
scheduler = BackgroundScheduler(
    jobstores={"default": MemoryJobStore()},
    timezone="Asia/Shanghai",
)
# Currently registered jobs, used to diff against the DB on each sync:
# maps task_id -> cron_expression
_registered_tasks: dict = {}
# How many days of sign-in logs to keep before cleanup deletes them.
SIGNIN_LOG_RETAIN_DAYS = int(os.getenv("SIGNIN_LOG_RETAIN_DAYS", "30"))
CLEANUP_BATCH_SIZE = 1000
# Webhook URL and daily-report time (loaded from the DB; env vars are fallbacks).
WEBHOOK_URL = os.getenv("WEBHOOK_URL", "")
DAILY_REPORT_HOUR = int(os.getenv("DAILY_REPORT_HOUR", "23"))
DAILY_REPORT_MINUTE = int(os.getenv("DAILY_REPORT_MINUTE", "30"))
# Flag that keeps the Redis subscriber thread's loops running.
_redis_listener_running = False
WEIBO_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
    "Referer": "https://weibo.com/",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
def _make_session():
    """Create a fresh async engine plus session factory.

    A brand-new engine per call keeps connections from being shared across
    the short-lived event loops spun up by ``_run_async``.
    """
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker

    engine_kwargs = {"echo": False}
    if "sqlite" not in shared_settings.DATABASE_URL:
        # Real databases get a bounded pool with liveness checks.
        engine_kwargs.update(pool_size=5, max_overflow=10, pool_pre_ping=True)
    engine = create_async_engine(shared_settings.DATABASE_URL, **engine_kwargs)
    session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    return session_factory, engine
def _run_async(coro):
"""在新 event loop 中运行 async 函数。"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
def _parse_cookies(cookie_str: str) -> dict:
cookies = {}
for pair in cookie_str.split(";"):
pair = pair.strip()
if "=" in pair:
k, v = pair.split("=", 1)
cookies[k.strip()] = v.strip()
return cookies
# =============== Task sync ===============
def load_config_from_db():
    """Load system config from the DB, update the module globals, and
    reschedule the daily-report job when the webhook settings changed.

    On any failure the current (env-derived) values are kept as fallback.
    """
    global WEBHOOK_URL, DAILY_REPORT_HOUR, DAILY_REPORT_MINUTE
    try:
        config = _run_async(_fetch_config())
        # Missing keys fall back to the values already in effect.
        new_url = config.get("webhook_url", WEBHOOK_URL)
        new_hour = int(config.get("daily_report_hour", DAILY_REPORT_HOUR))
        new_minute = int(config.get("daily_report_minute", DAILY_REPORT_MINUTE))
        changed = (new_url != WEBHOOK_URL or new_hour != DAILY_REPORT_HOUR or new_minute != DAILY_REPORT_MINUTE)
        WEBHOOK_URL = new_url
        DAILY_REPORT_HOUR = new_hour
        DAILY_REPORT_MINUTE = new_minute
        if changed and scheduler.running:
            # Reschedule the daily-report job under the new settings.
            try:
                scheduler.remove_job("daily_report")
            except Exception:
                # Job may not exist yet; that's fine.
                pass
            if WEBHOOK_URL:
                scheduler.add_job(
                    send_daily_report,
                    trigger=CronTrigger(hour=DAILY_REPORT_HOUR, minute=DAILY_REPORT_MINUTE, timezone="Asia/Shanghai"),
                    id="daily_report",
                    replace_existing=True,
                    misfire_grace_time=3600,
                )
                logger.info(f"📊 日报任务已更新: {DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d}")
            else:
                # Webhook cleared: leave the job removed.
                logger.info("📊 Webhook 为空,日报任务已移除")
        logger.info(f"⚙️ 配置加载完成: webhook={'已配置' if WEBHOOK_URL else '未配置'}, 推送时间={DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d}")
    except Exception as e:
        logger.warning(f"从 DB 加载配置失败,使用默认值: {e}")
async def _fetch_config() -> dict:
    """Read every SystemConfig row and return it as a ``{key: value}`` dict."""
    from sqlalchemy import select
    from shared.models.system_config import SystemConfig

    SessionFactory, engine = _make_session()
    try:
        async with SessionFactory() as session:
            rows = (await session.execute(select(SystemConfig))).scalars().all()
            return {row.key: row.value for row in rows}
    finally:
        await engine.dispose()
def sync_db_tasks():
    """Reconcile APScheduler jobs with the Task table.

    New tasks get registered, tasks whose cron changed get re-registered,
    and tasks deleted/disabled in the DB get dropped from the scheduler.
    """
    try:
        db_tasks = _run_async(_load_tasks_from_db())
    except Exception as e:
        logger.error(f"同步任务失败: {e}")
        return

    active_ids = set()
    for task_id, account_id, cron_expr in db_tasks:
        active_ids.add(task_id)
        # Already registered with an unchanged cron: nothing to do.
        if task_id in _registered_tasks and _registered_tasks[task_id] == cron_expr:
            continue
        # New task, or cron changed: (re)register the job.
        try:
            fields = cron_expr.split()
            if len(fields) != 5:
                logger.warning(f"无效 cron: task={task_id}, expr={cron_expr}")
                continue
            minute, hour, day, month, dow = fields
            scheduler.add_job(
                run_signin,
                trigger=CronTrigger(
                    minute=minute, hour=hour, day=day, month=month,
                    day_of_week=dow, timezone="Asia/Shanghai",
                ),
                id=f"signin_{task_id}",
                args=[task_id, account_id],
                replace_existing=True,
                misfire_grace_time=300,  # still fire if missed by < 5 minutes
            )
            _registered_tasks[task_id] = cron_expr
            logger.info(f"✅ 注册任务: task={task_id}, account={account_id}, cron={cron_expr}")
        except Exception as e:
            logger.error(f"注册任务失败: task={task_id}, error={e}")

    # Drop jobs whose task vanished from (or was disabled in) the DB.
    for task_id in set(_registered_tasks.keys()) - active_ids:
        try:
            scheduler.remove_job(f"signin_{task_id}")
            logger.info(f"🗑️ 移除任务: task={task_id}")
        except Exception:
            pass
        _registered_tasks.pop(task_id, None)
    logger.info(f"📋 任务同步完成: 活跃 {len(active_ids)} 个, 已注册 {len(_registered_tasks)} 个")
async def _load_tasks_from_db():
    """Return ``(task_id, account_id, cron)`` triples for every enabled task
    whose account is in the ``pending`` or ``active`` state."""
    from sqlalchemy import select
    from shared.models.task import Task
    from shared.models.account import Account

    SessionFactory, engine = _make_session()
    try:
        async with SessionFactory() as session:
            stmt = (
                select(Task.id, Task.account_id, Task.cron_expression)
                .join(Account, Task.account_id == Account.id)
                .where(Task.is_enabled == True)  # noqa: E712 (SQLAlchemy comparison)
                .where(Account.status.in_(["pending", "active"]))
            )
            rows = (await session.execute(stmt)).all()
            return [(str(task_id), str(account_id), cron) for task_id, account_id, cron in rows]
    finally:
        await engine.dispose()
# =============== Sign-in entry point ===============
def run_signin(task_id: str, account_id: str):
    """Synchronous entry point invoked by APScheduler.

    Runs the async sign-in flow on a private event loop and logs the
    outcome; exceptions are caught so the scheduler thread never dies.
    """
    logger.info(f"🎯 开始签到: task={task_id}, account={account_id}")
    try:
        outcome = _run_async(_async_do_signin(account_id))
        logger.info(f"✅ 签到完成: task={task_id}, result={outcome}")
    except Exception as e:
        logger.error(f"❌ 签到失败: task={task_id}, error={e}")
async def _async_do_signin(account_id: str):
    """Run a full super-topic sign-in pass for one account.

    Decrypts the stored cookies, fetches the followed topic list, signs in
    to each topic with a pacing delay between requests, writes one SigninLog
    row per topic, and refreshes the account's status / last-checked time.

    Returns a summary dict; per-topic failures are counted, not raised.

    Fix: removed an unused ``import httpx`` — the HTTP work happens inside
    ``_fetch_topics`` / ``_do_single_signin``, which import it themselves.
    """
    from sqlalchemy import select
    from shared.models.account import Account
    from shared.models.signin_log import SigninLog
    from shared.crypto import decrypt_cookie, derive_key

    SessionFactory, eng = _make_session()
    try:
        async with SessionFactory() as session:
            result = await session.execute(select(Account).where(Account.id == account_id))
            account = result.scalar_one_or_none()
            if not account:
                return {"status": "error", "reason": "account not found"}
            if account.status not in ("pending", "active"):
                return {"status": "skipped", "reason": f"status={account.status}"}
            key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY)
            try:
                cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key)
            except Exception:
                # Undecryptable cookie: flag the account so it stops being scheduled.
                account.status = "invalid_cookie"
                await session.commit()
                return {"status": "failed", "reason": "cookie decryption failed"}
            cookies = _parse_cookies(cookie_str)
            # Fetch the followed super-topic list.
            topics = await _fetch_topics(cookies)
            if not topics:
                return {"status": "completed", "signed": 0, "message": "no topics"}
            # Sign in topic by topic, pausing between requests to avoid rate limits.
            signed = already = failed = 0
            for topic in topics:
                await asyncio.sleep(1.5)
                r = await _do_single_signin(cookies, topic)
                if r["status"] == "success":
                    s, signed = "success", signed + 1
                elif r["status"] == "already_signed":
                    s, already = "failed_already_signed", already + 1
                else:
                    s, failed = "failed_network", failed + 1
                session.add(SigninLog(
                    account_id=account.id, topic_title=topic["title"],
                    status=s, reward_info={"message": r["message"]},
                    signed_at=datetime.now(),
                ))
            # A completed pass proves the cookie still works.
            account.last_checked_at = datetime.now()
            if account.status != "active":
                account.status = "active"
            await session.commit()
            logger.info(
                f"签到结果: account={account_id}, "
                f"signed={signed}, already={already}, failed={failed}"
            )
            return {"signed": signed, "already_signed": already, "failed": failed, "total": len(topics)}
    finally:
        await eng.dispose()
async def _fetch_topics(cookies: dict) -> list:
    """Fetch the account's followed super-topic list from the Weibo AJAX API.

    Returns a list of ``{"title": ..., "containerid": ...}`` dicts. Paging
    stops at ``max_page``, an empty page, a non-ok response, or page 10.

    Fix: ``item.get(field, "")`` returned ``None`` (crashing ``re.search``)
    when the JSON carried an explicit ``null`` for the key — now guarded
    with ``or ""``. The container-id regex is also compiled once outside
    the loop instead of per item/field.
    """
    import httpx

    # Super-topic container IDs start with "100808" in observed payloads.
    cid_pattern = re.compile(r"100808[0-9a-fA-F]+")
    topics = []
    async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
        # Warm-up request to obtain the XSRF-TOKEN cookie.
        await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
        xsrf = client.cookies.get("XSRF-TOKEN", "")
        headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"}
        if xsrf:
            headers["X-XSRF-TOKEN"] = xsrf
        page = 1
        while page <= 10:  # hard cap to avoid runaway paging
            resp = await client.get(
                "https://weibo.com/ajax/profile/topicContent",
                params={"tabid": "231093_-_chaohua", "page": str(page)},
                headers=headers, cookies=cookies,
            )
            data = resp.json()
            if data.get("ok") != 1:
                break
            tlist = data.get("data", {}).get("list", [])
            if not tlist:
                break
            for item in tlist:
                title = item.get("topic_name", "") or item.get("title", "")
                cid = ""
                for field in ("oid", "scheme"):
                    # `or ""` guards against explicit null values in the JSON.
                    m = cid_pattern.search(item.get(field) or "")
                    if m:
                        cid = m.group(0)
                        break
                if title and cid:
                    topics.append({"title": title, "containerid": cid})
            if page >= data.get("data", {}).get("max_page", 1):
                break
            page += 1
    return topics
async def _do_single_signin(cookies: dict, topic: dict) -> dict:
    """Sign in to a single super topic.

    Returns ``{"status": "success" | "already_signed" | "failed", "message": str}``.
    Network and parse errors are caught and reported as ``failed``.
    """
    import httpx
    try:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            # Warm-up request to obtain the XSRF-TOKEN cookie.
            await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
            xsrf = client.cookies.get("XSRF-TOKEN", "")
            h = {
                **WEIBO_HEADERS,
                "Referer": f"https://weibo.com/p/{topic['containerid']}/super_index",
                "X-Requested-With": "XMLHttpRequest",
            }
            if xsrf:
                h["X-XSRF-TOKEN"] = xsrf
            # Legacy check-in endpoint; the params mimic the web client's request.
            resp = await client.get(
                "https://weibo.com/p/aj/general/button",
                params={
                    "ajwvr": "6",
                    "api": "http://i.huati.weibo.com/aj/super/checkin",
                    "texta": "签到", "textb": "已签到",
                    "status": "0",
                    "id": topic["containerid"],
                    "location": "page_100808_super_index",
                    "timezone": "GMT+0800",
                    "lang": "zh-cn",
                    "plat": "Win32",
                    "ua": WEIBO_HEADERS["User-Agent"],
                    "screen": "1920*1080",
                    "__rnd": str(int(_time.time() * 1000)),  # cache-buster timestamp
                },
                headers=h, cookies=cookies,
            )
            data = resp.json()
            code = str(data.get("code", ""))
            msg = data.get("msg", "")
            if code == "100000":
                # Success: surface any reward/tip text the API returned.
                tip = ""
                if isinstance(data.get("data"), dict):
                    tip = data["data"].get("alert_title", "") or data["data"].get("tipMessage", "")
                return {"status": "success", "message": tip or "签到成功"}
            elif code == "382004":
                # Already signed in today.
                return {"status": "already_signed", "message": msg or f"今天已签到({code})"}
            else:
                return {"status": "failed", "message": f"code={code}, msg={msg}"}
    except Exception as e:
        return {"status": "failed", "message": str(e)}
# =============== Daily webhook report ===============
def send_daily_report():
    """Build today's sign-in / account summary and push it to the webhook.

    Does nothing when no webhook URL is configured; failures are logged
    rather than raised.
    """
    if not WEBHOOK_URL:
        logger.info("⚠️ WEBHOOK_URL 未配置,跳过每日报告推送")
        return
    logger.info("📊 开始生成每日报告...")
    try:
        _send_webhook(_run_async(_build_daily_report()))
        logger.info("✅ 每日报告推送成功")
    except Exception as e:
        logger.error(f"❌ 每日报告推送失败: {e}")
async def _build_daily_report() -> str:
    """Aggregate today's sign-in stats and account health from the DB into
    a plain-text report string suitable for webhook delivery."""
    from sqlalchemy import select, func as sa_func
    from shared.models.account import Account
    from shared.models.signin_log import SigninLog
    from shared.models.user import User  # NOTE(review): imported but unused here

    # "Today" starts at local midnight.
    today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    now = datetime.now()
    SessionFactory, eng = _make_session()
    try:
        async with SessionFactory() as session:
            # 1. Account overview, bucketed by status.
            acc_status = await session.execute(
                select(Account.status, sa_func.count())
                .group_by(Account.status)
            )
            status_map = {row[0]: row[1] for row in acc_status.all()}
            total_accounts = sum(status_map.values())
            active = status_map.get("active", 0)
            pending = status_map.get("pending", 0)
            invalid = status_map.get("invalid_cookie", 0)
            # 2. Today's sign-in totals, bucketed by log status.
            log_stats = await session.execute(
                select(SigninLog.status, sa_func.count())
                .where(SigninLog.signed_at >= today_start)
                .group_by(SigninLog.status)
            )
            log_map = {row[0]: row[1] for row in log_stats.all()}
            success = log_map.get("success", 0)
            already = log_map.get("failed_already_signed", 0)
            failed_net = log_map.get("failed_network", 0)
            total_logs = sum(log_map.values())
            # 3. Per-account breakdown of today's sign-ins.
            detail_rows = await session.execute(
                select(
                    Account.remark,
                    Account.weibo_user_id,
                    SigninLog.status,
                    sa_func.count(),
                )
                .join(Account, SigninLog.account_id == Account.id)
                .where(SigninLog.signed_at >= today_start)
                .group_by(Account.id, Account.remark, Account.weibo_user_id, SigninLog.status)
                .order_by(Account.remark)
            )
            # Aggregate rows by account display name (remark, else weibo uid).
            account_details = {}
            for remark, uid, st, cnt in detail_rows.all():
                name = remark or uid
                if name not in account_details:
                    account_details[name] = {"success": 0, "already": 0, "failed": 0}
                if st == "success":
                    account_details[name]["success"] += cnt
                elif st == "failed_already_signed":
                    account_details[name]["already"] += cnt
                else:
                    account_details[name]["failed"] += cnt
            # 4. Active accounts not checked for > 3 days (cookies may go stale).
            stale_cutoff = now - timedelta(days=3)
            stale_result = await session.execute(
                select(Account.remark, Account.weibo_user_id, Account.last_checked_at)
                .where(Account.status == "active")
                .where(
                    (Account.last_checked_at < stale_cutoff) | (Account.last_checked_at == None)
                )
            )
            stale_accounts = [
                (row[0] or row[1], row[2].strftime("%m-%d %H:%M") if row[2] else "从未")
                for row in stale_result.all()
            ]
    finally:
        await eng.dispose()
    # Assemble the report text.
    lines = [
        "📊 微博超话签到日报",
        f"{now.strftime('%Y-%m-%d %H:%M')}",
        "━━━━━━━━━━━━━━━━━━",
        "",
        "📱 账号状态",
        f" 总计: {total_accounts}",
        f" ✅ 正常: {active} ⏳ 待验证: {pending} ❌ 失效: {invalid}",
    ]
    if invalid > 0:
        lines.append(f" ⚠️ 有 {invalid} 个账号 Cookie 已失效,请及时更新")
    lines += [
        "",
        "🎯 今日签到",
        f" 总计: {total_logs} 条记录",
        f" ✅ 成功: {success} 📌 已签: {already} ❌ 失败: {failed_net}",
    ]
    if account_details:
        lines += ["", "📋 账号明细"]
        for name, d in account_details.items():
            lines.append(f" {name}: ✅{d['success']} 📌{d['already']}{d['failed']}")
    if stale_accounts:
        lines += ["", "⚠️ 需要关注"]
        for name, last in stale_accounts:
            lines.append(f" {name} (上次检查: {last})")
    if total_logs == 0:
        lines += ["", "💤 今日暂无签到记录"]
    return "\n".join(lines)
def _send_webhook(content: str):
    """POST *content* to WEBHOOK_URL, adapting the payload shape to the
    platform (WeCom / DingTalk / Feishu / generic JSON) inferred from the
    URL's hostname."""
    import httpx

    url = WEBHOOK_URL
    if not url:
        return
    if "qyapi.weixin.qq.com" in url:
        # WeCom (Enterprise WeChat) markdown message.
        payload = {"msgtype": "markdown", "markdown": {"content": content}}
    elif "oapi.dingtalk.com" in url:
        # DingTalk markdown message.
        payload = {"msgtype": "markdown", "markdown": {"title": "签到日报", "text": content}}
    elif "open.feishu.cn" in url:
        # Feishu custom bot: its text type has no markdown, so strip markers.
        plain = content.replace("## ", "").replace("### ", "\n").replace("**", "").replace("> ", "")
        payload = {"msg_type": "text", "content": {"text": plain}}
    else:
        # Generic JSON for custom webhooks.
        payload = {"text": content, "markdown": content}
    resp = httpx.post(url, json=payload, timeout=15)
    if resp.status_code != 200:
        logger.warning(f"Webhook 响应异常: status={resp.status_code}, body={resp.text[:200]}")
        return
    is_json = resp.headers.get("content-type", "").startswith("application/json")
    resp_data = resp.json() if is_json else {}
    # Platforms report failures in-band via `code` or `StatusCode` fields.
    if resp_data.get("code", 0) != 0 and resp_data.get("StatusCode", 0) != 0:
        logger.warning(f"Webhook 业务异常: {resp.text[:300]}")
# =============== Log cleanup ===============
def cleanup_old_signin_logs():
    """Purge sign-in logs older than the retention window.

    Thin sync wrapper around ``_async_cleanup``; failures are logged, not raised.
    """
    logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...")
    try:
        summary = _run_async(_async_cleanup())
        logger.info(f"✅ 日志清理完成: {summary}")
    except Exception as e:
        logger.error(f"❌ 日志清理失败: {e}")
async def _async_cleanup():
    """Delete SigninLog rows older than the retention cutoff in fixed-size
    batches (select ids, delete by id, commit) to keep lock times short.

    Returns ``{"deleted": <count>, "cutoff": <iso timestamp>}``.
    """
    from sqlalchemy import select, delete
    from shared.models.signin_log import SigninLog

    cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS)
    deleted = 0
    SessionFactory, engine = _make_session()
    try:
        async with SessionFactory() as session:
            while True:
                id_rows = await session.execute(
                    select(SigninLog.id)
                    .where(SigninLog.signed_at < cutoff)
                    .limit(CLEANUP_BATCH_SIZE)
                )
                batch = [row[0] for row in id_rows.all()]
                if not batch:
                    break
                await session.execute(delete(SigninLog).where(SigninLog.id.in_(batch)))
                await session.commit()
                deleted += len(batch)
                logger.info(f"🧹 已删除 {deleted} 条...")
                if len(batch) < CLEANUP_BATCH_SIZE:
                    # A short batch means we've reached the end.
                    break
    finally:
        await engine.dispose()
    return {"deleted": deleted, "cutoff": cutoff.isoformat()}
# =============== Redis subscription (real-time task-change sync) ===============
def _start_redis_listener():
    """Start a daemon thread subscribed to Redis pub/sub channels; on a
    message, immediately re-sync tasks or reload config.

    No-op when Redis is disabled — changes are then only picked up by the
    periodic DB poll.
    """
    global _redis_listener_running
    if not shared_settings.USE_REDIS or not shared_settings.REDIS_URL:
        logger.info("⚠️ Redis 未启用,任务变更仅靠 5 分钟轮询同步")
        return
    import threading
    import redis

    def _listen():
        # Runs on the daemon thread; reconnects after any Redis error
        # with a 5-second backoff until the running flag is cleared.
        global _redis_listener_running
        _redis_listener_running = True
        logger.info("📡 Redis 订阅线程启动,监听 task_updates / config_updates 频道")
        while _redis_listener_running:
            try:
                r = redis.from_url(shared_settings.REDIS_URL, decode_responses=True)
                pubsub = r.pubsub()
                pubsub.subscribe("task_updates", "config_updates")
                for message in pubsub.listen():
                    # NOTE(review): the flag is only checked when a message
                    # arrives, so shutdown may lag until the next message.
                    if not _redis_listener_running:
                        break
                    if message["type"] == "message":
                        channel = message.get("channel", "")
                        if channel == "config_updates":
                            logger.info("📡 收到配置变更通知,重新加载...")
                            load_config_from_db()
                        else:
                            logger.info(f"📡 收到任务变更通知: {message['data'][:200]}")
                            sync_db_tasks()
                pubsub.close()
                r.close()
            except Exception as e:
                logger.warning(f"Redis 订阅异常5 秒后重连: {e}")
                _time.sleep(5)

    t = threading.Thread(target=_listen, daemon=True, name="redis-listener")
    t.start()
# =============== Startup entry point ===============
def _shutdown(signum, frame):
    """SIGTERM/SIGINT handler: stop the Redis listener loop flag, shut the
    scheduler down without waiting for running jobs, and exit."""
    global _redis_listener_running
    logger.info(f"收到信号 {signum},正在关闭调度器...")
    _redis_listener_running = False
    scheduler.shutdown(wait=False)
    sys.exit(0)
if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("🚀 Weibo-HotSign 定时调度器启动 (APScheduler)")
    logger.info("=" * 60)
    # Install signal handlers for graceful shutdown.
    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)
    # Start the scheduler before registering jobs.
    scheduler.start()
    # Load config (webhook URL, report time, ...) from the DB.
    load_config_from_db()
    # Initial task sync from the DB.
    sync_db_tasks()
    # Subscribe to Redis for real-time task-change notifications.
    _start_redis_listener()
    # Periodic DB re-sync as a fallback (Redis push is the primary path).
    # NOTE(review): the interval is 50 minutes here, but the module docstring
    # and the Redis-disabled log message say 5 minutes — confirm intent.
    scheduler.add_job(
        sync_db_tasks,
        trigger=IntervalTrigger(minutes=50),
        id="sync_db_tasks",
        replace_existing=True,
        misfire_grace_time=60,
    )
    # Purge expired sign-in logs daily at 03:00.
    scheduler.add_job(
        cleanup_old_signin_logs,
        trigger=CronTrigger(hour=3, minute=0, timezone="Asia/Shanghai"),
        id="cleanup_old_signin_logs",
        replace_existing=True,
        misfire_grace_time=3600,
    )
    # Schedule the daily webhook report unless load_config_from_db already did.
    if WEBHOOK_URL and not scheduler.get_job("daily_report"):
        scheduler.add_job(
            send_daily_report,
            trigger=CronTrigger(hour=DAILY_REPORT_HOUR, minute=DAILY_REPORT_MINUTE, timezone="Asia/Shanghai"),
            id="daily_report",
            replace_existing=True,
            misfire_grace_time=3600,
        )
    if WEBHOOK_URL:
        logger.info(f"📊 每日报告: 每天 {DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d} 推送到 Webhook")
    else:
        logger.info("⚠️ Webhook 未配置,每日报告推送已禁用(可在管理面板设置)")
    logger.info("📋 调度器已启动,等待任务触发...")
    logger.info(f"📋 日志清理: 每天 03:00, 保留 {SIGNIN_LOG_RETAIN_DAYS} 天")
    # Dump all registered jobs for startup visibility.
    jobs = scheduler.get_jobs()
    for job in jobs:
        logger.info(f" 📌 {job.id}: next_run={job.next_run_time}")
    # Keep the main thread alive; BackgroundScheduler runs in worker threads.
    try:
        while True:
            _time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        logger.info("调度器关闭中...")
        scheduler.shutdown(wait=False)