commit b711b98dc8857218e0db4bd9f15613a57073c618 Author: zhaojie Date: Wed Apr 1 15:24:34 2026 +0800 v1.0: 重写核心代码,清理空壳脚本 - 重写 ticket_script.py: 状态机、完整的选票/下单流程、反检测 - 重写 scripts/: 验证码处理、定时调度、NTP校时、多账户管理 - 删除空壳 appium_simulator.py - 清理配置文件中的硬编码密码 - 重写 README,去除虚假宣传 diff --git a/README.md b/README.md new file mode 100644 index 0000000..65a0148 --- /dev/null +++ b/README.md @@ -0,0 +1,71 @@ +# 大麦网抢票工具 + +基于 Selenium 的大麦网自动抢票脚本,支持定时开抢、多账户并发、验证码处理。 + +## 功能 + +- **自动抢票**:自动选日期→场次→票价→提交订单 +- **定时开抢**:NTP 时间校准,精确到毫秒级触发 +- **多账户**:支持多账号并发抢票 +- **验证码**:自动检测和处理滑块验证码 +- **Cookie 持久化**:扫码登录后自动保存/加载 Cookie +- **反检测**:隐藏 webdriver 痕迹,移动端模拟 + +## 快速开始 + +```bash +# 1. 安装依赖 +pip install selenium ntplib Pillow + +# 2. 下载 chromedriver(匹配你的 Chrome 版本) +# https://googlechromelabs.github.io/chrome-for-testing/ + +# 3. 编辑配置 +cp config/config.json config/my_config.json +# 修改 target_url、date、sess、price 等参数 + +# 4. 运行(首次需要扫码登录) +python ticket_script.py config/my_config.json +``` + +## 配置说明 + +```json +{ + "date": [1, 2], // 日期优先级序号 (1-based) + "sess": [1, 2], // 场次优先级序号 (1-based) + "price": [1, 2, 3], // 票价优先级序号 (1-based) + "ticket_num": 2, // 购买张数 + "viewer_person": [1, 2], // 观影人序号 (1-based) + "nick_name": "", // 用户昵称 + "driver_path": "/usr/local/bin/chromedriver", + "damai_url": "https://www.damai.cn/", + "target_url": "https://m.damai.cn/damai/detail/item.html?itemId=xxx", + "max_retries": 180, // 最大重试次数 + "retry_delay": 0.3 // 重试间隔(秒) +} +``` + +## 项目结构 + +``` +├── ticket_script.py # 核心抢票逻辑 +├── scripts/ +│ ├── main.py # 主入口(单/多账户) +│ ├── selenium_driver.py # 浏览器驱动工厂 +│ ├── captcha_solver.py # 验证码检测与处理 +│ ├── scheduler.py # 定时调度与 NTP 校时 +│ ├── multi_account_manager.py # 多账户管理 +│ └── mock_dependency_manager.py # GUI 依赖模拟 +├── GUI.py # Tkinter 配置界面 +└── config/ + ├── config.json # 默认配置 + └── demo_config.json # 示例配置 +``` + +## 说明 + +- 首次运行会打开浏览器,需要手动扫码登录 +- 登录成功后 Cookie 会保存到 `cookies.pkl`,下次自动加载 +- 支持通过 GUI 界面编辑配置(`python GUI.py`) +- 仅限个人学习研究使用 diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..5ccd68a --- /dev/null +++ b/config/config.json @@ -0,0 +1,13 @@ +{ + "date": [1, 2, 3], + "sess": [1, 2], + "price": [1, 2, 3], + "ticket_num": 2, + "viewer_person": [1, 2], + "nick_name": "", + "driver_path": "/usr/local/bin/chromedriver", + "damai_url": "https://www.damai.cn/", + "target_url": "https://m.damai.cn/damai/detail/item.html?itemId=714001339730", + "max_retries": 180, + "retry_delay": 0.3 +} diff --git a/config/demo_config.json b/config/demo_config.json new file mode 100644 index 0000000..5127c18 --- /dev/null +++ b/config/demo_config.json @@ -0,0 +1,32 @@ +{ + "version": "1.0", + "global": { + "log_level": "INFO", + "timezone": "Asia/Shanghai" + }, + "accounts": { + "acc_primary": { + "platform": "taopiaopiao", + "credentials": { + "mobile": "138xxxxxxxx", + "password": "" + }, + "target": { + "event_url": "https://example.com/detail/987654321", + "priorities": { + "date": [1, 2], + "session": [1], + "price_range": "lowest_to_highest" + }, + "tickets": 2, + "viewers": [0, 1] + } + } + }, + "strategy": { + "auto_strike": true, + "strike_time": "2026-01-25T12:00:00", + "preheat_stages": [5.0, 2.0, 0.5], + "max_retries": 180 + } +} diff --git a/config/platform_config.json b/config/platform_config.json new file mode 100644 index 0000000..632649b --- /dev/null +++ b/config/platform_config.json @@ -0,0 +1,20 @@ +{ + "platforms": { + "damai": { + "platform_name": "大麦网", + "login_url": "https://www.damai.cn/login", + "login_method": "scan", + "mobile_url": "https://m.damai.cn/" + }, + "taopiaopiao": { + "platform_name": "淘票票", + "login_url": "https://m.taopiaopiao.com/", + "login_method": "sms" + }, + "showstart": { + "platform_name": "秀动", + "login_url": "https://www.showstart.com/", + "login_method": "scan" + } + } +} diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..4ccc5b6 --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1,11 @@ +""" +damai.scripts - 抢票工具模块 + +模块说明: +- ticket_script 核心抢票逻辑 (主入口) +- selenium_driver 浏览器驱动工厂 +- captcha_solver 验证码检测与处理 +- scheduler 定时调度与 NTP 校时 +- multi_account_manager 多账户管理 +- mock_dependency_manager GUI 依赖模拟 (仅用于界面演示) +""" diff --git a/scripts/captcha_solver.py b/scripts/captcha_solver.py new file mode 100644 index 0000000..04baeb3 --- /dev/null +++ b/scripts/captcha_solver.py @@ -0,0 +1,160 @@ +# coding: utf-8 +""" +验证码处理模块 +支持:图像验证码、滑块验证码 +依赖:Pillow (PIL) +""" + +import io +import logging +from dataclasses import dataclass +from enum import Enum, auto +from typing import Optional, Tuple + +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webelement import WebElement + +logger = logging.getLogger("damai.captcha") + + +class CaptchaType(Enum): + NONE = auto() + IMAGE = auto() # 图形验证码 + SLIDER = auto() # 滑块验证码 + CLICK = auto() # 点选验证码 + UNKNOWN = auto() + + +@dataclass +class CaptchaResult: + success: bool + captcha_type: CaptchaType + message: str = "" + + +def detect_captcha(driver) -> CaptchaType: + """检测当前页面是否存在验证码""" + # 检测滑块 + slider_selectors = [ + ".captcha-slider", + ".slide-verify", + "[class*='slider']", + ".nc-lang-cn", # 阿里云盾滑块 + ] + for sel in slider_selectors: + try: + driver.find_element(By.CSS_SELECTOR, sel) + logger.info(f"检测到滑块验证码: {sel}") + return CaptchaType.SLIDER + except Exception: + continue + + # 检测图形验证码 + image_selectors = [ + ".captcha-img", + "img[src*='captcha']", + "img[src*='verify']", + "#captcha_img", + ] + for sel in image_selectors: + try: + driver.find_element(By.CSS_SELECTOR, sel) + logger.info(f"检测到图形验证码: {sel}") + return CaptchaType.IMAGE + except Exception: + continue + + # 检测弹窗中是否有验证码关键词 + try: + page_source = driver.page_source.lower() + if "captcha" in page_source or "验证码" in page_source: + return CaptchaType.UNKNOWN + except Exception: + pass + + return CaptchaType.NONE + + +def solve_slider(driver, slider_element: Optional[WebElement] = None) -> CaptchaResult: + """ + 处理滑块验证码 + 简单实现:拖动滑块从左到右 + 更精确的方案需要图像处理来识别缺口位置 + """ + from selenium.webdriver.common.action_chains import ActionChains + + selectors = [ + ".captcha-slider .slide-btn", + ".slide-verify-slider-mask-item", + ".nc-lang-cn .btn_slide", + "[class*='slider'] [class*='btn']", + ] + + slider = slider_element + if not slider: + for sel in selectors: + try: + slider = driver.find_element(By.CSS_SELECTOR, sel) + break + except Exception: + continue + + if not slider: + return CaptchaResult(False, CaptchaType.SLIDER, "找不到滑块元素") + + try: + # 获取滑块轨道宽度 + track = slider.find_element(By.XPATH, "..") + track_width = track.size["width"] + slider_width = slider.size["width"] + distance = track_width - slider_width + + if distance <= 0: + distance = 300 # fallback + + actions = ActionChains(driver) + actions.click_and_hold(slider) + actions.pause(0.1) + + # 模拟人类拖动:分段加速减速 + steps = 20 + for i in range(steps): + progress = (i + 1) / steps + # 加速-减速曲线 + if progress < 0.3: + offset = distance * progress * 0.5 + elif progress < 0.7: + offset = distance * progress + else: + offset = distance * progress * 0.95 + move = int(offset - (distance * (i / steps) if i > 0 else 0)) + actions.move_by_offset(move, 0) + actions.pause(0.02) + + actions.release() + actions.perform() + + logger.info("滑块拖动完成") + return CaptchaResult(True, CaptchaType.SLIDER) + + except Exception as e: + logger.error(f"滑块处理失败: {e}") + return CaptchaResult(False, CaptchaType.SLIDER, str(e)) + + +def handle_captcha(driver) -> CaptchaResult: + """自动检测并处理验证码""" + captcha_type = detect_captcha(driver) + + if captcha_type == CaptchaType.NONE: + return CaptchaResult(True, CaptchaType.NONE) + + if captcha_type == CaptchaType.SLIDER: + return solve_slider(driver) + + if captcha_type == CaptchaType.IMAGE: + logger.warning("图形验证码需要人工处理或接入 OCR 服务") + return CaptchaResult(False, CaptchaType.IMAGE, "需要人工处理") + + logger.warning(f"未知验证码类型,需要人工处理") + return CaptchaResult(False, captcha_type, "未知类型") diff --git a/scripts/main.py b/scripts/main.py new file mode 100644 index 0000000..9e0bd4a --- /dev/null +++ b/scripts/main.py @@ -0,0 +1,88 @@ +# coding: utf-8 +""" +主入口模块 +支持单账户和多账户模式 +""" + +import json +import logging +import sys +from pathlib import Path + +from ticket_script import TicketBot, TicketConfig + +logger = logging.getLogger("damai") + + +def load_config(path: str = "config/config.json") -> dict: + config_path = Path(path) + if not config_path.exists(): + logger.error(f"配置文件不存在: {path}") + sys.exit(1) + with open(config_path, "r", encoding="utf-8") as f: + return json.load(f) + + +def run_single_account(config: dict): + """单账户模式""" + tc = TicketConfig( + date=config["date"], + sess=config["sess"], + price=config["price"], + ticket_num=config.get("ticket_num", 2), + viewer_person=config.get("viewer_person", [1]), + nick_name=config.get("nick_name", ""), + damai_url=config.get("damai_url", "https://www.damai.cn/"), + target_url=config["target_url"], + driver_path=config["driver_path"], + max_retries=config.get("max_retries", 180), + retry_delay=config.get("retry_delay", 0.3), + ) + + bot = TicketBot(tc) + bot.run() + + +def run_multi_account(config: dict): + """多账户模式""" + from scripts.multi_account_manager import load_accounts_from_config, run_parallel + + accounts = load_accounts_from_config(config) + + def account_task(acc_id, acc_info): + tc = TicketConfig( + date=config.get("date", [1]), + sess=config.get("sess", [1]), + price=config.get("price", [1]), + ticket_num=config.get("ticket_num", 2), + viewer_person=acc_info.viewer_person or [1], + nick_name=acc_info.username, + damai_url=config.get("damai_url", "https://www.damai.cn/"), + target_url=acc_info.target_url or config.get("target_url", ""), + driver_path=config.get("driver_path", ""), + max_retries=config.get("max_retries", 180), + retry_delay=config.get("retry_delay", 0.3), + ) + bot = TicketBot(tc) + bot.run() + + threads = run_parallel(accounts, account_task, max_workers=3) + + for t in threads: + t.join() + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", + ) + + config_path = sys.argv[1] if len(sys.argv) > 1 else "config/config.json" + config = load_config(config_path) + + if isinstance(config.get("accounts"), (list, dict)): + run_multi_account(config) + else: + run_single_account(config) diff --git a/scripts/multi_account_manager.py b/scripts/multi_account_manager.py new file mode 100644 index 0000000..158cf72 --- /dev/null +++ b/scripts/multi_account_manager.py @@ -0,0 +1,85 @@ +# coding: utf-8 +""" +多账户管理模块 +支持:并发抢票、账户轮换 +""" + +import logging +import threading +from dataclasses import dataclass +from typing import Callable, Dict, List, Optional + +logger = logging.getLogger("damai.accounts") + + +@dataclass +class AccountInfo: + username: str + password: str + target_url: str + auto_buy_time: str = "" + viewer_person: List[int] = None + + def __post_init__(self): + if self.viewer_person is None: + self.viewer_person = [1] + + +def load_accounts_from_config(config: dict) -> Dict[str, AccountInfo]: + """从配置字典加载账户列表""" + accounts = {} + raw = config.get("accounts", []) + + if isinstance(raw, dict): + items = raw.items() + elif isinstance(raw, list): + items = enumerate(raw) + else: + return accounts + + for key, info in items: + acc_id = str(key) + accounts[acc_id] = AccountInfo( + username=info.get("username", ""), + password=info.get("password", ""), + target_url=info.get("target_url", ""), + auto_buy_time=info.get("auto_buy_time", ""), + viewer_person=info.get("viewer_person", [1]), + ) + + logger.info(f"加载了 {len(accounts)} 个账户") + return accounts + + +def run_parallel( + accounts: Dict[str, AccountInfo], + task_fn: Callable[[str, AccountInfo], None], + max_workers: int = 3, +) -> List[threading.Thread]: + """ + 并行执行多个账户的抢票任务 + + task_fn: 接受 (account_id, account_info) 的回调函数 + max_workers: 最大并发数 + """ + threads: List[threading.Thread] = [] + + for acc_id, acc_info in accounts.items(): + if len(threads) >= max_workers: + logger.warning(f"达到最大并发数 {max_workers},跳过账户 {acc_id}") + continue + + def run_task(aid=acc_id, ainfo=acc_info): + try: + logger.info(f"账户 {aid} 任务启动") + task_fn(aid, ainfo) + logger.info(f"账户 {aid} 任务完成") + except Exception as e: + logger.error(f"账户 {aid} 任务异常: {e}") + + t = threading.Thread(target=run_task, daemon=True, name=f"account-{acc_id}") + t.start() + threads.append(t) + logger.info(f"已启动线程: account-{acc_id}") + + return threads diff --git a/scripts/scheduler.py b/scripts/scheduler.py new file mode 100644 index 0000000..7058879 --- /dev/null +++ b/scripts/scheduler.py @@ -0,0 +1,133 @@ +# coding: utf-8 +""" +定时调度模块 +支持:定时开抢、NTP 时间校准 +""" + +import logging +import time +from datetime import datetime, timezone, timedelta +from typing import Optional + +logger = logging.getLogger("damai.scheduler") + + +CST = timezone(timedelta(hours=8)) + + +def parse_strike_time(time_str: str) -> datetime: + """ + 解析开抢时间字符串 + 支持格式:2026-01-25T12:00:00 或 2026-01-25 12:00:00 + """ + for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"): + try: + dt = datetime.strptime(time_str, fmt) + return dt.replace(tzinfo=CST) + except ValueError: + continue + raise ValueError(f"无法解析时间: {time_str}") + + +def get_ntp_time() -> Optional[datetime]: + """从 NTP 服务器获取精确时间""" + try: + import ntplib + + client = ntplib.NTPClient() + servers = ["time.google.com", "ntp.aliyun.com", "pool.ntp.org"] + for server in servers: + try: + response = client.request(server, timeout=3) + ntp_time = datetime.fromtimestamp(response.tx_time, tz=CST) + return ntp_time + except Exception: + continue + except ImportError: + logger.debug("ntplib 未安装,使用本地时间") + except Exception: + pass + return None + + +def wait_until_strike(strike_time_str: str, preheat_stages: list = None) -> dict: + """ + 等待到开抢时间 + preheat_stages: 预热阶段列表,如 [5.0, 2.0, 0.5] 表示提前 5s/2s/0.5s 触发预热回调 + + 返回: {"strike_time": datetime, "ntp_offset": float} + """ + strike_time = parse_strike_time(strike_time_str) + if preheat_stages is None: + preheat_stages = [5.0, 2.0, 0.5] + + # 尝试 NTP 校时 + ntp_time = get_ntp_time() + if ntp_time: + local_time = datetime.now(CST) + offset = (ntp_time - local_time).total_seconds() + logger.info(f"NTP 校时成功 | 偏差: {offset:+.3f}s") + else: + offset = 0.0 + logger.warning("NTP 校时失败,使用本地时间") + + now = datetime.now(CST) + remaining = (strike_time - now).total_seconds() + + if remaining < 0: + logger.warning(f"开抢时间已过 {-remaining:.1f}s,立即执行") + return {"strike_time": strike_time, "ntp_offset": offset} + + logger.info( + f"距离开抢: {remaining:.1f}s | " + f"开抢时间: {strike_time.strftime('%H:%M:%S')} | " + f"预热阶段: {preheat_stages}" + ) + + # 排序预热阶段(降序) + stages = sorted(preheat_stages, reverse=True) + + for stage_sec in stages: + sleep_until = remaining - stage_sec + if sleep_until <= 0: + continue + + logger.info(f"等待 {sleep_until:.1f}s 到预热阶段 (提前 {stage_sec}s)...") + time.sleep(sleep_until) + remaining = (strike_time - datetime.now(CST)).total_seconds() + + # 最后精确等待 + while True: + now = datetime.now(CST) + remaining = (strike_time - now).total_seconds() + if remaining <= 0.005: # 5ms 精度 + break + if remaining > 0.1: + time.sleep(remaining * 0.5) + else: + # 忙等待最后阶段 + pass + + logger.info("⏰ 到达开抢时间!") + return {"strike_time": strike_time, "ntp_offset": offset} + + +def format_countdown(strike_time_str: str) -> str: + """格式化倒计时显示""" + strike_time = parse_strike_time(strike_time_str) + now = datetime.now(CST) + remaining = (strike_time - now).total_seconds() + + if remaining < 0: + return f"已开抢 {-remaining:.0f}s" + + hours = int(remaining // 3600) + minutes = int((remaining % 3600) // 60) + seconds = int(remaining % 60) + + if hours > 0: + return f"{hours}h {minutes}m {seconds}s" + elif minutes > 0: + return f"{minutes}m {seconds}s" + else: + return f"{seconds}s" diff --git a/scripts/selenium_driver.py b/scripts/selenium_driver.py new file mode 100644 index 0000000..db0d6a7 --- /dev/null +++ b/scripts/selenium_driver.py @@ -0,0 +1,93 @@ +# coding: utf-8 +""" +Selenium 驱动工厂 +提供统一的浏览器驱动创建接口 +""" + +import logging +import sys +from typing import Optional + +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service + +logger = logging.getLogger("damai.driver") + + +def create_driver( + driver_path: str, + mobile_emulation: bool = True, + headless: bool = False, + disable_images: bool = True, + page_load_strategy: str = "eager", +) -> webdriver.Chrome: + """ + 创建配置好的 Chrome 驱动 + + Args: + driver_path: chromedriver 可执行文件路径 + mobile_emulation: 是否模拟移动设备 + headless: 是否无头模式 + disable_images: 是否禁用图片加载 + page_load_strategy: 页面加载策略 (normal/eager/none) + + Returns: + 配置好的 Chrome WebDriver 实例 + """ + options = Options() + + # 移动端模拟 + if mobile_emulation: + options.add_experimental_option( + "mobileEmulation", {"deviceName": "Nexus 6"} + ) + + # 反检测 + options.add_argument("--disable-blink-features=AutomationControlled") + options.add_experimental_option("excludeSwitches", ["enable-automation"]) + options.add_experimental_option("useAutomationExtension", False) + + # 性能优化 + if disable_images: + prefs = { + "profile.managed_default_content_settings.images": 2, + "profile.managed_default_content_settings.fonts": 2, + } + options.add_experimental_option("prefs", prefs) + + # 页面加载策略 + options.page_load_strategy = page_load_strategy + + # 无头模式 + if headless: + options.add_argument("--headless=new") + + # Linux 环境 + if sys.platform != "win32": + options.add_argument("--no-sandbox") + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--disable-gpu") + + service = Service(executable_path=driver_path) + driver = webdriver.Chrome(service=service, options=options) + + # 隐藏 webdriver 痕迹 + driver.execute_cdp_cmd( + "Page.addScriptToEvaluateOnNewDocument", + { + "source": """ + Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); + window.navigator.chrome = {runtime: {}}; + Object.defineProperty(navigator, 'plugins', { + get: () => [1, 2, 3, 4, 5], + }); + Object.defineProperty(navigator, 'languages', { + get: () => ['zh-CN', 'en-US', 'en'], + }); + """ + }, + ) + + logger.info(f"Chrome 驱动已创建 | mobile={mobile_emulation} headless={headless}") + return driver diff --git a/ticket_script.py b/ticket_script.py new file mode 100644 index 0000000..8f7559f --- /dev/null +++ b/ticket_script.py @@ -0,0 +1,665 @@ +# coding: utf-8 +""" +大麦网抢票核心脚本 +支持:大麦网 (damai.cn) 移动端页面 +""" + +import json +import logging +import pickle +import sys +import time +from dataclasses import dataclass, field +from enum import Enum, auto +from os.path import exists +from pathlib import Path +from typing import List, Optional + +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webelement import WebElement +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.ui import WebDriverWait +from selenium.common.exceptions import ( + NoSuchElementException, + TimeoutException, + WebDriverException, +) + +logger = logging.getLogger("damai") + + +# ── 状态机 ─────────────────────────────────────────────────────────────── + +class State(Enum): + INIT = auto() + LOGGED_IN = auto() + PAGE_LOADED = auto() + TICKET_SELECTED = auto() + ORDER_CONFIRMED = auto() + SUCCESS = auto() + FAILED = auto() + + +# ── 配置 ───────────────────────────────────────────────────────────────── + +@dataclass +class TicketConfig: + date: List[int] # 日期优先级序号 (1-based) + sess: List[int] # 场次优先级序号 (1-based) + price: List[int] # 票价优先级序号 (1-based) + ticket_num: int # 购买张数 + viewer_person: List[int] # 观影人序号 (1-based) + nick_name: str # 用户昵称 + damai_url: str # 大麦首页 + target_url: str # 目标演出 URL + driver_path: str # chromedriver 路径 + cookies_file: str = "cookies.pkl" + max_retries: int = 180 + retry_delay: float = 0.3 + headless: bool = False + mobile_emulation: bool = True + + +# ── 工具函数 ───────────────────────────────────────────────────────────── + +def safe_click(driver, element: WebElement, delay: float = 0.05): + """带滚动和延迟的安全点击""" + driver.execute_script("arguments[0].scrollIntoView({block:'center'});", element) + time.sleep(delay) + element.click() + + +def has_class(element: WebElement, class_name: str) -> bool: + """检查元素是否包含指定 class""" + classes = element.get_attribute("class") or "" + return class_name in classes.split() + + +def find_child(element: WebElement, by: str, value: str) -> Optional[WebElement]: + """安全查找子元素,找不到返回 None""" + try: + return element.find_element(by=by, value=value) + except NoSuchElementException: + return None + + +def find_children(element: WebElement, by: str, value: str) -> List[WebElement]: + """安全查找子元素列表""" + try: + return element.find_elements(by=by, value=value) + except NoSuchElementException: + return [] + + +# ── 核心类 ─────────────────────────────────────────────────────────────── + +class TicketBot: + def __init__(self, config: TicketConfig): + self.config = config + self.driver: Optional[webdriver.Chrome] = None + self.state = State.INIT + self.attempts = 0 + self.start_time = 0.0 + self.end_time = 0.0 + + # ── 浏览器管理 ── + + def _build_driver(self) -> webdriver.Chrome: + options = Options() + + if self.config.mobile_emulation: + options.add_experimental_option( + "mobileEmulation", {"deviceName": "Nexus 6"} + ) + + options.add_argument("--disable-blink-features=AutomationControlled") + options.add_experimental_option("excludeSwitches", ["enable-automation"]) + options.add_experimental_option("useAutomationExtension", False) + + # 性能优化:不加载图片 + prefs = { + "profile.managed_default_content_settings.images": 2, + "profile.managed_default_content_settings.fonts": 2, + } + options.add_experimental_option("prefs", prefs) + + # 页面加载策略:不等完全加载 + options.page_load_strategy = "eager" + + if self.config.headless: + options.add_argument("--headless=new") + + if sys.platform != "win32": + options.add_argument("--no-sandbox") + options.add_argument("--disable-dev-shm-usage") + + service = Service(executable_path=self.config.driver_path) + driver = webdriver.Chrome(service=service, options=options) + + # 隐藏 webdriver 特征 + driver.execute_cdp_cmd( + "Page.addScriptToEvaluateOnNewDocument", + { + "source": """ + Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); + window.navigator.chrome = {runtime: {}}; + Object.defineProperty(navigator, 'plugins', {get: () => [1,2,3,4,5]}); + Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN','en-US','en']}); + """ + }, + ) + return driver + + def start_browser(self): + """启动浏览器""" + logger.info("启动浏览器...") + self.driver = self._build_driver() + + def close_browser(self): + """关闭浏览器""" + if self.driver: + try: + self.driver.quit() + except Exception: + pass + self.driver = None + + # ── Cookie 管理 ── + + def _save_cookies(self): + cookies_path = Path(self.config.cookies_file) + with open(cookies_path, "wb") as f: + pickle.dump(self.driver.get_cookies(), f) + logger.info(f"Cookie 已保存到 {cookies_path}") + + def _load_cookies(self) -> bool: + cookies_path = Path(self.config.cookies_file) + if not cookies_path.exists(): + return False + try: + with open(cookies_path, "rb") as f: + cookies = pickle.load(f) + self.driver.get(self.config.damai_url) + time.sleep(1) + for cookie in cookies: + # selenium 要求 domain 不能带当前域之外的 cookie + cookie.pop("sameSite", None) + try: + self.driver.add_cookie(cookie) + except Exception: + continue + logger.info("Cookie 已加载") + return True + except Exception as e: + logger.warning(f"加载 Cookie 失败: {e}") + return False + + # ── 登录 ── + + def login(self) -> bool: + """登录大麦网,需要用户扫码""" + logger.info("开始登录流程...") + self.driver.get(self.config.damai_url) + time.sleep(2) + + # 尝试加载已有 cookie + if self._load_cookies(): + self.driver.get(self.config.target_url) + time.sleep(2) + # 检查是否已登录 + if self._is_logged_in(): + logger.info("Cookie 登录成功") + self.state = State.LOGGED_IN + return True + + # Cookie 失效,需要扫码登录 + logger.info("请扫码登录...") + self.driver.get(self.config.damai_url) + time.sleep(1) + + try: + login_btn = WebDriverWait(self.driver, 10).until( + EC.element_to_be_clickable((By.CLASS_NAME, "login-user")) + ) + login_btn.click() + except TimeoutException: + logger.error("找不到登录按钮") + return False + + # 等待用户扫码完成(标题变化) + logger.info("等待扫码登录...") + original_title = self.driver.title + timeout = 120 # 最多等 2 分钟 + start = time.time() + while time.time() - start < timeout: + if self.driver.title != original_title and "登录" not in self.driver.title: + break + time.sleep(1) + else: + logger.error("扫码超时") + return False + + self._save_cookies() + self.state = State.LOGGED_IN + logger.info("登录成功") + return True + + def _is_logged_in(self) -> bool: + """检查是否已登录""" + try: + self.driver.find_element(By.CLASS_NAME, "login-user") + return False + except NoSuchElementException: + return True + + # ── 选票核心逻辑 ── + + def enter_target_page(self): + """进入目标演出页面""" + logger.info(f"打开目标页面: {self.config.target_url}") + self.driver.get(self.config.target_url) + try: + WebDriverWait(self.driver, 15).until( + EC.presence_of_element_located((By.ID, "app")) + ) + except TimeoutException: + raise RuntimeError("页面加载超时") + self.state = State.PAGE_LOADED + + def _select_date(self, container: WebElement) -> bool: + """选择日期""" + try: + date_section = WebDriverWait(self.driver, 3).until( + EC.presence_of_element_located( + (By.CLASS_NAME, "bui-dm-sku-calendar") + ) + ) + except TimeoutException: + logger.debug("没有日期选择器,跳过") + return True + + date_items = find_children( + date_section, By.CLASS_NAME, "bui-calendar-day-box" + ) + if not date_items: + logger.warning("未找到日期选项") + return False + + for idx in self.config.date: + if idx <= len(date_items): + safe_click(self.driver, date_items[idx - 1]) + logger.info(f"选择日期: 第{idx}个") + time.sleep(0.3) + return True + + return False + + def _select_session(self) -> bool: + """选择场次""" + try: + session_section = WebDriverWait(self.driver, 3).until( + EC.presence_of_element_located( + (By.CLASS_NAME, "sku-times-card") + ) + ) + except TimeoutException: + logger.debug("没有场次选择器") + return True + + session_items = find_children( + session_section, By.CLASS_NAME, "bui-dm-sku-card-item" + ) + if not session_items: + logger.warning("未找到场次选项") + return False + + for idx in self.config.sess: + if idx > len(session_items): + continue + item = session_items[idx - 1] + tag = find_child(item, By.CLASS_NAME, "item-tag") + + if tag: + tag_text = tag.text.strip() + if tag_text == "无票": + logger.debug(f"场次{idx}: 无票,跳过") + continue + # 预售/惠/有票 → 可选 + + safe_click(self.driver, item) + logger.info(f"选择场次: 第{idx}个") + time.sleep(0.3) + return True + + logger.warning("所有场次均无票") + return False + + def _select_price(self) -> bool: + """选择票价档位""" + try: + price_section = WebDriverWait(self.driver, 3).until( + EC.presence_of_element_located( + (By.CLASS_NAME, "sku-tickets-card") + ) + ) + except TimeoutException: + logger.warning("找不到票价选择区域") + return False + + price_items = find_children( + price_section, By.CLASS_NAME, "bui-dm-sku-card-item" + ) + if not price_items: + logger.warning("未找到票价选项") + return False + + for idx in self.config.price: + if idx > len(price_items): + continue + item = price_items[idx - 1] + tag = find_child(item, By.CLASS_NAME, "item-tag") + + if tag and tag.text.strip() in ("缺货登记", "notticket"): + logger.debug(f"票价{idx}: 缺货,跳过") + continue + + safe_click(self.driver, item) + logger.info(f"选择票价: 第{idx}个") + time.sleep(0.3) + return True + + logger.warning("所有票价档位均缺货") + return False + + def _click_buy_button(self) -> str: + """点击购买按钮,返回按钮文字""" + try: + box = WebDriverWait(self.driver, 5).until( + EC.presence_of_element_located((By.ID, "app")) + ) + except TimeoutException: + raise RuntimeError("页面刷新失败") + + # 关闭实名弹窗 + popups = find_children( + box, By.XPATH, "//div[@class='realname-popup']" + ) + if popups: + try: + btn = popups[0].find_element( + By.XPATH, ".//div[@class='operate']//div[@class='button']" + ) + btn.click() + time.sleep(0.3) + logger.info("已关闭实名弹窗") + except NoSuchElementException: + pass + + try: + buy_btn = box.find_element(By.CLASS_NAME, "buy__button") + time.sleep(0.5) + btn_text = buy_btn.text.strip() + except NoSuchElementException: + raise RuntimeError("找不到购买按钮") + + if "即将开抢" in btn_text: + raise RuntimeError("尚未开售,刷新等待") + + if "缺货" in btn_text: + raise RuntimeError("已缺货") + + buy_btn.click() + logger.info(f"点击购买按钮: {btn_text}") + return btn_text + + def _select_tickets_in_popup(self) -> str: + """在弹窗中选票,返回最终按钮文字""" + try: + box = WebDriverWait(self.driver, 3).until( + EC.presence_of_element_located( + (By.CSS_SELECTOR, ".sku-pop-wrapper") + ) + ) + except TimeoutException: + raise RuntimeError("选票弹窗未出现") + + self._select_date(box) + self._select_session() + self._select_price() + + # 等待计数器出现 + try: + WebDriverWait(self.driver, 3).until( + EC.presence_of_element_located( + (By.CLASS_NAME, "bui-dm-sku-counter") + ) + ) + except TimeoutException: + raise RuntimeError("票数选择器未出现") + + # 查找最终提交按钮 + try: + submit_btn = box.find_element( + By.CLASS_NAME, "sku-footer-buy-button" + ) + submit_text = submit_btn.text.strip() + except NoSuchElementException: + raise RuntimeError("找不到提交按钮") + + if not submit_text: + raise RuntimeError("提交按钮文字为空,请调整重试间隔") + + if submit_text == "选座购买": + submit_btn.click() + logger.info("进入选座模式,请手动选择座位") + return "选座" + + if submit_text == "提交缺货登记": + raise RuntimeError("票已售罄") + + if submit_text in ("立即预订", "立即购买", "确定"): + # 增加票数 + plus_btn = find_child(box, By.CLASS_NAME, "plus-enable") + if plus_btn and self.config.ticket_num > 1: + for _ in range(self.config.ticket_num - 1): + plus_btn.click() + time.sleep(0.05) + logger.info(f"设置购票数量: {self.config.ticket_num}") + + time.sleep(1.0) + submit_btn.click() + logger.info(f"点击: {submit_text}") + return submit_text + + raise RuntimeError(f"未知按钮文字: {submit_text}") + + # ── 订单确认 ── + + def _confirm_order(self) -> bool: + """选择观影人并提交订单""" + try: + WebDriverWait(self.driver, 10).until( + EC.presence_of_element_located( + ( + By.XPATH, + '//*[@id="dmViewerBlock_DmViewerBlock"]/div[2]/div/div', + ) + ) + ) + except TimeoutException: + logger.error("订单确认页面加载超时") + return False + + # 选择观影人 + people = self.driver.find_elements( + By.XPATH, '//*[@id="dmViewerBlock_DmViewerBlock"]/div[2]/div/div' + ) + time.sleep(0.3) + + selected = 0 + for idx in self.config.viewer_person: + if idx > len(people): + logger.warning(f"观影人序号{idx}超出范围(共{len(people)}人)") + continue + people[idx - 1].click() + selected += 1 + time.sleep(0.05) + + if selected == 0: + logger.error("未选中任何观影人") + return False + + logger.info(f"已选择 {selected} 位观影人") + + # 提交订单 + try: + submit_btn = WebDriverWait(self.driver, 5).until( + EC.element_to_be_clickable( + ( + By.XPATH, + '//*[@id="dmOrderSubmitBlock_DmOrderSubmitBlock"]/div[2]/div/div[2]/div[3]/div[2]', + ) + ) + ) + time.sleep(0.5) + submit_btn.click() + logger.info("已提交订单") + except TimeoutException: + logger.error("找不到提交订单按钮") + return False + + return True + + def _wait_payment(self, timeout: int = 30) -> bool: + """等待跳转到支付页面""" + logger.info("等待跳转支付页面...") + start = time.time() + while time.time() - start < timeout: + title = self.driver.title + if "支付宝" in title or "支付" in title: + logger.info("已跳转到支付页面") + return True + time.sleep(1) + + logger.warning(f"等待支付页面超时({timeout}s),当前标题: {self.driver.title}") + return False + + # ── 主循环 ── + + def choose_and_buy(self) -> bool: + """选票 + 下单流程""" + self.attempts += 1 + + try: + self.enter_target_page() + self._click_buy_button() + self._select_tickets_in_popup() + + # 等待跳转确认页 + try: + WebDriverWait(self.driver, 5).until(EC.title_contains("确认")) + except TimeoutException: + logger.warning("未跳转到确认页,重试...") + + self.state = State.TICKET_SELECTED + + if self._confirm_order(): + self.state = State.ORDER_CONFIRMED + self._wait_payment() + self.state = State.SUCCESS + return True + + except RuntimeError as e: + logger.warning(f"第{self.attempts}次尝试失败: {e}") + # 返回目标页重试 + try: + self.driver.get(self.config.target_url) + except WebDriverException: + pass + return False + except WebDriverException as e: + logger.error(f"浏览器异常: {e}") + return False + + return False + + def run(self): + """运行抢票主流程""" + self.start_time = time.time() + + try: + self.start_browser() + + if not self.login(): + logger.error("登录失败,退出") + self.state = State.FAILED + return + + logger.info( + f"开始抢票 | 最大重试: {self.config.max_retries}次 | " + f"重试间隔: {self.config.retry_delay}s" + ) + + for _ in range(self.config.max_retries): + if self.choose_and_buy(): + self.end_time = time.time() + elapsed = round(self.end_time - self.start_time, 1) + logger.info( + f"🎉 抢票成功!共 {self.attempts} 轮,耗时 {elapsed}s" + ) + self.state = State.SUCCESS + # 不关闭浏览器,让用户自行支付 + input("按回车键关闭浏览器...") + return + + time.sleep(self.config.retry_delay) + + logger.error(f"达到最大重试次数 ({self.config.max_retries}),抢票失败") + self.state = State.FAILED + + except KeyboardInterrupt: + logger.info("用户中断") + self.state = State.FAILED + except Exception as e: + logger.error(f"未预期的错误: {e}", exc_info=True) + self.state = State.FAILED + finally: + self.close_browser() + + +# ── 入口 ───────────────────────────────────────────────────────────────── + +def load_config(path: str = "config.json") -> TicketConfig: + """从 JSON 配置文件加载""" + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + return TicketConfig( + date=data["date"], + sess=data["sess"], + price=data["price"], + ticket_num=data["ticket_num"], + viewer_person=data["viewer_person"], + nick_name=data["nick_name"], + damai_url=data["damai_url"], + target_url=data["target_url"], + driver_path=data["driver_path"], + max_retries=data.get("max_retries", 180), + retry_delay=data.get("retry_delay", 0.3), + ) + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", + ) + + config_path = sys.argv[1] if len(sys.argv) > 1 else "config.json" + config = load_config(config_path) + + bot = TicketBot(config) + bot.run()