# coding: utf-8
"""
Core ticket-grabbing script for Damai.

Supports the mobile pages of damai.cn.  The flow is: start Chrome, log in
(stored cookies first, QR-code scan as fallback), then repeatedly open the
target show page, pick date / session / price tier, and submit the order
until it succeeds or the retry budget is exhausted.
"""
import json
import logging
import pickle
import sys
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from os.path import exists
from pathlib import Path
from typing import List, Optional

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import (
    NoSuchElementException,
    TimeoutException,
    WebDriverException,
)

logger = logging.getLogger("damai")


# ── State machine ────────────────────────────────────────────────────────
class State(Enum):
    """Progress marker stored on ``TicketBot.state``."""

    INIT = auto()             # set in TicketBot.__init__
    LOGGED_IN = auto()        # set by TicketBot.login on success
    PAGE_LOADED = auto()      # set by TicketBot.enter_target_page
    TICKET_SELECTED = auto()  # set after the SKU popup has been submitted
    ORDER_CONFIRMED = auto()  # set after the order form has been submitted
    SUCCESS = auto()
    FAILED = auto()


# ── Configuration ────────────────────────────────────────────────────────
@dataclass
class TicketConfig:
    """User configuration for one purchase run (see ``load_config``)."""

    date: List[int]           # date indices in priority order (1-based)
    sess: List[int]           # session indices in priority order (1-based)
    price: List[int]          # price-tier indices in priority order (1-based)
    ticket_num: int           # number of tickets to buy
    viewer_person: List[int]  # viewer indices to tick on the order page (1-based)
    nick_name: str            # user nickname
    damai_url: str            # Damai home page
    target_url: str           # URL of the target show
    driver_path: str          # path to the chromedriver executable
    cookies_file: str = "cookies.pkl"
    max_retries: int = 180
    retry_delay: float = 0.3
    headless: bool = False
    mobile_emulation: bool = True


# ── Helper functions ─────────────────────────────────────────────────────
def _valid_index(idx: int, count: int) -> bool:
    """Return True when the 1-based ``idx`` addresses one of ``count`` items.

    Rejecting values below 1 matters: ``items[idx - 1]`` with ``idx <= 0``
    would silently wrap around to the end of the list.
    """
    return 1 <= idx <= count


def safe_click(driver, element: WebElement, delay: float = 0.05):
    """Scroll ``element`` to the viewport centre, wait ``delay`` s, click it."""
    driver.execute_script(
        "arguments[0].scrollIntoView({block:'center'});", element
    )
    time.sleep(delay)
    element.click()


def has_class(element: WebElement, class_name: str) -> bool:
    """Return True if ``class_name`` is one of the element's CSS classes."""
    classes = element.get_attribute("class") or ""
    return class_name in classes.split()


def find_child(element: WebElement, by: str, value: str) -> Optional[WebElement]:
    """Look up a single child element; return None when it is not found."""
    try:
        return element.find_element(by=by, value=value)
    except NoSuchElementException:
        return None


def find_children(element: WebElement, by: str, value: str) -> List[WebElement]:
    """Look up child elements; the result is empty when nothing matches.

    ``find_elements`` reports "no match" as an empty list rather than by
    raising, so no exception handling is needed here.
    """
    return element.find_elements(by=by, value=value)


# ── Core class ───────────────────────────────────────────────────────────
class TicketBot:
    """Drives a Chrome session through login, ticket selection and ordering."""

    def __init__(self, config: TicketConfig):
        self.config = config
        self.driver: Optional[webdriver.Chrome] = None
        self.state = State.INIT
        self.attempts = 0
        self.start_time = 0.0
        self.end_time = 0.0

    # ── Browser management ──
    def _build_driver(self) -> webdriver.Chrome:
        """Create a Chrome driver configured from ``self.config``."""
        options = Options()

        if self.config.mobile_emulation:
            options.add_experimental_option(
                "mobileEmulation", {"deviceName": "Nexus 6"}
            )

        options.add_argument("--disable-blink-features=AutomationControlled")
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option("useAutomationExtension", False)

        # Performance: skip images (and fonts).
        # NOTE(review): value 2 is presumably Chrome's "block" setting — confirm.
        prefs = {
            "profile.managed_default_content_settings.images": 2,
            "profile.managed_default_content_settings.fonts": 2,
        }
        options.add_experimental_option("prefs", prefs)

        # Page-load strategy: do not wait for the page to load completely.
        options.page_load_strategy = "eager"

        if self.config.headless:
            options.add_argument("--headless=new")

        if sys.platform != "win32":
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")

        service = Service(executable_path=self.config.driver_path)
        driver = webdriver.Chrome(service=service, options=options)

        # Hide webdriver fingerprints: the script is injected into every new
        # document before the page's own scripts run.
        driver.execute_cdp_cmd(
            "Page.addScriptToEvaluateOnNewDocument",
            {
                "source": """
                    Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                    window.navigator.chrome = {runtime: {}};
                    Object.defineProperty(navigator, 'plugins', {get: () => [1,2,3,4,5]});
                    Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN','en-US','en']});
                """
            },
        )
        return driver

    def start_browser(self):
        """Start the browser and store the driver on ``self.driver``."""
        logger.info("启动浏览器...")
        self.driver = self._build_driver()

    def close_browser(self):
        """Quit the browser if one is running; never raises."""
        if self.driver:
            try:
                self.driver.quit()
            except Exception as e:
                # Best effort: shutdown errors must not mask the run result,
                # but they are still worth a trace in the debug log.
                logger.debug(f"关闭浏览器时出错: {e}")
            self.driver = None

    # ── Cookie management ──
    def _save_cookies(self):
        """Pickle the current browser cookies to ``config.cookies_file``."""
        cookies_path = Path(self.config.cookies_file)
        with open(cookies_path, "wb") as f:
            pickle.dump(self.driver.get_cookies(), f)
        logger.info(f"Cookie 已保存到 {cookies_path}")

    def _load_cookies(self) -> bool:
        """Load pickled cookies into the browser.

        Returns:
            True when the cookie file existed and was processed, False when
            it is missing or could not be read.
        """
        cookies_path = Path(self.config.cookies_file)
        if not cookies_path.exists():
            return False

        try:
            # SECURITY: pickle.load executes arbitrary code embedded in the
            # file.  Only use a cookies file this script wrote itself; never
            # point ``cookies_file`` at data from an untrusted source.
            with open(cookies_path, "rb") as f:
                cookies = pickle.load(f)

            # Cookies can only be added for the domain currently loaded.
            self.driver.get(self.config.damai_url)
            time.sleep(1)

            for cookie in cookies:
                # The stored "sameSite" attribute is dropped before re-adding.
                cookie.pop("sameSite", None)
                try:
                    self.driver.add_cookie(cookie)
                except Exception as e:
                    # A single rejected cookie must not abort the whole load.
                    logger.debug(f"跳过无法添加的 Cookie {cookie.get('name')}: {e}")
                    continue

            logger.info("Cookie 已加载")
            return True
        except Exception as e:
            logger.warning(f"加载 Cookie 失败: {e}")
            return False

    # ── Login ──
    def login(self) -> bool:
        """Log in to Damai; falls back to a user QR-code scan.

        Returns:
            True once logged in (state becomes ``LOGGED_IN``), False when the
            login button is missing or the scan times out.
        """
        logger.info("开始登录流程...")

        self.driver.get(self.config.damai_url)
        time.sleep(2)

        # First try the cookies saved by an earlier run.
        if self._load_cookies():
            self.driver.get(self.config.target_url)
            time.sleep(2)

            # Check whether the cookies still give a logged-in session.
            if self._is_logged_in():
                logger.info("Cookie 登录成功")
                self.state = State.LOGGED_IN
                return True

        # Cookies missing or expired: the user has to scan the QR code.
        logger.info("请扫码登录...")
        self.driver.get(self.config.damai_url)
        time.sleep(1)

        try:
            login_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.CLASS_NAME, "login-user"))
            )
            login_btn.click()
        except TimeoutException:
            logger.error("找不到登录按钮")
            return False

        # Wait for the scan to finish, detected through a page-title change.
        logger.info("等待扫码登录...")
        original_title = self.driver.title
        timeout = 120  # wait at most 2 minutes
        start = time.time()

        while time.time() - start < timeout:
            if (
                self.driver.title != original_title
                and "登录" not in self.driver.title
            ):
                break
            time.sleep(1)
        else:
            # The loop ran out of time without ever hitting ``break``.
            logger.error("扫码超时")
            return False

        self._save_cookies()
        self.state = State.LOGGED_IN
        logger.info("登录成功")
        return True

    def _is_logged_in(self) -> bool:
        """Return True when the page shows no ``login-user`` element."""
        try:
            self.driver.find_element(By.CLASS_NAME, "login-user")
            return False
        except NoSuchElementException:
            return True

    # ── Ticket selection core ──
    def enter_target_page(self):
        """Open the target show page and wait for its ``#app`` root.

        Raises:
            RuntimeError: the root element did not appear within 15 s.
        """
        logger.info(f"打开目标页面: {self.config.target_url}")
        self.driver.get(self.config.target_url)

        try:
            WebDriverWait(self.driver, 15).until(
                EC.presence_of_element_located((By.ID, "app"))
            )
        except TimeoutException:
            raise RuntimeError("页面加载超时")

        self.state = State.PAGE_LOADED

    def _select_date(self, container: WebElement) -> bool:
        """Click the first configured date that exists in the calendar.

        Args:
            container: kept for interface compatibility; the lookup itself is
                done on the driver, not on this element.

        Returns:
            True when a date was clicked or the page has no calendar, False
            when the calendar has no day entries or no configured index fits.
        """
        try:
            date_section = WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CLASS_NAME, "bui-dm-sku-calendar")
                )
            )
        except TimeoutException:
            logger.debug("没有日期选择器,跳过")
            return True

        date_items = find_children(
            date_section, By.CLASS_NAME, "bui-calendar-day-box"
        )
        if not date_items:
            logger.warning("未找到日期选项")
            return False

        for idx in self.config.date:
            if not _valid_index(idx, len(date_items)):
                continue
            safe_click(self.driver, date_items[idx - 1])
            logger.info(f"选择日期: 第{idx}个")
            time.sleep(0.3)
            return True

        return False

    def _select_session(self) -> bool:
        """Click the first configured session that is not sold out.

        Returns:
            True when a session was clicked or the page has no session
            selector, False when no configured session is available.
        """
        try:
            session_section = WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CLASS_NAME, "sku-times-card")
                )
            )
        except TimeoutException:
            logger.debug("没有场次选择器")
            return True

        session_items = find_children(
            session_section, By.CLASS_NAME, "bui-dm-sku-card-item"
        )
        if not session_items:
            logger.warning("未找到场次选项")
            return False

        for idx in self.config.sess:
            if not _valid_index(idx, len(session_items)):
                continue

            item = session_items[idx - 1]
            tag = find_child(item, By.CLASS_NAME, "item-tag")

            if tag:
                tag_text = tag.text.strip()
                if tag_text == "无票":
                    logger.debug(f"场次{idx}: 无票,跳过")
                    continue
                # Any other tag (presale / discount / in stock) is selectable.

            safe_click(self.driver, item)
            logger.info(f"选择场次: 第{idx}个")
            time.sleep(0.3)
            return True

        logger.warning("所有场次均无票")
        return False

    def _select_price(self) -> bool:
        """Click the first configured price tier that is not out of stock.

        Returns:
            True when a tier was clicked, False when the price section is
            missing or no configured tier is available.
        """
        try:
            price_section = WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CLASS_NAME, "sku-tickets-card")
                )
            )
        except TimeoutException:
            logger.warning("找不到票价选择区域")
            return False

        price_items = find_children(
            price_section, By.CLASS_NAME, "bui-dm-sku-card-item"
        )
        if not price_items:
            logger.warning("未找到票价选项")
            return False

        for idx in self.config.price:
            if not _valid_index(idx, len(price_items)):
                continue

            item = price_items[idx - 1]
            tag = find_child(item, By.CLASS_NAME, "item-tag")

            if tag and tag.text.strip() in ("缺货登记", "notticket"):
                logger.debug(f"票价{idx}: 缺货,跳过")
                continue

            safe_click(self.driver, item)
            logger.info(f"选择票价: 第{idx}个")
            time.sleep(0.3)
            return True

        logger.warning("所有票价档位均缺货")
        return False

    def _click_buy_button(self) -> str:
        """Dismiss the real-name popup if present, then click the buy button.

        Returns:
            The text of the buy button that was clicked.

        Raises:
            RuntimeError: the page root or the button is missing, the sale
                has not started yet, or the show is out of stock.
        """
        try:
            box = WebDriverWait(self.driver, 5).until(
                EC.presence_of_element_located((By.ID, "app"))
            )
        except TimeoutException:
            raise RuntimeError("页面刷新失败")

        # Close the real-name verification popup.
        popups = find_children(
            box, By.XPATH, "//div[@class='realname-popup']"
        )
        if popups:
            try:
                btn = popups[0].find_element(
                    By.XPATH, ".//div[@class='operate']//div[@class='button']"
                )
                btn.click()
                time.sleep(0.3)
                logger.info("已关闭实名弹窗")
            except NoSuchElementException:
                # Popup without the expected button: carry on regardless.
                pass

        try:
            buy_btn = box.find_element(By.CLASS_NAME, "buy__button")
            time.sleep(0.5)
            btn_text = buy_btn.text.strip()
        except NoSuchElementException:
            raise RuntimeError("找不到购买按钮")

        if "即将开抢" in btn_text:
            raise RuntimeError("尚未开售,刷新等待")
        if "缺货" in btn_text:
            raise RuntimeError("已缺货")

        buy_btn.click()
        logger.info(f"点击购买按钮: {btn_text}")
        return btn_text

    def _select_tickets_in_popup(self) -> str:
        """Pick date, session, price and quantity in the SKU popup and submit.

        Returns:
            The submit button text that was clicked, or ``"选座"`` when the
            page switched to manual seat selection.

        Raises:
            RuntimeError: the popup, ticket counter or submit button is
                missing, a date / session / price could not be selected, the
                tickets are sold out, or the button text is not recognised.
        """
        try:
            box = WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, ".sku-pop-wrapper")
                )
            )
        except TimeoutException:
            raise RuntimeError("选票弹窗未出现")

        # A failed selection ends this attempt right away.  Going on would
        # either time out below or order something that was not configured.
        if not self._select_date(box):
            raise RuntimeError("日期选择失败")
        if not self._select_session():
            raise RuntimeError("场次选择失败")
        if not self._select_price():
            raise RuntimeError("票价选择失败")

        # Wait for the quantity counter to appear.
        try:
            WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CLASS_NAME, "bui-dm-sku-counter")
                )
            )
        except TimeoutException:
            raise RuntimeError("票数选择器未出现")

        # Locate the final submit button.
        try:
            submit_btn = box.find_element(
                By.CLASS_NAME, "sku-footer-buy-button"
            )
            submit_text = submit_btn.text.strip()
        except NoSuchElementException:
            raise RuntimeError("找不到提交按钮")

        if not submit_text:
            raise RuntimeError("提交按钮文字为空,请调整重试间隔")

        if submit_text == "选座购买":
            submit_btn.click()
            logger.info("进入选座模式,请手动选择座位")
            return "选座"

        if submit_text == "提交缺货登记":
            raise RuntimeError("票已售罄")

        if submit_text in ("立即预订", "立即购买", "确定"):
            # Raise the quantity: one click per ticket beyond the first.
            plus_btn = find_child(box, By.CLASS_NAME, "plus-enable")
            if plus_btn and self.config.ticket_num > 1:
                for _ in range(self.config.ticket_num - 1):
                    plus_btn.click()
                    time.sleep(0.05)
                logger.info(f"设置购票数量: {self.config.ticket_num}")

            time.sleep(1.0)
            submit_btn.click()
            logger.info(f"点击: {submit_text}")
            return submit_text

        raise RuntimeError(f"未知按钮文字: {submit_text}")

    # ── Order confirmation ──
    def _confirm_order(self) -> bool:
        """Tick the configured viewers and submit the order.

        Returns:
            True when the submit button was clicked, False when the page did
            not load, no viewer could be selected, or the button is missing.
        """
        viewer_xpath = '//*[@id="dmViewerBlock_DmViewerBlock"]/div[2]/div/div'

        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, viewer_xpath))
            )
        except TimeoutException:
            logger.error("订单确认页面加载超时")
            return False

        # Select the viewers.
        people = self.driver.find_elements(By.XPATH, viewer_xpath)
        time.sleep(0.3)

        selected = 0
        for idx in self.config.viewer_person:
            if not _valid_index(idx, len(people)):
                logger.warning(f"观影人序号{idx}超出范围(共{len(people)}人)")
                continue
            people[idx - 1].click()
            selected += 1
            time.sleep(0.05)

        if selected == 0:
            logger.error("未选中任何观影人")
            return False

        logger.info(f"已选择 {selected} 位观影人")

        # Submit the order.
        try:
            submit_btn = WebDriverWait(self.driver, 5).until(
                EC.element_to_be_clickable(
                    (
                        By.XPATH,
                        '//*[@id="dmOrderSubmitBlock_DmOrderSubmitBlock"]/div[2]/div/div[2]/div[3]/div[2]',
                    )
                )
            )
            time.sleep(0.5)
            submit_btn.click()
            logger.info("已提交订单")
        except TimeoutException:
            logger.error("找不到提交订单按钮")
            return False

        return True

    def _wait_payment(self, timeout: int = 30) -> bool:
        """Poll the page title until it indicates the payment page.

        Args:
            timeout: maximum number of seconds to wait.

        Returns:
            True when the title contains "支付" in time, False on timeout.
        """
        logger.info("等待跳转支付页面...")
        start = time.time()

        while time.time() - start < timeout:
            title = self.driver.title
            # "支付宝" contains "支付", so this one test covers both titles.
            if "支付" in title:
                logger.info("已跳转到支付页面")
                return True
            time.sleep(1)

        logger.warning(
            f"等待支付页面超时({timeout}s),当前标题: {self.driver.title}"
        )
        return False

    # ── Main loop ──
    def choose_and_buy(self) -> bool:
        """Run one full attempt: select tickets, then place the order.

        Returns:
            True when the order was submitted, False when the attempt failed
            and may be retried.
        """
        self.attempts += 1

        try:
            self.enter_target_page()
            self._click_buy_button()
            self._select_tickets_in_popup()

            # Wait for the jump to the confirmation page.  A timeout is only
            # logged; _confirm_order performs its own wait afterwards.
            try:
                WebDriverWait(self.driver, 5).until(EC.title_contains("确认"))
            except TimeoutException:
                logger.warning("未跳转到确认页,重试...")

            self.state = State.TICKET_SELECTED

            if self._confirm_order():
                self.state = State.ORDER_CONFIRMED
                # The outcome is deliberately not used: the order is already
                # submitted, so a slow payment redirect is not a failure.
                self._wait_payment()
                self.state = State.SUCCESS
                return True

        except RuntimeError as e:
            logger.warning(f"第{self.attempts}次尝试失败: {e}")
            # Go back to the target page for the next attempt.
            try:
                self.driver.get(self.config.target_url)
            except WebDriverException:
                pass
            return False

        except WebDriverException as e:
            logger.error(f"浏览器异常: {e}")
            return False

        return False

    def run(self):
        """Run the whole flow: browser start, login, retry loop, cleanup."""
        self.start_time = time.time()

        try:
            self.start_browser()

            if not self.login():
                logger.error("登录失败,退出")
                self.state = State.FAILED
                return

            logger.info(
                f"开始抢票 | 最大重试: {self.config.max_retries}次 | "
                f"重试间隔: {self.config.retry_delay}s"
            )

            for _ in range(self.config.max_retries):
                if self.choose_and_buy():
                    self.end_time = time.time()
                    elapsed = round(self.end_time - self.start_time, 1)
                    logger.info(
                        f"🎉 抢票成功!共 {self.attempts} 轮,耗时 {elapsed}s"
                    )
                    self.state = State.SUCCESS
                    # Keep the browser open so the user can pay.
                    try:
                        input("按回车键关闭浏览器...")
                    except EOFError:
                        # No stdin (e.g. run from a scheduler): a successful
                        # purchase must not be reported as FAILED.
                        logger.debug("标准输入不可用,直接关闭浏览器")
                    return

                time.sleep(self.config.retry_delay)

            logger.error(
                f"达到最大重试次数 ({self.config.max_retries}),抢票失败"
            )
            self.state = State.FAILED

        except KeyboardInterrupt:
            logger.info("用户中断")
            self.state = State.FAILED
        except Exception as e:
            logger.error(f"未预期的错误: {e}", exc_info=True)
            self.state = State.FAILED
        finally:
            self.close_browser()


# ── Entry point ──────────────────────────────────────────────────────────
def load_config(path: str = "config.json") -> TicketConfig:
    """Load a ``TicketConfig`` from a JSON file.

    Required keys: ``date``, ``sess``, ``price``, ``ticket_num``,
    ``viewer_person``, ``nick_name``, ``damai_url``, ``target_url``,
    ``driver_path``.  Optional keys fall back to the ``TicketConfig``
    defaults: ``cookies_file``, ``max_retries``, ``retry_delay``,
    ``headless``, ``mobile_emulation``.

    Raises:
        KeyError: a required key is missing from the file.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    return TicketConfig(
        date=data["date"],
        sess=data["sess"],
        price=data["price"],
        ticket_num=data["ticket_num"],
        viewer_person=data["viewer_person"],
        nick_name=data["nick_name"],
        damai_url=data["damai_url"],
        target_url=data["target_url"],
        driver_path=data["driver_path"],
        cookies_file=data.get("cookies_file", "cookies.pkl"),
        max_retries=data.get("max_retries", 180),
        retry_delay=data.get("retry_delay", 0.3),
        headless=data.get("headless", False),
        mobile_emulation=data.get("mobile_emulation", True),
    )


def main() -> None:
    """Configure logging, load the config named on the command line, run."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
    )

    config_path = sys.argv[1] if len(sys.argv) > 1 else "config.json"
    config = load_config(config_path)

    bot = TicketBot(config)
    bot.run()


if __name__ == "__main__":
    main()