diff --git a/backend/task_scheduler/app/celery_app.py b/backend/task_scheduler/app/celery_app.py
index d0e8971..47f0cab 100644
--- a/backend/task_scheduler/app/celery_app.py
+++ b/backend/task_scheduler/app/celery_app.py
@@ -13,6 +13,7 @@ import sys
 import logging
 
 from celery import Celery
+from celery.schedules import crontab
 
 # 确保 shared 模块可导入
 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
@@ -42,6 +43,10 @@ celery_app.conf.update(
             "task": "task_scheduler.app.tasks.signin_tasks.check_and_run_due_tasks",
             "schedule": 60.0,  # 每分钟检查一次(轻量查询,只在到期前 5 分钟才真正提交)
         },
+        "cleanup-old-logs": {
+            "task": "task_scheduler.app.tasks.signin_tasks.cleanup_old_signin_logs",
+            "schedule": crontab(hour=3, minute=0),  # every day at 03:00
+        },
     },
 )
 
diff --git a/backend/task_scheduler/app/tasks/signin_tasks.py b/backend/task_scheduler/app/tasks/signin_tasks.py
index 32de6bf..4a4618d 100644
--- a/backend/task_scheduler/app/tasks/signin_tasks.py
+++ b/backend/task_scheduler/app/tasks/signin_tasks.py
@@ -396,3 +396,60 @@ async def _do_single_signin(cookies: Dict[str, str], topic: dict) -> dict:
         return {"status": "failed", "message": f"code={code}, msg={msg}"}
     except Exception as e:
         return {"status": "failed", "message": str(e)}
+
+
+# =============== Log cleanup ===============
+
+# Retention period in days; can be overridden through the environment variable
+SIGNIN_LOG_RETAIN_DAYS = int(os.getenv("SIGNIN_LOG_RETAIN_DAYS", "30"))
+# Rows deleted per batch, to avoid holding table locks for a long time
+CLEANUP_BATCH_SIZE = 1000
+
+
+@celery_app.task
+def cleanup_old_signin_logs():
+    """
+    Purge sign-in logs older than SIGNIN_LOG_RETAIN_DAYS days.
+    Deletes in batches of at most CLEANUP_BATCH_SIZE rows to avoid long table locks.
+    """
+    logger.info(f"🧹 开始清理 {SIGNIN_LOG_RETAIN_DAYS} 天前的签到日志...")
+    return _run_async(_async_cleanup())
+
+
+async def _async_cleanup():
+    """Delete expired sign-in logs batch by batch; return the deleted count and cutoff."""
+    from sqlalchemy import delete
+
+    cutoff = datetime.now() - timedelta(days=SIGNIN_LOG_RETAIN_DAYS)
+    total_deleted = 0
+
+    SessionFactory, eng = _make_session()
+    try:
+        async with SessionFactory() as session:
+            while True:
+                # Batched delete: fetch one batch of ids first, then delete by id
+                stmt = (
+                    select(SigninLog.id)
+                    .where(SigninLog.signed_at < cutoff)
+                    .limit(CLEANUP_BATCH_SIZE)
+                )
+                result = await session.execute(stmt)
+                ids = [row[0] for row in result.all()]
+
+                if not ids:
+                    break
+
+                del_stmt = delete(SigninLog).where(SigninLog.id.in_(ids))
+                await session.execute(del_stmt)
+                await session.commit()
+                total_deleted += len(ids)
+                logger.info(f"🧹 已删除 {total_deleted} 条...")
+
+                # A short batch means nothing older than the cutoff is left
+                if len(ids) < CLEANUP_BATCH_SIZE:
+                    break
+    finally:
+        await eng.dispose()
+
+    logger.info(f"✅ 日志清理完成,共删除 {total_deleted} 条(截止 {cutoff.strftime('%Y-%m-%d')})")
+    return {"deleted": total_deleted, "cutoff": cutoff.isoformat()}