Files
damai/scripts/multi_account_manager.py

86 lines
2.3 KiB
Python
Raw Permalink Normal View History

# coding: utf-8
"""
多账户管理模块
支持并发抢票账户轮换
"""
import logging
import threading
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
# Module-level logger registered under the fixed name "damai.accounts"
# (an explicit name, not derived from __name__).
logger = logging.getLogger("damai.accounts")
@dataclass
class AccountInfo:
    """Settings for a single account used by the ticket-purchase tasks.

    ``viewer_person`` falls back to ``[1]`` when it is omitted or passed as
    ``None``; the fallback list is created per instance, so instances never
    share it.
    """

    username: str
    password: str
    target_url: str
    # Defaults to an empty string; the expected time format is not established
    # in this module -- TODO confirm against the consumer.
    auto_buy_time: str = ""
    # ``None`` is only a sentinel: a dataclass field must not use a mutable
    # default directly, so the real default is filled in by __post_init__.
    # Presumably indices of the attendees to select -- verify against caller.
    viewer_person: Optional[List[int]] = None

    def __post_init__(self) -> None:
        """Replace a missing ``viewer_person`` with a fresh ``[1]`` list."""
        if self.viewer_person is None:
            self.viewer_person = [1]
def load_accounts_from_config(config: dict) -> Dict[str, AccountInfo]:
    """Build the account table from a configuration dictionary.

    Reads ``config["accounts"]``, which may be either a dict (key -> settings)
    or a list of settings (the list index becomes the key). A missing key is
    treated as an empty list; any other value type yields an empty result.

    Args:
        config: Configuration mapping; only its ``"accounts"`` entry is read.

    Returns:
        A dict mapping the stringified key/index to an ``AccountInfo``.
        Settings fields that are absent fall back to ``""`` (``username``,
        ``password``, ``target_url``, ``auto_buy_time``) or ``[1]``
        (``viewer_person``). Entries whose settings object does not provide a
        ``get`` method are skipped with a warning.
    """
    accounts: Dict[str, AccountInfo] = {}
    raw = config.get("accounts", [])
    if isinstance(raw, dict):
        items = raw.items()
    elif isinstance(raw, list):
        items = enumerate(raw)
    else:
        # Unsupported container type: nothing to load.
        return accounts

    for key, info in items:
        acc_id = str(key)
        # One malformed entry (e.g. None or a bare string) must not abort the
        # whole load with an AttributeError. Duck-type on ``get`` instead of
        # requiring ``dict`` so any mapping-like settings object keeps working.
        if not callable(getattr(info, "get", None)):
            logger.warning("账户 %s 配置无效,已跳过", acc_id)
            continue
        accounts[acc_id] = AccountInfo(
            username=info.get("username", ""),
            password=info.get("password", ""),
            target_url=info.get("target_url", ""),
            auto_buy_time=info.get("auto_buy_time", ""),
            viewer_person=info.get("viewer_person", [1]),
        )

    logger.info("加载了 %d 个账户", len(accounts))
    return accounts
def run_parallel(
    accounts: Dict[str, AccountInfo],
    task_fn: Callable[[str, AccountInfo], None],
    max_workers: int = 3,
) -> List[threading.Thread]:
    """Start one daemon thread per account, up to ``max_workers`` threads.

    Accounts are visited in the dict's iteration order. Once ``max_workers``
    threads have been started, every remaining account is skipped with a
    warning -- it is not queued and never runs. The function returns as soon
    as the threads are started; it does not join them, so the caller must
    join the returned threads if it needs to wait for completion.

    Args:
        accounts: Mapping of account id to its ``AccountInfo``.
        task_fn: Callback invoked as ``task_fn(account_id, account_info)``
            inside the worker thread. Any ``Exception`` it raises is logged
            (with traceback) and does not propagate.
        max_workers: Maximum number of threads this call will start.

    Returns:
        The started threads, in start order.
    """
    threads: List[threading.Thread] = []
    for acc_id, acc_info in accounts.items():
        if len(threads) >= max_workers:
            logger.warning("达到最大并发数 %s,跳过账户 %s", max_workers, acc_id)
            continue

        # Bind the loop variables as defaults so each thread sees its own
        # account rather than whatever the loop variables hold later.
        def run_task(aid=acc_id, ainfo=acc_info):
            try:
                logger.info("账户 %s 任务启动", aid)
                task_fn(aid, ainfo)
                logger.info("账户 %s 任务完成", aid)
            except Exception as e:
                # Thread boundary: log with the full traceback so a failing
                # account can be diagnosed, then let the thread end quietly.
                logger.exception("账户 %s 任务异常: %s", aid, e)

        t = threading.Thread(target=run_task, daemon=True, name=f"account-{acc_id}")
        t.start()
        threads.append(t)
        logger.info("已启动线程: account-%s", acc_id)
    return threads