Files
weidian/server/services/scheduler.py
Jeason 822a4636c0 feat: Web管理系统 + Docker支持
- 多账号管理(异步登录、状态轮询)
- 购物车预售商品同步(倒计时/定时开售)
- 定时抢购(自动刷新、SKU选择、重试机制)
- 账号隔离调度(同账号顺序、跨账号并行)
- Web面板(任务分组、实时倒计时、批量操作)
- Dockerfile + docker-compose
2026-03-18 13:38:17 +08:00

111 lines
3.3 KiB
Python

"""
任务调度器 - 按账号隔离执行
同一账号的任务在同一个线程/浏览器中顺序执行,不同账号并行。
"""
import asyncio
import threading
from datetime import datetime
from server.database import get_db
from server.services.snatcher import run_snatch
# {task_id: thread} 跟踪运行中的任务
_running_tasks = {}
# {account_id: thread} 跟踪每个账号的执行线程
_account_threads = {}
_lock = threading.Lock()
def start_task(task_id):
"""启动单个任务"""
with _lock:
if task_id in _running_tasks and _running_tasks[task_id].is_alive():
return False, "任务已在运行中"
def _run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(run_snatch(task_id))
finally:
loop.close()
with _lock:
_running_tasks.pop(task_id, None)
t = threading.Thread(target=_run, daemon=True)
t.start()
with _lock:
_running_tasks[task_id] = t
return True, "任务已启动"
def start_account_tasks(account_id):
"""
启动指定账号的所有 pending 任务。
同一账号的任务在同一线程中顺序执行(共享浏览器上下文)。
"""
with _lock:
if account_id in _account_threads and _account_threads[account_id].is_alive():
return False, "该账号已有任务在执行中"
db = get_db()
tasks = db.execute(
"SELECT id FROM tasks WHERE account_id = ? AND status = 'pending' ORDER BY snatch_time ASC",
(account_id,)
).fetchall()
db.close()
if not tasks:
return False, "该账号没有待执行的任务"
task_ids = [row['id'] for row in tasks]
def _run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
for tid in task_ids:
# 检查任务是否被取消
db2 = get_db()
t = db2.execute('SELECT status FROM tasks WHERE id = ?', (tid,)).fetchone()
db2.close()
if not t or t['status'] == 'cancelled':
continue
with _lock:
_running_tasks[tid] = threading.current_thread()
try:
loop.run_until_complete(run_snatch(tid))
finally:
with _lock:
_running_tasks.pop(tid, None)
finally:
loop.close()
with _lock:
_account_threads.pop(account_id, None)
t = threading.Thread(target=_run, daemon=True)
t.start()
with _lock:
_account_threads[account_id] = t
for tid in task_ids:
_running_tasks[tid] = t
return True, f"已启动 {len(task_ids)} 个任务"
def stop_task(task_id):
"""停止任务(标记状态)"""
db = get_db()
db.execute(
"UPDATE tasks SET status = 'cancelled', updated_at = ? WHERE id = ?",
(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), task_id)
)
db.commit()
db.close()
with _lock:
_running_tasks.pop(task_id, None)
return True, "任务已取消"
def get_running_task_ids():
with _lock:
return [tid for tid, t in _running_tasks.items() if t.is_alive()]