Files
weibo_signin/backend/task_scheduler/app/main.py

988 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
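"""
Weibo-HotSign scheduled task runner (APScheduler).

On startup, every enabled Task is loaded from the DB and registered with its exact cron expression.
The DB is re-synced every 50 minutes as a fallback (handles added / removed / modified tasks);
Redis pub/sub notifications, when enabled, trigger an immediate re-sync.
The sign-in logic talks to the database and the Weibo API directly and does not depend on other services.
"""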
import os
import sys
import re
import asyncio
import logging
import signal
import time as _time
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.jobstores.memory import MemoryJobStore
# 确保 shared 模块可导入
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
from shared.config import shared_settings
# Process-wide logging format used by every logger in this module.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("scheduler")
# Single background scheduler; jobs live in memory only and are rebuilt
# from the DB by sync_db_tasks(). All triggers use Asia/Shanghai time.
scheduler = BackgroundScheduler(
    jobstores={"default": MemoryJobStore()},
    timezone="Asia/Shanghai",
)
# Tasks currently registered with the scheduler; compared against the DB on each sync.
_registered_tasks: dict = {} # task_id -> cron_expression
# Number of days sign-in logs are retained before cleanup.
SIGNIN_LOG_RETAIN_DAYS = int(os.getenv("SIGNIN_LOG_RETAIN_DAYS", "30"))
# Maximum number of log rows deleted per cleanup batch.
CLEANUP_BATCH_SIZE = 1000
# Webhook URL and daily-report time. load_config_from_db() overwrites these
# with values from the DB; the environment variables are the fallback.
WEBHOOK_URL = os.getenv("WEBHOOK_URL", "")
DAILY_REPORT_HOUR = int(os.getenv("DAILY_REPORT_HOUR", "23"))
DAILY_REPORT_MINUTE = int(os.getenv("DAILY_REPORT_MINUTE", "30"))
# Flag read by the Redis subscriber thread; set to False to make it stop.
_redis_listener_running = False
# Browser-like request headers sent with every Weibo request.
WEIBO_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
    "Referer": "https://weibo.com/",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
def _make_session():
    """Build a fresh async engine plus a session factory bound to it.

    A new engine is created on every call so that each event loop gets its
    own connections (avoids cross-loop conflicts).

    Returns:
        A ``(session_factory, engine)`` tuple. The caller is responsible for
        disposing the engine.
    """
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker

    engine_options = {"echo": False}
    uses_sqlite = "sqlite" in shared_settings.DATABASE_URL
    if not uses_sqlite:
        # Pool tuning is only applied to non-SQLite backends.
        engine_options["pool_size"] = 5
        engine_options["max_overflow"] = 10
        engine_options["pool_pre_ping"] = True

    engine = create_async_engine(shared_settings.DATABASE_URL, **engine_options)
    session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    return session_factory, engine
def _run_async(coro):
"""在新 event loop 中运行 async 函数。"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
def _parse_cookies(cookie_str: str) -> dict:
cookies = {}
for pair in cookie_str.split(";"):
pair = pair.strip()
if "=" in pair:
k, v = pair.split("=", 1)
cookies[k.strip()] = v.strip()
return cookies
# =============== 任务同步 ===============
def load_config_from_db():
    """Load system configuration from the DB and re-schedule the daily report.

    Updates the module globals ``WEBHOOK_URL``, ``DAILY_REPORT_HOUR`` and
    ``DAILY_REPORT_MINUTE``. When any of them changed and the scheduler is
    running, the ``daily_report`` job is replaced (or removed when the webhook
    URL is empty).

    The new cron trigger is built *before* any state is modified, so an
    invalid hour/minute stored in the DB leaves the previous configuration and
    the previously scheduled job untouched. Every failure is logged as a
    warning and never raised.
    """
    global WEBHOOK_URL, DAILY_REPORT_HOUR, DAILY_REPORT_MINUTE
    from apscheduler.jobstores.base import JobLookupError

    try:
        config = _run_async(_fetch_config())
        new_url = config.get("webhook_url", WEBHOOK_URL)
        new_hour = int(config.get("daily_report_hour", DAILY_REPORT_HOUR))
        new_minute = int(config.get("daily_report_minute", DAILY_REPORT_MINUTE))

        # Constructing the trigger validates the hour/minute range; doing it
        # first means bad values abort here instead of after the old job has
        # already been removed.
        new_trigger = None
        if new_url:
            new_trigger = CronTrigger(hour=new_hour, minute=new_minute, timezone="Asia/Shanghai")

        changed = (new_url != WEBHOOK_URL or new_hour != DAILY_REPORT_HOUR or new_minute != DAILY_REPORT_MINUTE)
        WEBHOOK_URL = new_url
        DAILY_REPORT_HOUR = new_hour
        DAILY_REPORT_MINUTE = new_minute

        if changed and scheduler.running:
            if new_trigger is not None:
                # replace_existing swaps the job in place, so no separate
                # remove step is needed.
                scheduler.add_job(
                    send_daily_report,
                    trigger=new_trigger,
                    id="daily_report",
                    replace_existing=True,
                    misfire_grace_time=3600,
                )
                logger.info(f"📊 日报任务已更新: {DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d}")
            else:
                try:
                    scheduler.remove_job("daily_report")
                except JobLookupError:
                    # The job was never scheduled; nothing to remove.
                    pass
                logger.info("📊 Webhook 为空,日报任务已移除")
        logger.info(f"⚙️ 配置加载完成: webhook={'已配置' if WEBHOOK_URL else '未配置'}, 推送时间={DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d}")
    except Exception as e:
        logger.warning(f"从 DB 加载配置失败,使用默认值: {e}")
async def _fetch_config() -> dict:
    """Read every ``SystemConfig`` row and return them as a ``{key: value}`` dict."""
    from sqlalchemy import select
    from shared.models.system_config import SystemConfig

    session_factory, engine = _make_session()
    try:
        async with session_factory() as db:
            rows = (await db.execute(select(SystemConfig))).scalars().all()
            settings = {}
            for row in rows:
                settings[row.key] = row.value
            return settings
    finally:
        # The engine is private to this call; always release its connections.
        await engine.dispose()
def _early_trigger_fields(parts):
    """Return the ``(minute, hour)`` cron fields for firing one minute early.

    Args:
        parts: The five whitespace-separated fields of the user's cron
            expression (minute, hour, day, month, day_of_week).

    Returns:
        The minute and hour fields to use for the actual trigger. The original
        fields are returned unchanged (no early start) when the minute or hour
        is not a plain integer (e.g. ``*/5``), or when shifting would have to
        cross midnight while day / month / day_of_week are restricted: in that
        case ``59 23`` on the *same* date fields would fire almost a day late.
    """
    minute, hour, day, month, day_of_week = parts
    try:
        m = int(minute)
        h = None if hour == "*" else int(hour)
    except (ValueError, TypeError):
        return minute, hour

    if m != 0:
        return str(m - 1), hour
    if h is None:
        return "59", "*"
    if h > 0:
        return "59", str(h - 1)
    if h == 0 and day == month == day_of_week == "*":
        # Midnight every day: 23:59 of the previous day is still "every day".
        return "59", "23"
    return minute, hour


def sync_db_tasks():
    """Synchronise the scheduler's sign-in jobs with the tasks stored in the DB.

    New tasks are added, tasks whose cron expression changed are replaced, and
    tasks that are no longer returned by the DB query are removed. Each job is
    scheduled one minute ahead of the user's cron when possible so that
    ``run_signin`` can preload data before the target time. Failures are
    logged; nothing is raised.
    """
    try:
        tasks = _run_async(_load_tasks_from_db())
    except Exception as e:
        logger.error(f"同步任务失败: {e}")
        return

    db_task_ids = set()
    for task_id, account_id, cron_expr in tasks:
        db_task_ids.add(task_id)
        job_id = f"signin_{task_id}"
        # Already registered with the same cron: nothing to do.
        if task_id in _registered_tasks and _registered_tasks[task_id] == cron_expr:
            continue
        # New task or changed cron: add / replace the job.
        try:
            parts = cron_expr.split()
            if len(parts) != 5:
                logger.warning(f"无效 cron: task={task_id}, expr={cron_expr}")
                continue
            early_minute, early_hour = _early_trigger_fields(parts)
            trigger = CronTrigger(
                minute=early_minute, hour=early_hour,
                day=parts[2], month=parts[3], day_of_week=parts[4],
                timezone="Asia/Shanghai",
            )
            scheduler.add_job(
                run_signin,
                trigger=trigger,
                id=job_id,
                args=[task_id, account_id, cron_expr],
                replace_existing=True,
                misfire_grace_time=300,
            )
            _registered_tasks[task_id] = cron_expr
            actual_cron = f"{early_minute} {early_hour} {parts[2]} {parts[3]} {parts[4]}"
            logger.info(f"✅ 注册任务: task={task_id}, account={account_id}, 用户cron={cron_expr}, 实际触发={actual_cron}")
        except Exception as e:
            logger.error(f"注册任务失败: task={task_id}, error={e}")

    # Drop jobs whose task was deleted or disabled in the DB.
    removed = set(_registered_tasks) - db_task_ids
    for task_id in removed:
        job_id = f"signin_{task_id}"
        try:
            scheduler.remove_job(job_id)
            logger.info(f"🗑️ 移除任务: task={task_id}")
        except Exception as e:
            # Best effort: the job may already be gone from the scheduler.
            logger.debug(f"移除任务时出错: task={task_id}, error={e}")
        _registered_tasks.pop(task_id, None)

    logger.info(f"📋 任务同步完成: 活跃 {len(db_task_ids)} 个, 已注册 {len(_registered_tasks)}")
async def _load_tasks_from_db():
    """Fetch every enabled task whose account status is ``pending`` or ``active``.

    Returns:
        A list of ``(task_id, account_id, cron_expression)`` tuples, with both
        ids converted to ``str``.
    """
    from sqlalchemy import select
    from shared.models.task import Task
    from shared.models.account import Account

    session_factory, engine = _make_session()
    try:
        async with session_factory() as db:
            query = select(Task.id, Task.account_id, Task.cron_expression)
            query = query.join(Account, Task.account_id == Account.id)
            query = query.where(Task.is_enabled == True)  # noqa: E712 - SQL expression
            query = query.where(Account.status.in_(["pending", "active"]))
            rows = (await db.execute(query)).all()
            loaded = []
            for row in rows:
                loaded.append((str(row[0]), str(row[1]), row[2]))
            return loaded
    finally:
        await engine.dispose()
# =============== 签到入口 ===============
def run_signin(task_id: str, account_id: str, cron_expr: str = ""):
    """Synchronous entry point invoked by APScheduler for one sign-in task.

    Runs the async sign-in with a 300-second timeout on a private event loop,
    logs the outcome and pushes it to the webhook. Timeouts and any other
    exception are logged and reported to the webhook; nothing is raised.
    """
    logger.info(f"🎯 开始签到: task={task_id}, account={account_id}, cron={cron_expr}")
    started_at = _time.time()

    def seconds_since_start():
        return _time.time() - started_at

    try:
        signin_job = asyncio.wait_for(_async_do_signin(account_id, cron_expr), timeout=300)
        outcome = _run_async(signin_job)
        took = seconds_since_start()
        outcome["elapsed_seconds"] = round(took, 1)
        logger.info(f"✅ 签到完成: task={task_id}, 耗时={took:.1f}s, result={outcome}")
        # Push the notification right after the sign-in finished.
        _push_signin_result(account_id, outcome, took)
    except asyncio.TimeoutError:
        took = seconds_since_start()
        logger.error(f"⏰ 签到超时(5分钟): task={task_id}, account={account_id}")
        _push_signin_result(account_id, {"status": "timeout"}, took)
    except Exception as exc:
        took = seconds_since_start()
        logger.error(f"❌ 签到失败: task={task_id}, error={exc}")
        _push_signin_result(account_id, {"status": "error", "reason": str(exc)}, took)
def _push_signin_result(account_id: str, result: dict, elapsed: float):
    """Push the outcome of one sign-in run to the webhook.

    Does nothing when no webhook URL is configured. Every failure while
    building or sending the message is logged as a warning and swallowed.

    Args:
        account_id: Account the run belonged to; used to look up a display name.
        result: Result dict produced by the sign-in run. ``status`` may be
            ``timeout``, ``error``, ``failed``, ``skipped``, ``completed`` or
            absent (a normal run with counters and ``details``).
        elapsed: Wall-clock duration of the run in seconds.
    """
    if not WEBHOOK_URL:
        return
    try:
        # Display name: the account's remark / Weibo uid, else a short id prefix.
        remark = _run_async(_get_account_remark(account_id)) or account_id[:8]
        status = result.get("status", "")
        if status == "timeout":
            lines = [f"{remark} 签到超时 ({elapsed:.1f}s)"]
        elif status == "error":
            lines = [f"{remark} 签到异常: {result.get('reason', '未知')}"]
        elif status == "failed":
            # e.g. cookie decryption failed - previously reported as "completed".
            lines = [f"⚠️ {remark} 签到失败: {result.get('reason', '未知')}"]
        elif status == "skipped":
            # e.g. account no longer pending/active - previously reported as "completed".
            lines = [f"⚠️ {remark} 签到跳过: {result.get('reason', '未知')}"]
        elif status == "completed" and result.get("signed", 0) == 0 and result.get("total", 0) == 0:
            msg = result.get("message", "")
            if "no topics" in msg:
                lines = [f"⚠️ {remark} Cookie 已失效,请重新扫码添加"]
            elif "no selected" in msg:
                lines = [f"⚠️ {remark} 签到跳过: 没有选中的超话"]
            else:
                lines = [f"⚠️ {remark} 签到异常: 0 个超话 ({msg})"]
        else:
            signed = result.get("signed", 0)
            already = result.get("already_signed", 0)
            failed = result.get("failed", 0)
            total = result.get("total", 0)
            details = result.get("details", [])
            lines = [
                f"🎯 {remark} 签到完成",
                f"⏱ 耗时: {elapsed:.1f}s | 超话: {total}",
                f"✅ 成功: {signed} 📌 已签: {already} ❌ 失败: {failed}",
            ]
            # Per-topic breakdown (includes the sign-in rank message).
            if details:
                lines.append("")
                for d in details:
                    topic = d.get("topic", "")
                    msg = d.get("message", "")
                    st = d.get("status", "")
                    # Detail entries carry the log status values
                    # ("failed_already_signed"); the short form is accepted too.
                    if st == "success":
                        icon = "✅"
                    elif st in ("failed_already_signed", "already_signed"):
                        icon = "📌"
                    else:
                        icon = "❌"
                    lines.append(f" {icon} {topic}: {msg}")
        _send_webhook("\n".join(lines))
    except Exception as e:
        logger.warning(f"推送签到结果失败: {e}")
async def _get_account_remark(account_id: str) -> str:
    """Return a display name for the account.

    The account's ``remark`` is preferred, falling back to its
    ``weibo_user_id``; an empty string is returned when no such account exists.
    """
    from sqlalchemy import select
    from shared.models.account import Account

    session_factory, engine = _make_session()
    try:
        async with session_factory() as db:
            lookup = select(Account.remark, Account.weibo_user_id).where(Account.id == account_id)
            record = (await db.execute(lookup)).first()
            if not record:
                return ""
            return record[0] or record[1]
    finally:
        await engine.dispose()
async def _async_do_signin(account_id: str, cron_expr: str = ""):
    """Sign one account in to all of its (selected) super topics.

    Flow: the job fires up to one minute early -> load the account and decrypt
    its cookie -> preload the topic list and XSRF token -> wait until the
    user's target time -> sign in to each topic -> write logs and update the
    account status.

    Args:
        account_id: Primary key of the account to sign in.
        cron_expr: The user's original cron expression; used to compute the
            target time to wait for. Empty means "start immediately".

    Returns:
        A result dict. Early exits carry ``status`` (``error`` / ``skipped`` /
        ``failed`` / ``completed``) plus ``reason`` or ``message``; a normal
        run returns the ``signed`` / ``already_signed`` / ``failed`` /
        ``total`` counters and a per-topic ``details`` list.

    Raises:
        Exceptions raised while loading the account propagate to the caller;
        exceptions during the sign-in phase are logged and returned as an
        ``error`` result instead.
    """
    import httpx
    from sqlalchemy import select
    from shared.models.account import Account
    from shared.models.signin_log import SigninLog
    from shared.crypto import decrypt_cookie, derive_key

    # Work in the scheduler's timezone: the cron expression is evaluated by
    # the scheduler in that zone, so the target time must be computed in the
    # same zone rather than in the host's local time.
    tz = getattr(scheduler, "timezone", None)

    # Target time = next occurrence of the user's cron (the job fired early).
    target_time = None
    if cron_expr:
        try:
            from croniter import croniter
            now = datetime.now(tz)
            target_time = croniter(cron_expr, now).get_next(datetime)
            # More than 2 minutes away means the job was not fired early
            # (or the computation is off); do not wait in that case.
            if (target_time - now).total_seconds() > 120:
                target_time = None
        except Exception:
            target_time = None

    SessionFactory, eng = _make_session()
    try:
        # ---- Phase 1: short transaction to read the account ----
        async with SessionFactory() as session:
            result = await session.execute(select(Account).where(Account.id == account_id))
            account = result.scalar_one_or_none()
            if not account:
                return {"status": "error", "reason": "account not found"}
            if account.status not in ("pending", "active"):
                return {"status": "skipped", "reason": f"status={account.status}"}
            key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY)
            try:
                cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key)
            except Exception:
                account.status = "invalid_cookie"
                await session.commit()
                return {"status": "failed", "reason": "cookie decryption failed"}
            acc_id = str(account.id)
            acc_selected_topics = account.selected_topics  # topics chosen by the user
        cookies = _parse_cookies(cookie_str)

        # ---- Phase 2: preload the topic list and the XSRF token ----
        try:
            topics = await _fetch_topics(cookies)
            if not topics:
                async with SessionFactory() as session:
                    result = await session.execute(select(Account).where(Account.id == acc_id))
                    acc = result.scalar_one_or_none()
                    if acc:
                        acc.status = "invalid_cookie"
                        acc.last_checked_at = datetime.now()
                        await session.commit()
                return {"status": "completed", "signed": 0, "message": "no topics - cookie可能已失效"}

            # When the user picked specific topics, only sign in to those.
            if acc_selected_topics and isinstance(acc_selected_topics, list):
                selected_cids = {t.get("containerid") for t in acc_selected_topics if t.get("containerid")}
                if selected_cids:
                    topics = [t for t in topics if t["containerid"] in selected_cids]
                    logger.info(f"📌 按用户选择过滤: {len(topics)} 个超话 (共 {len(selected_cids)} 个已选)")
            if not topics:
                return {"status": "completed", "signed": 0, "message": "no selected topics"}

            signed = already = failed = 0
            log_entries = []
            async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
                # Fetch the XSRF token ahead of time.
                await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
                xsrf = client.cookies.get("XSRF-TOKEN", "")
                logger.info(f"📦 预加载完成: account={account_id}, topics={len(topics)}, xsrf={'✅' if xsrf else '❌'}")

                # ---- Wait for the target time before signing in ----
                if target_time:
                    wait_seconds = (target_time - datetime.now(tz)).total_seconds()
                    if 0 < wait_seconds <= 90:
                        logger.info(f"⏳ 等待 {wait_seconds:.1f} 秒到目标时间 {target_time.strftime('%H:%M:%S')}")
                        await asyncio.sleep(wait_seconds)

                # ---- Sign in to each topic, 0.5 s apart ----
                for topic in topics:
                    r = await _do_single_signin(client, cookies, topic, xsrf)
                    if r["status"] == "success":
                        s, signed = "success", signed + 1
                    elif r["status"] == "already_signed":
                        s, already = "failed_already_signed", already + 1
                    else:
                        s, failed = "failed_network", failed + 1
                    log_entries.append(SigninLog(
                        account_id=acc_id, topic_title=topic["title"],
                        status=s, reward_info={"message": r["message"]},
                        signed_at=datetime.now(),
                    ))
                    await asyncio.sleep(0.5)

            # ---- Phase 3: short transaction to write logs and update status ----
            async with SessionFactory() as session:
                for log in log_entries:
                    session.add(log)
                result = await session.execute(select(Account).where(Account.id == acc_id))
                acc = result.scalar_one_or_none()
                if acc:
                    acc.last_checked_at = datetime.now()
                    if acc.status != "active":
                        acc.status = "active"
                await session.commit()

            logger.info(
                f"签到结果: account={account_id}, "
                f"signed={signed}, already={already}, failed={failed}"
            )
            return {
                "signed": signed, "already_signed": already, "failed": failed,
                "total": len(topics),
                "details": [
                    {"topic": e.topic_title, "status": e.status,
                     "message": (e.reward_info or {}).get("message", "")}
                    for e in log_entries
                ],
            }
        except Exception as e:
            logger.error(f"签到过程异常: account={account_id}, error={e}")
            return {"status": "error", "reason": str(e)}
    finally:
        # Runs on every exit path, including the early returns of phase 1,
        # so the per-call engine never leaks its connections.
        await eng.dispose()
def _topic_from_item(item):
    """Convert one entry of the topic-list API into ``{"title", "containerid"}``.

    Returns ``None`` when the entry has no title or no ``100808…`` container
    id in its ``oid`` / ``scheme`` fields. Non-string field values are
    tolerated instead of aborting the whole page.
    """
    title = item.get("topic_name", "") or item.get("title", "")
    if not title:
        return None
    for field in ("oid", "scheme"):
        m = re.search(r"100808[0-9a-fA-F]+", str(item.get(field) or ""))
        if m:
            return {"title": title, "containerid": m.group(0)}
    return None


async def _fetch_topics(cookies: dict) -> list:
    """Fetch the list of super topics the account follows.

    Up to three attempts are made: Weibo may answer with a redirect to the
    login / passport page (risk control or expired cookie), in which case the
    whole fetch is retried after a growing delay.

    Args:
        cookies: Parsed Weibo cookies for the account.

    Returns:
        A list of ``{"title": ..., "containerid": ...}`` dicts. An empty list
        means the fetch failed (expired cookie, risk control, errors) or no
        topics were found.
    """
    import httpx

    max_retries = 3
    topics = []
    for attempt in range(max_retries):
        topics = []
        last_attempt = attempt >= max_retries - 1
        try:
            # Redirects are not followed automatically so that a bounce to
            # the login page can be detected.
            async with httpx.AsyncClient(timeout=15, follow_redirects=False) as client:
                resp = await client.get("https://weibo.com/", headers=WEIBO_HEADERS, cookies=cookies)
                if resp.status_code in (301, 302):
                    location = resp.headers.get("location", "")
                    # Redirect to the login page = cookie expired or risk control.
                    if "login.sina.com.cn" in location or "passport" in location:
                        if last_attempt:
                            logger.warning("多次重试仍被重定向Cookie 可能已失效")
                            return []
                        wait = (attempt + 1) * 3
                        logger.warning(f"被重定向到登录页(尝试 {attempt+1}/{max_retries}){wait}秒后重试...")
                        await asyncio.sleep(wait)
                        continue
                    # Any other redirect (e.g. weibo.com -> www.weibo.com) is followed once.
                    resp = await client.get(location, headers=WEIBO_HEADERS, cookies=cookies)

                xsrf = ""
                for c in resp.cookies.jar:
                    if c.name == "XSRF-TOKEN":
                        xsrf = c.value
                        break
                headers = {**WEIBO_HEADERS, "X-Requested-With": "XMLHttpRequest"}
                if xsrf:
                    headers["X-XSRF-TOKEN"] = xsrf

                retry_attempt = False
                page = 1
                while page <= 10:
                    resp = await client.get(
                        "https://weibo.com/ajax/profile/topicContent",
                        params={"tabid": "231093_-_chaohua", "page": str(page)},
                        headers=headers, cookies=cookies,
                    )
                    # The topic API itself may also answer with a redirect.
                    if resp.status_code in (301, 302):
                        if last_attempt:
                            logger.warning("超话接口多次重定向,放弃")
                            return []
                        wait = (attempt + 1) * 3
                        logger.warning(f"超话接口被重定向(尝试 {attempt+1}/{max_retries}){wait}秒后重试...")
                        await asyncio.sleep(wait)
                        # Retry the whole fetch; pages collected so far are
                        # discarded rather than returned as a partial list.
                        retry_attempt = True
                        break
                    try:
                        data = resp.json()
                    except Exception:
                        logger.warning(f"超话列表响应非 JSON: {resp.text[:200]}")
                        break
                    if data.get("ok") != 1:
                        break
                    payload = data.get("data") or {}
                    tlist = payload.get("list") or []
                    if not tlist:
                        break
                    for item in tlist:
                        topic = _topic_from_item(item)
                        if topic:
                            topics.append(topic)
                    if page >= (payload.get("max_page") or 1):
                        break
                    page += 1

                if retry_attempt:
                    continue
                # Having topics means this attempt succeeded.
                if topics:
                    return topics
        except Exception as e:
            logger.error(f"获取超话列表失败(尝试 {attempt+1}/{max_retries}): {e}")
            if not last_attempt:
                await asyncio.sleep(3)
    return topics
async def _do_single_signin(client, cookies: dict, topic: dict, xsrf: str) -> dict:
    """Sign in to a single super topic, reusing the caller's HTTP client.

    Args:
        client: An already-open httpx async client.
        cookies: Parsed Weibo cookies for the account.
        topic: Dict with at least a ``containerid`` key.
        xsrf: XSRF token to send, or an empty string for none.

    Returns:
        ``{"status": ..., "message": ...}`` where status is ``success``,
        ``already_signed`` or ``failed``. Exceptions are never raised; they
        are reported as a ``failed`` result.
    """
    try:
        request_headers = dict(WEIBO_HEADERS)
        request_headers["Referer"] = f"https://weibo.com/p/{topic['containerid']}/super_index"
        request_headers["X-Requested-With"] = "XMLHttpRequest"
        if xsrf:
            request_headers["X-XSRF-TOKEN"] = xsrf

        query = {
            "ajwvr": "6",
            "api": "http://i.huati.weibo.com/aj/super/checkin",
            "texta": "签到", "textb": "已签到",
            "status": "0",
            "id": topic["containerid"],
            "location": "page_100808_super_index",
            "timezone": "GMT+0800",
            "lang": "zh-cn",
            "plat": "Win32",
            "ua": WEIBO_HEADERS["User-Agent"],
            "screen": "1920*1080",
            # Millisecond timestamp sent as the __rnd parameter.
            "__rnd": str(int(_time.time() * 1000)),
        }
        resp = await client.get(
            "https://weibo.com/p/aj/general/button",
            params=query,
            headers=request_headers, cookies=cookies,
        )

        try:
            body = resp.json()
        except Exception:
            return {"status": "failed", "message": f"非JSON响应: {resp.text[:100]}"}

        code = str(body.get("code", ""))
        msg = body.get("msg", "")

        if code == "100000":
            extra = body.get("data")
            tip = ""
            if isinstance(extra, dict):
                tip = extra.get("alert_title", "") or extra.get("tipMessage", "")
            return {"status": "success", "message": tip or "签到成功"}
        if code == "382004":
            return {"status": "already_signed", "message": msg or f"今天已签到({code})"}
        return {"status": "failed", "message": f"code={code}, msg={msg}"}
    except Exception as exc:
        return {"status": "failed", "message": str(exc)}
# =============== Webhook 每日报告 ===============
def send_daily_report():
    """Build the daily sign-in / account summary and push it to the webhook.

    Skipped (with an info log) when no webhook URL is configured. Any failure
    while building or sending the report is logged and not raised.
    """
    if WEBHOOK_URL:
        logger.info("📊 开始生成每日报告...")
        try:
            _send_webhook(_run_async(_build_daily_report()))
            logger.info("✅ 每日报告推送成功")
        except Exception as exc:
            logger.error(f"❌ 每日报告推送失败: {exc}")
    else:
        logger.info("⚠️ WEBHOOK_URL 未配置,跳过每日报告推送")
async def _build_daily_report() -> str:
    """Summarise today's sign-in logs and the account statuses from the DB.

    Returns:
        The report as a single newline-joined string: account status totals,
        today's sign-in totals, a per-account breakdown, the reward/rank
        message of each successful sign-in, and active accounts that have not
        been checked for more than three days.
    """
    from sqlalchemy import select, func as sa_func
    from shared.models.account import Account
    from shared.models.signin_log import SigninLog
    from shared.models.user import User

    # Sample the clock once so "today" and the report timestamp agree.
    now = datetime.now()
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)

    SessionFactory, eng = _make_session()
    try:
        async with SessionFactory() as session:
            # 1. Account overview by status.
            acc_status = await session.execute(
                select(Account.status, sa_func.count())
                .group_by(Account.status)
            )
            status_map = {row[0]: row[1] for row in acc_status.all()}
            total_accounts = sum(status_map.values())
            active = status_map.get("active", 0)
            pending = status_map.get("pending", 0)
            invalid = status_map.get("invalid_cookie", 0)

            # 2. Today's sign-in totals by log status.
            log_stats = await session.execute(
                select(SigninLog.status, sa_func.count())
                .where(SigninLog.signed_at >= today_start)
                .group_by(SigninLog.status)
            )
            log_map = {row[0]: row[1] for row in log_stats.all()}
            success = log_map.get("success", 0)
            already = log_map.get("failed_already_signed", 0)
            failed_net = log_map.get("failed_network", 0)
            total_logs = sum(log_map.values())

            # 3. Today's per-account breakdown.
            detail_rows = await session.execute(
                select(
                    Account.remark,
                    Account.weibo_user_id,
                    SigninLog.status,
                    sa_func.count(),
                )
                .join(Account, SigninLog.account_id == Account.id)
                .where(SigninLog.signed_at >= today_start)
                .group_by(Account.id, Account.remark, Account.weibo_user_id, SigninLog.status)
                .order_by(Account.remark)
            )
            # Aggregate per display name (remark, else Weibo uid).
            account_details = {}
            for remark, uid, st, cnt in detail_rows.all():
                name = remark or uid
                if name not in account_details:
                    account_details[name] = {"success": 0, "already": 0, "failed": 0}
                if st == "success":
                    account_details[name]["success"] += cnt
                elif st == "failed_already_signed":
                    account_details[name]["already"] += cnt
                else:
                    account_details[name]["failed"] += cnt

            # 4. Rank / reward message of each successful sign-in today
            #    (taken from reward_info).
            rank_rows = await session.execute(
                select(
                    Account.remark, Account.weibo_user_id,
                    SigninLog.topic_title, SigninLog.reward_info,
                )
                .join(Account, SigninLog.account_id == Account.id)
                .where(SigninLog.signed_at >= today_start)
                .where(SigninLog.status == "success")
                .order_by(Account.remark, SigninLog.signed_at)
            )
            rank_details = []
            for remark, uid, topic, reward in rank_rows.all():
                name = remark or uid
                msg = ""
                if isinstance(reward, dict):
                    msg = reward.get("message", "")
                elif isinstance(reward, str):
                    msg = reward
                if msg:
                    rank_details.append({"name": name, "topic": topic, "message": msg})

            # 5. Active accounts not checked for more than 3 days (or never).
            stale_cutoff = now - timedelta(days=3)
            stale_result = await session.execute(
                select(Account.remark, Account.weibo_user_id, Account.last_checked_at)
                .where(Account.status == "active")
                .where(
                    (Account.last_checked_at < stale_cutoff) | (Account.last_checked_at == None)  # noqa: E711 - SQL expression
                )
            )
            stale_accounts = [
                (row[0] or row[1], row[2].strftime("%m-%d %H:%M") if row[2] else "从未")
                for row in stale_result.all()
            ]
    finally:
        await eng.dispose()

    # Assemble the report text.
    lines = [
        "📊 微博超话签到日报",
        f"{now.strftime('%Y-%m-%d %H:%M')}",
        "━━━━━━━━━━━━━━━━━━",
        "",
        "📱 账号状态",
        f" 总计: {total_accounts}",
        f" ✅ 正常: {active} ⏳ 待验证: {pending} ❌ 失效: {invalid}",
    ]
    if invalid > 0:
        lines.append(f" ⚠️ 有 {invalid} 个账号 Cookie 已失效,请及时更新")
    lines += [
        "",
        "🎯 今日签到",
        f" 总计: {total_logs} 条记录",
        f" ✅ 成功: {success} 📌 已签: {already} ❌ 失败: {failed_net}",
    ]
    if account_details:
        lines += ["", "📋 账号明细"]
        for name, d in account_details.items():
            # The failure counter gets its own marker so it cannot run into
            # the "already signed" counter.
            lines.append(f" {name}: ✅{d['success']} 📌{d['already']} ❌{d['failed']}")
    if rank_details:
        lines += ["", "🏆 签到名次"]
        for r in rank_details:
            lines.append(f" {r['name']} - {r['topic']}: {r['message']}")
    if stale_accounts:
        lines += ["", "⚠️ 需要关注"]
        for name, last in stale_accounts:
            lines.append(f" {name} (上次检查: {last})")
    if total_logs == 0:
        lines += ["", "💤 今日暂无签到记录"]
    return "\n".join(lines)
def _send_webhook(content: str):
    """Send a message to the configured webhook, adapting to the platform.

    The payload shape is chosen from the webhook URL: WeCom, DingTalk, Feishu,
    or a generic JSON body for anything else. Does nothing when no webhook URL
    is configured.

    Args:
        content: Message text to send.

    Raises:
        Network errors from ``httpx.post`` propagate to the caller. A non-200
        status, an unparsable body, or a non-zero business code is only logged.
    """
    import httpx

    url = WEBHOOK_URL
    if not url:
        return

    # Pick the payload format and the response fields that carry the
    # platform's business error code.
    if "qyapi.weixin.qq.com" in url:
        # WeCom (Enterprise WeChat)
        payload = {"msgtype": "markdown", "markdown": {"content": content}}
        error_keys = ("errcode",)
    elif "oapi.dingtalk.com" in url:
        # DingTalk
        payload = {"msgtype": "markdown", "markdown": {"title": "签到日报", "text": content}}
        error_keys = ("errcode",)
    elif "open.feishu.cn" in url:
        # Feishu custom bot: sent as plain text, so Markdown markers are
        # stripped first (the text message type does not render Markdown).
        plain = content.replace("## ", "").replace("### ", "\n").replace("**", "").replace("> ", "")
        payload = {"msg_type": "text", "content": {"text": plain}}
        error_keys = ("code", "StatusCode")
    else:
        # Generic JSON for custom webhooks; their response contract is
        # unknown, so only the HTTP status is checked.
        payload = {"text": content, "markdown": content}
        error_keys = ()

    resp = httpx.post(url, json=payload, timeout=15)
    if resp.status_code != 200:
        logger.warning(f"Webhook 响应异常: status={resp.status_code}, body={resp.text[:200]}")
        return
    if not resp.headers.get("content-type", "").startswith("application/json"):
        return
    try:
        resp_data = resp.json()
    except ValueError:
        logger.warning(f"Webhook 响应异常: status={resp.status_code}, body={resp.text[:200]}")
        return
    if not isinstance(resp_data, dict):
        return
    # Any single non-zero error field means the platform rejected the message.
    if any(resp_data.get(k) not in (None, 0) for k in error_keys):
        logger.warning(f"Webhook 业务异常: {resp.text[:300]}")
# =============== 日志清理 ===============
def cleanup_old_signin_logs():
    """Scheduler job: delete sign-in logs older than the retention period.

    The deletion itself runs in batches inside ``_async_cleanup``. Failures
    are logged and not raised.
    """
    logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...")
    try:
        summary = _run_async(_async_cleanup())
    except Exception as exc:
        logger.error(f"❌ 日志清理失败: {exc}")
    else:
        logger.info(f"✅ 日志清理完成: {summary}")
async def _async_cleanup():
    """Delete expired sign-in logs in batches of ``CLEANUP_BATCH_SIZE`` rows.

    Each batch selects ids older than the cutoff, deletes them and commits, so
    no single transaction covers the whole backlog.

    Returns:
        ``{"deleted": <row count>, "cutoff": <ISO timestamp>}``.
    """
    from sqlalchemy import select, delete
    from shared.models.signin_log import SigninLog

    cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS)
    deleted_so_far = 0
    session_factory, engine = _make_session()
    try:
        async with session_factory() as db:
            more_to_delete = True
            while more_to_delete:
                batch_query = select(SigninLog.id).where(SigninLog.signed_at < cutoff).limit(CLEANUP_BATCH_SIZE)
                rows = (await db.execute(batch_query)).all()
                expired_ids = [row[0] for row in rows]
                if not expired_ids:
                    break
                await db.execute(delete(SigninLog).where(SigninLog.id.in_(expired_ids)))
                await db.commit()
                deleted_so_far += len(expired_ids)
                logger.info(f"🧹 已删除 {deleted_so_far} 条...")
                # A short batch means nothing older than the cutoff is left.
                more_to_delete = len(expired_ids) >= CLEANUP_BATCH_SIZE
    finally:
        await engine.dispose()
    return {"deleted": deleted_so_far, "cutoff": cutoff.isoformat()}
# =============== Redis 订阅(实时同步任务变更) ===============
def _start_redis_listener():
    """Start a daemon thread that reacts to Redis pub/sub notifications.

    The thread subscribes to the ``task_updates`` and ``config_updates``
    channels. A message on ``config_updates`` reloads the configuration; any
    other message triggers an immediate task sync. On any error the thread
    waits 5 seconds and reconnects. Does nothing when Redis is disabled or no
    Redis URL is configured.
    """
    if not shared_settings.USE_REDIS or not shared_settings.REDIS_URL:
        logger.info("⚠️ Redis 未启用,任务变更仅靠定时轮询同步")
        return

    import threading
    import redis

    def _listen():
        global _redis_listener_running
        _redis_listener_running = True
        logger.info("📡 Redis 订阅线程启动,监听 task_updates / config_updates 频道")
        while _redis_listener_running:
            r = None
            pubsub = None
            try:
                r = redis.from_url(shared_settings.REDIS_URL, decode_responses=True)
                pubsub = r.pubsub()
                pubsub.subscribe("task_updates", "config_updates")
                for message in pubsub.listen():
                    if not _redis_listener_running:
                        break
                    if message["type"] == "message":
                        channel = message.get("channel", "")
                        if channel == "config_updates":
                            logger.info("📡 收到配置变更通知,重新加载...")
                            load_config_from_db()
                        else:
                            logger.info(f"📡 收到任务变更通知: {message['data'][:200]}")
                            sync_db_tasks()
            except Exception as e:
                logger.warning(f"Redis 订阅异常5 秒后重连: {e}")
                _time.sleep(5)
            finally:
                # Release the subscription and the connection on every path,
                # so a failing connection is not leaked on each reconnect.
                for resource in (pubsub, r):
                    if resource is None:
                        continue
                    try:
                        resource.close()
                    except Exception as close_err:
                        logger.debug(f"关闭 Redis 连接失败: {close_err}")

    t = threading.Thread(target=_listen, daemon=True, name="redis-listener")
    t.start()
# =============== 启动入口 ===============
def _shutdown(signum, frame):
    """Signal handler: stop the Redis listener and the scheduler, then exit.

    Args:
        signum: Number of the received signal.
        frame: Current stack frame (unused).
    """
    global _redis_listener_running
    logger.info(f"收到信号 {signum},正在关闭调度器...")
    _redis_listener_running = False
    # The handler is installed before the scheduler is started; shutting down
    # a scheduler that is not running raises, so only stop it when it runs.
    if scheduler.running:
        scheduler.shutdown(wait=False)
    sys.exit(0)
if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("🚀 Weibo-HotSign 定时调度器启动 (APScheduler)")
    logger.info("=" * 60)

    # Register signal handlers for a clean stop.
    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)

    # Start the scheduler.
    scheduler.start()

    # Load configuration from the DB (webhook URL, report time, ...).
    load_config_from_db()

    # Initial sync of the tasks stored in the DB.
    sync_db_tasks()

    # Start the Redis subscription for real-time task change notifications.
    _start_redis_listener()

    # Re-sync with the DB every 50 minutes as a fallback (changes normally
    # arrive through Redis notifications).
    scheduler.add_job(
        sync_db_tasks,
        trigger=IntervalTrigger(minutes=50),
        id="sync_db_tasks",
        replace_existing=True,
        misfire_grace_time=60,
    )

    # Clean up expired sign-in logs every day at 03:00.
    scheduler.add_job(
        cleanup_old_signin_logs,
        trigger=CronTrigger(hour=3, minute=0, timezone="Asia/Shanghai"),
        id="cleanup_old_signin_logs",
        replace_existing=True,
        misfire_grace_time=3600,
    )

    # Daily report push to the webhook (skipped when load_config_from_db
    # already scheduled it).
    if WEBHOOK_URL and not scheduler.get_job("daily_report"):
        scheduler.add_job(
            send_daily_report,
            trigger=CronTrigger(hour=DAILY_REPORT_HOUR, minute=DAILY_REPORT_MINUTE, timezone="Asia/Shanghai"),
            id="daily_report",
            replace_existing=True,
            misfire_grace_time=3600,
        )
    if WEBHOOK_URL:
        logger.info(f"📊 每日报告: 每天 {DAILY_REPORT_HOUR:02d}:{DAILY_REPORT_MINUTE:02d} 推送到 Webhook")
    else:
        logger.info("⚠️ Webhook 未配置,每日报告推送已禁用(可在管理面板设置)")

    logger.info("📋 调度器已启动,等待任务触发...")
    logger.info(f"📋 日志清理: 每天 03:00, 保留 {SIGNIN_LOG_RETAIN_DAYS}")

    # Log every job that is currently registered.
    jobs = scheduler.get_jobs()
    for job in jobs:
        logger.info(f" 📌 {job.id}: next_run={job.next_run_time}")

    # Keep the main thread alive; the scheduler runs in the background.
    try:
        while True:
            _time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        logger.info("调度器关闭中...")
        # The signal handler has usually stopped the scheduler already before
        # raising SystemExit; a second shutdown() on a stopped scheduler
        # raises, so only stop it when it is still running.
        if scheduler.running:
            scheduler.shutdown(wait=False)