每次任务执行时用 _make_session() 创建独立的 engine + session,用完 eng.dispose() 释放,彻底避免 loop 冲突。

This commit is contained in:
2026-03-18 09:27:20 +08:00
parent 633e4249de
commit 9a3e846be9

View File

@@ -26,7 +26,7 @@ from sqlalchemy import select
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../..")) sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../.."))
from shared.models.base import AsyncSessionLocal from shared.models.base import Base
from shared.models.task import Task from shared.models.task import Task
from shared.models.account import Account from shared.models.account import Account
from shared.models.signin_log import SigninLog from shared.models.signin_log import SigninLog
@@ -38,6 +38,23 @@ from ..config import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _make_session():
"""
每次创建新的 engine + session避免 Celery worker fork 后
复用主进程 event loop 绑定的连接池导致 'Future attached to a different loop' 错误。
"""
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
engine_kwargs: dict = {"echo": False}
if "sqlite" not in shared_settings.DATABASE_URL:
engine_kwargs.update(pool_size=5, max_overflow=10, pool_pre_ping=True)
eng = create_async_engine(shared_settings.DATABASE_URL, **engine_kwargs)
factory = sessionmaker(eng, class_=AsyncSession, expire_on_commit=False)
return factory, eng
WEIBO_HEADERS = { WEIBO_HEADERS = {
"User-Agent": ( "User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
@@ -117,15 +134,19 @@ async def _async_check_due():
now = datetime.now() now = datetime.now()
submitted = 0 submitted = 0
async with AsyncSessionLocal() as session: SessionFactory, eng = _make_session()
stmt = ( try:
select(Task, Account) async with SessionFactory() as session:
.join(Account, Task.account_id == Account.id) stmt = (
.where(Task.is_enabled == True) select(Task, Account)
.where(Account.status.in_(["pending", "active"])) .join(Account, Task.account_id == Account.id)
) .where(Task.is_enabled == True)
result = await session.execute(stmt) .where(Account.status.in_(["pending", "active"]))
pairs = result.all() )
result = await session.execute(stmt)
pairs = result.all()
finally:
await eng.dispose()
for task, account in pairs: for task, account in pairs:
try: try:
@@ -212,73 +233,77 @@ async def _async_do_signin(account_id: str) -> Dict[str, Any]:
"""直接操作数据库和微博 API 完成签到""" """直接操作数据库和微博 API 完成签到"""
import httpx import httpx
async with AsyncSessionLocal() as session: SessionFactory, eng = _make_session()
stmt = select(Account).where(Account.id == account_id) try:
result = await session.execute(stmt) async with SessionFactory() as session:
account = result.scalar_one_or_none() stmt = select(Account).where(Account.id == account_id)
if not account: result = await session.execute(stmt)
raise Exception(f"Account {account_id} not found") account = result.scalar_one_or_none()
if not account:
raise Exception(f"Account {account_id} not found")
if account.status not in ("pending", "active"): if account.status not in ("pending", "active"):
return {"status": "skipped", "reason": f"status={account.status}"} return {"status": "skipped", "reason": f"status={account.status}"}
# 解密 cookie # 解密 cookie
key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY) key = derive_key(shared_settings.COOKIE_ENCRYPTION_KEY)
try: try:
cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key) cookie_str = decrypt_cookie(account.encrypted_cookies, account.iv, key)
except Exception: except Exception:
account.status = "invalid_cookie" account.status = "invalid_cookie"
await session.commit()
return {"status": "failed", "reason": "cookie decryption failed"}
cookies = _parse_cookies(cookie_str)
# 获取超话列表
topics = await _fetch_topics(cookies)
if not topics:
return {"status": "completed", "signed": 0, "message": "no topics"}
# 逐个签到
signed = already = failed = 0
for topic in topics:
await asyncio.sleep(1.5)
r = await _do_single_signin(cookies, topic)
if r["status"] == "success":
s = "success"
signed += 1
elif r["status"] == "already_signed":
s = "failed_already_signed"
already += 1
else:
s = "failed_network"
failed += 1
log = SigninLog(
account_id=account.id,
topic_title=topic["title"],
status=s,
reward_info={"message": r["message"]},
signed_at=datetime.now(),
)
session.add(log)
account.last_checked_at = datetime.now()
if account.status != "active":
account.status = "active"
await session.commit() await session.commit()
return {"status": "failed", "reason": "cookie decryption failed"}
cookies = _parse_cookies(cookie_str) logger.info(
f"签到结果: account={account_id}, "
# 获取超话列表 f"signed={signed}, already={already}, failed={failed}"
topics = await _fetch_topics(cookies)
if not topics:
return {"status": "completed", "signed": 0, "message": "no topics"}
# 逐个签到
signed = already = failed = 0
for topic in topics:
await asyncio.sleep(1.5)
r = await _do_single_signin(cookies, topic)
if r["status"] == "success":
s = "success"
signed += 1
elif r["status"] == "already_signed":
s = "failed_already_signed"
already += 1
else:
s = "failed_network"
failed += 1
log = SigninLog(
account_id=account.id,
topic_title=topic["title"],
status=s,
reward_info={"message": r["message"]},
signed_at=datetime.now(),
) )
session.add(log) return {
"status": "completed",
account.last_checked_at = datetime.now() "signed": signed,
if account.status != "active": "already_signed": already,
account.status = "active" "failed": failed,
await session.commit() "total_topics": len(topics),
}
logger.info( finally:
f"签到结果: account={account_id}, " await eng.dispose()
f"signed={signed}, already={already}, failed={failed}"
)
return {
"status": "completed",
"signed": signed,
"already_signed": already,
"failed": failed,
"total_topics": len(topics),
}
async def _fetch_topics(cookies: Dict[str, str]) -> List[dict]: async def _fetch_topics(cookies: Dict[str, str]) -> List[dict]: