# coding: utf-8
"""
Multi-account management module.

Supports: concurrent ticket grabbing, account rotation.
"""

import logging
import threading
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

logger = logging.getLogger("damai.accounts")


@dataclass
class AccountInfo:
    """Credentials and purchase settings for a single account."""

    # Login credentials.
    username: str
    password: str
    # URL of the page the task should operate on.
    target_url: str
    # Scheduled purchase time; empty string when not configured.
    # NOTE(review): the expected time format is not visible in this module.
    auto_buy_time: str = ""
    # Viewer selection; ``None`` is normalised to ``[1]`` in __post_init__.
    # NOTE(review): presumably 1-based indices of ticket holders - confirm.
    viewer_person: Optional[List[int]] = None

    def __post_init__(self):
        """Replace a missing viewer list with a fresh ``[1]``."""
        # A new list per instance avoids sharing one mutable default.
        if self.viewer_person is None:
            self.viewer_person = [1]


def load_accounts_from_config(config: dict) -> Dict[str, AccountInfo]:
    """Build the account table from a configuration dictionary.

    ``config["accounts"]`` may be either a mapping of ``account_id -> info``
    or a list of ``info`` mappings; for a list, the account id is the entry's
    index converted to ``str``. Any other type (or a missing key) yields an
    empty result.

    Entries that are not mappings (e.g. ``None`` or a bare string) are skipped
    with a warning instead of aborting the whole load.

    Args:
        config: Parsed configuration dictionary.

    Returns:
        Dict mapping account id (always a ``str``) to its ``AccountInfo``.
    """
    accounts: Dict[str, AccountInfo] = {}
    raw = config.get("accounts", [])

    if isinstance(raw, dict):
        items = raw.items()
    elif isinstance(raw, list):
        items = enumerate(raw)
    else:
        return accounts

    for key, info in items:
        acc_id = str(key)
        if not isinstance(info, Mapping):
            # One malformed entry must not prevent the others from loading.
            logger.warning(f"账户 {acc_id} 配置无效,已跳过")
            continue
        accounts[acc_id] = AccountInfo(
            username=info.get("username", ""),
            password=info.get("password", ""),
            target_url=info.get("target_url", ""),
            auto_buy_time=info.get("auto_buy_time", ""),
            viewer_person=info.get("viewer_person", [1]),
        )

    logger.info(f"加载了 {len(accounts)} 个账户")
    return accounts


def run_parallel(
    accounts: Dict[str, AccountInfo],
    task_fn: Callable[[str, AccountInfo], None],
    max_workers: int = 3,
) -> List[threading.Thread]:
    """Start one daemon thread per account to run ``task_fn`` concurrently.

    Threads are started immediately and are NOT joined here; the caller
    receives the thread objects and decides whether to wait on them.

    At most ``max_workers`` threads are started. Accounts beyond that limit
    are skipped (not queued) and a warning is logged for each of them.

    Args:
        accounts: Mapping of account id to ``AccountInfo``.
        task_fn: Callback invoked as ``task_fn(account_id, account_info)``.
        max_workers: Maximum number of threads to start.

    Returns:
        The list of started threads, in the iteration order of ``accounts``.
    """
    threads: List[threading.Thread] = []

    for acc_id, acc_info in accounts.items():
        if len(threads) >= max_workers:
            logger.warning(f"达到最大并发数 {max_workers},跳过账户 {acc_id}")
            continue

        # Bind the loop variables as defaults so each thread keeps its own
        # account rather than seeing the loop's final values.
        def run_task(aid=acc_id, ainfo=acc_info):
            try:
                logger.info(f"账户 {aid} 任务启动")
                task_fn(aid, ainfo)
                logger.info(f"账户 {aid} 任务完成")
            except Exception as e:
                # Thread boundary: log with traceback so the failure is
                # diagnosable, and keep it from escaping the thread.
                logger.exception(f"账户 {aid} 任务异常: {e}")

        t = threading.Thread(target=run_task, daemon=True, name=f"account-{acc_id}")
        t.start()
        threads.append(t)
        logger.info(f"已启动线程: account-{acc_id}")

    return threads