From 9a3e846be99f6ab2f82cf068d308d44066701612 Mon Sep 17 00:00:00 2001 From: Jeason <1710884619@qq.com> Date: Wed, 18 Mar 2026 09:27:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=AF=8F=E6=AC=A1=E4=BB=BB=E5=8A=A1=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E6=97=B6=E7=94=A8=20=5Fmake=5Fsession()=20=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E7=8B=AC=E7=AB=8B=E7=9A=84=20engine=20+=20session?= =?UTF-8?q?=EF=BC=8C=E7=94=A8=E5=AE=8C=20eng.dispose()=20=E9=87=8A?= =?UTF-8?q?=E6=94=BE=EF=BC=8C=E5=BD=BB=E5=BA=95=E9=81=BF=E5=85=8D=20loop?= =?UTF-8?q?=20=E5=86=B2=E7=AA=81=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task_scheduler/app/tasks/signin_tasks.py | 169 ++++++++++-------- 1 file changed, 97 insertions(+), 72 deletions(-) diff --git a/backend/task_scheduler/app/tasks/signin_tasks.py b/backend/task_scheduler/app/tasks/signin_tasks.py index 22d0727..32de6bf 100644 --- a/backend/task_scheduler/app/tasks/signin_tasks.py +++ b/backend/task_scheduler/app/tasks/signin_tasks.py @@ -26,7 +26,7 @@ from sqlalchemy import select sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) -from shared.models.base import AsyncSessionLocal +from shared.models.base import Base from shared.models.task import Task from shared.models.account import Account from shared.models.signin_log import SigninLog @@ -38,6 +38,23 @@ from ..config import settings logger = logging.getLogger(__name__) + +def _make_session(): + """ + 每次创建新的 engine + session,避免 Celery worker fork 后 + 复用主进程 event loop 绑定的连接池导致 'Future attached to a different loop' 错误。 + """ + from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine + from sqlalchemy.orm import sessionmaker + + engine_kwargs: dict = {"echo": False} + if "sqlite" not in shared_settings.DATABASE_URL: + engine_kwargs.update(pool_size=5, max_overflow=10, pool_pre_ping=True) + + eng = create_async_engine(shared_settings.DATABASE_URL, **engine_kwargs) + factory = sessionmaker(eng, class_=AsyncSession, expire_on_commit=False) + return factory, eng + WEIBO_HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " @@ -117,15 +134,19 @@ async def _async_check_due(): now = datetime.now() submitted = 0 - async with AsyncSessionLocal() as session: - stmt = ( - select(Task, Account) - .join(Account, Task.account_id == Account.id) - .where(Task.is_enabled == True) - .where(Account.status.in_(["pending", "active"])) - ) - result = await session.execute(stmt) - pairs = result.all() + SessionFactory, eng = _make_session() + try: + async with SessionFactory() as session: + stmt = ( + select(Task, Account) + .join(Account, Task.account_id == Account.id) + .where(Task.is_enabled == True) + .where(Account.status.in_(["pending", "active"])) + ) + result = await session.execute(stmt) + pairs = result.all() + finally: + await eng.dispose() for task, account in pairs: try: @@ -212,73 +233,77 @@ async def _async_do_signin(account_id: str) -> Dict[str, Any]: """直接操作数据库和微博 API 完成签到""" import httpx - async with AsyncSessionLocal() as session: - stmt = select(Account).where(Account.id == account_id) - result = await session.execute(stmt) - account = result.scalar_one_or_none() - if not account: - raise Exception(f"Account {account_id} not found") + SessionFactory, eng = _make_session() + try: + async with SessionFactory() as session: + stmt = select(Account).where(Account.id == account_id) + result = await session.execute(stmt) + account = result.scalar_one_or_none() + if not account: + raise Exception(f"Account {account_id} not found") - if account.status not in ("pending", "active"): - return {"status": "skipped", "reason": f"status={account.status}"} + if account.status not in ("pending", "active"): + return {"status": "skipped", "reason": f"status={account.status}"} - # 解密 cookie - key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY) - try: - cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) - except Exception: - account.status = "invalid_cookie" + # 解密 cookie + key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY) + try: + cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) + except Exception: + account.status = "invalid_cookie" + await session.commit() + return {"status": "failed", "reason": "cookie decryption failed"} + + cookies = _parse_cookies(cookie_str) + + # 获取超话列表 + topics = await _fetch_topics(cookies) + if not topics: + return {"status": "completed", "signed": 0, "message": "no topics"} + + # 逐个签到 + signed = already = failed = 0 + for topic in topics: + await asyncio.sleep(1.5) + r = await _do_single_signin(cookies, topic) + + if r["status"] == "success": + s = "success" + signed += 1 + elif r["status"] == "already_signed": + s = "failed_already_signed" + already += 1 + else: + s = "failed_network" + failed += 1 + + log = SigninLog( + account_id=account.id, + topic_title=topic["title"], + status=s, + reward_info={"message": r["message"]}, + signed_at=datetime.now(), + ) + session.add(log) + + account.last_checked_at = datetime.now() + if account.status != "active": + account.status = "active" await session.commit() - return {"status": "failed", "reason": "cookie decryption failed"} - cookies = _parse_cookies(cookie_str) - - # 获取超话列表 - topics = await _fetch_topics(cookies) - if not topics: - return {"status": "completed", "signed": 0, "message": "no topics"} - - # 逐个签到 - signed = already = failed = 0 - for topic in topics: - await asyncio.sleep(1.5) - r = await _do_single_signin(cookies, topic) - - if r["status"] == "success": - s = "success" - signed += 1 - elif r["status"] == "already_signed": - s = "failed_already_signed" - already += 1 - else: - s = "failed_network" - failed += 1 - - log = SigninLog( - account_id=account.id, - topic_title=topic["title"], - status=s, - reward_info={"message": r["message"]}, - signed_at=datetime.now(), + logger.info( + f"签到结果: account={account_id}, " + f"signed={signed}, already={already}, failed={failed}" ) - session.add(log) - - account.last_checked_at = datetime.now() - if account.status != "active": - account.status = "active" - await session.commit() - - logger.info( - f"签到结果: account={account_id}, " - f"signed={signed}, already={already}, failed={failed}" - ) - return { - "status": "completed", - "signed": signed, - "already_signed": already, - "failed": failed, - "total_topics": len(topics), - } + return { + "status": "completed", + "signed": signed, + "already_signed": already, + "failed": failed, + "total_topics": len(topics), + } + finally: + await eng.dispose() async def _fetch_topics(cookies: Dict[str, str]) -> List[dict]: