""" 任务调度器 - 按账号隔离执行 同一账号的任务在同一个线程/浏览器中顺序执行,不同账号并行。 """ import asyncio import threading from datetime import datetime from server.database import get_db from server.services.snatcher import run_snatch # {task_id: thread} 跟踪运行中的任务 _running_tasks = {} # {account_id: thread} 跟踪每个账号的执行线程 _account_threads = {} _lock = threading.Lock() def start_task(task_id): """启动单个任务""" with _lock: if task_id in _running_tasks and _running_tasks[task_id].is_alive(): return False, "任务已在运行中" def _run(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(run_snatch(task_id)) finally: loop.close() with _lock: _running_tasks.pop(task_id, None) t = threading.Thread(target=_run, daemon=True) t.start() with _lock: _running_tasks[task_id] = t return True, "任务已启动" def start_account_tasks(account_id): """ 启动指定账号的所有 pending 任务。 同一账号的任务在同一线程中顺序执行(共享浏览器上下文)。 """ with _lock: if account_id in _account_threads and _account_threads[account_id].is_alive(): return False, "该账号已有任务在执行中" db = get_db() tasks = db.execute( "SELECT id FROM tasks WHERE account_id = ? AND status = 'pending' ORDER BY snatch_time ASC", (account_id,) ).fetchall() db.close() if not tasks: return False, "该账号没有待执行的任务" task_ids = [row['id'] for row in tasks] def _run(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: for tid in task_ids: # 检查任务是否被取消 db2 = get_db() t = db2.execute('SELECT status FROM tasks WHERE id = ?', (tid,)).fetchone() db2.close() if not t or t['status'] == 'cancelled': continue with _lock: _running_tasks[tid] = threading.current_thread() try: loop.run_until_complete(run_snatch(tid)) finally: with _lock: _running_tasks.pop(tid, None) finally: loop.close() with _lock: _account_threads.pop(account_id, None) t = threading.Thread(target=_run, daemon=True) t.start() with _lock: _account_threads[account_id] = t for tid in task_ids: _running_tasks[tid] = t return True, f"已启动 {len(task_ids)} 个任务" def stop_task(task_id): """停止任务(标记状态)""" db = get_db() db.execute( "UPDATE tasks SET status = 'cancelled', updated_at = ? WHERE id = ?", (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), task_id) ) db.commit() db.close() with _lock: _running_tasks.pop(task_id, None) return True, "任务已取消" def get_running_task_ids(): with _lock: return [tid for tid, t in _running_tasks.items() if t.is_alive()]