Files
damai/ticket_script.py

666 lines
22 KiB
Python
Raw Permalink Normal View History

# coding: utf-8
"""
大麦网抢票核心脚本
支持大麦网 (damai.cn) 移动端页面
"""
import json
import logging
import pickle
import sys
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from os.path import exists
from pathlib import Path
from typing import List, Optional
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import (
NoSuchElementException,
TimeoutException,
WebDriverException,
)
# Module-wide logger shared by every helper and by TicketBot. Handlers, level
# and format are not set here; the __main__ guard configures them through
# logging.basicConfig.
logger = logging.getLogger("damai")
# ── 状态机 ───────────────────────────────────────────────────────────────
class State(Enum):
    """Progress markers that TicketBot stores in ``self.state``."""

    INIT = auto()  # set in TicketBot.__init__, before anything has run
    LOGGED_IN = auto()  # set by login() after cookie or QR-code login succeeds
    PAGE_LOADED = auto()  # set by enter_target_page() once the #app element exists
    TICKET_SELECTED = auto()  # set by choose_and_buy() after the SKU popup was submitted
    ORDER_CONFIRMED = auto()  # set by choose_and_buy() when _confirm_order() returned True
    SUCCESS = auto()  # set after the payment-page wait that follows a confirmed order
    FAILED = auto()  # set by run() on login failure, exhausted retries, interrupt or error
# ── 配置 ─────────────────────────────────────────────────────────────────
@dataclass
class TicketConfig:
    """Settings consumed by TicketBot; normally built by load_config() from JSON."""

    date: List[int]  # date priority numbers (1-based), tried in order
    sess: List[int]  # session priority numbers (1-based), tried in order
    price: List[int]  # price-tier priority numbers (1-based), tried in order
    ticket_num: int  # number of tickets to buy
    viewer_person: List[int]  # viewer (attendee) numbers to tick on the order page (1-based)
    nick_name: str  # user nickname (not read anywhere in the visible code)
    damai_url: str  # damai home page, used for login and cookie injection
    target_url: str  # URL of the target show
    driver_path: str  # path to the chromedriver executable
    cookies_file: str = "cookies.pkl"  # pickle file the session cookies are saved to / loaded from
    max_retries: int = 180  # maximum number of purchase attempts in TicketBot.run()
    retry_delay: float = 0.3  # seconds slept between two attempts
    headless: bool = False  # when True Chrome is started with --headless=new
    mobile_emulation: bool = True  # when True Chrome emulates a "Nexus 6" device
# ── 工具函数 ─────────────────────────────────────────────────────────────
def safe_click(driver, element: WebElement, delay: float = 0.05):
    """Scroll *element* to the vertical centre of the viewport, pause, then click.

    Args:
        driver: object exposing ``execute_script`` (the Selenium driver).
        element: the element to bring into view and click.
        delay: seconds to wait between scrolling and clicking.
    """
    center_in_view = "arguments[0].scrollIntoView({block:'center'});"
    driver.execute_script(center_in_view, element)
    # Short pause between the scroll and the click.
    time.sleep(delay)
    element.click()
def has_class(element: WebElement, class_name: str) -> bool:
    """Return True when *class_name* is one of the element's CSS class tokens.

    The ``class`` attribute is split on whitespace, so only whole tokens match;
    a missing or empty attribute yields False.
    """
    raw_attr = element.get_attribute("class")
    tokens = raw_attr.split() if raw_attr else []
    return class_name in tokens
def find_child(element: WebElement, by: str, value: str) -> Optional[WebElement]:
    """Look up a single descendant of *element*.

    Returns:
        The element returned by ``element.find_element``, or None when that
        call raises NoSuchElementException.
    """
    try:
        match = element.find_element(by=by, value=value)
    except NoSuchElementException:
        match = None
    return match
def find_children(element: WebElement, by: str, value: str) -> List[WebElement]:
    """Look up all descendants of *element* that match the locator.

    Returns:
        The list returned by ``element.find_elements``; an empty list when that
        call raises NoSuchElementException.
    """
    try:
        matches = element.find_elements(by=by, value=value)
    except NoSuchElementException:
        matches = []
    return matches
# ── 核心类 ───────────────────────────────────────────────────────────────
class TicketBot:
    """Drive a Chrome session through the damai.cn mobile purchase flow.

    ``run()`` starts the browser, logs in (saved cookies first, QR-code scan
    as fallback) and then repeats ``choose_and_buy()`` up to
    ``config.max_retries`` times.  Progress is recorded in ``self.state``.
    """

    def __init__(self, config: TicketConfig):
        """Store *config* and reset all runtime bookkeeping."""
        self.config = config
        self.driver: Optional[webdriver.Chrome] = None  # created by start_browser()
        self.state = State.INIT
        self.attempts = 0  # number of choose_and_buy() rounds started so far
        self.start_time = 0.0  # time.time() taken when run() begins
        self.end_time = 0.0  # time.time() taken when a round succeeds

    @staticmethod
    def _in_range(priorities: List[int], count: int) -> List[int]:
        """Return the 1-based numbers from *priorities* that address one of *count* items.

        Order is preserved.  Numbers below 1 are dropped as well as numbers
        above *count*: ``items[idx - 1]`` with ``idx <= 0`` would silently wrap
        around to the end of the list and pick an option the user never asked for.
        """
        return [idx for idx in priorities if 1 <= idx <= count]

    # ── Browser management ──
    def _build_driver(self) -> webdriver.Chrome:
        """Create a Chrome driver configured from ``self.config``.

        Returns:
            The started driver, with a script registered that runs on every
            new document to mask common automation fingerprints.

        Raises:
            Exception: whatever Selenium raises while starting Chrome or
                registering the script; in the latter case the browser is
                quit before the exception propagates.
        """
        options = Options()
        if self.config.mobile_emulation:
            options.add_experimental_option(
                "mobileEmulation", {"deviceName": "Nexus 6"}
            )
        options.add_argument("--disable-blink-features=AutomationControlled")
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option("useAutomationExtension", False)
        # Performance: do not load images or fonts.
        prefs = {
            "profile.managed_default_content_settings.images": 2,
            "profile.managed_default_content_settings.fonts": 2,
        }
        options.add_experimental_option("prefs", prefs)
        # Page-load strategy: do not wait for the page to load completely.
        options.page_load_strategy = "eager"
        if self.config.headless:
            options.add_argument("--headless=new")
        if sys.platform != "win32":
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")
        service = Service(executable_path=self.config.driver_path)
        driver = webdriver.Chrome(service=service, options=options)
        try:
            # Hide webdriver fingerprints on every new document.
            driver.execute_cdp_cmd(
                "Page.addScriptToEvaluateOnNewDocument",
                {
                    "source": """
                    Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                    window.navigator.chrome = {runtime: {}};
                    Object.defineProperty(navigator, 'plugins', {get: () => [1,2,3,4,5]});
                    Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN','en-US','en']});
                    """
                },
            )
        except Exception:
            # The caller never receives this driver, so nothing else could
            # close the browser process that was just started.
            driver.quit()
            raise
        return driver

    def start_browser(self):
        """Start the browser and keep the driver in ``self.driver``."""
        logger.info("启动浏览器...")
        self.driver = self._build_driver()

    def close_browser(self):
        """Quit the browser if one is open; errors while quitting are logged, not raised."""
        if self.driver:
            try:
                self.driver.quit()
            except Exception as exc:
                # Best effort: the session may already be gone.
                logger.debug(f"关闭浏览器时出错(已忽略): {exc}")
            self.driver = None

    # ── Cookie management ──
    def _save_cookies(self):
        """Pickle the current browser cookies to ``config.cookies_file``.

        A write failure is logged as a warning instead of raised, so an
        unwritable cookie file cannot abort a login that already succeeded.
        """
        cookies_path = Path(self.config.cookies_file)
        try:
            with open(cookies_path, "wb") as f:
                pickle.dump(self.driver.get_cookies(), f)
        except OSError as exc:
            logger.warning(f"保存 Cookie 失败: {exc}")
            return
        logger.info(f"Cookie 已保存到 {cookies_path}")

    def _load_cookies(self) -> bool:
        """Inject previously saved cookies into the browser.

        Returns:
            False when the cookie file does not exist or loading fails;
            True once the file was read and every cookie was offered to the
            browser (individual cookies the browser rejects are skipped).
        """
        cookies_path = Path(self.config.cookies_file)
        if not cookies_path.exists():
            return False
        try:
            # SECURITY: pickle.load can execute arbitrary code contained in the
            # file.  Only point cookies_file at a file written by this script.
            with open(cookies_path, "rb") as f:
                cookies = pickle.load(f)
            # Cookies are added while the damai home page is the current page.
            self.driver.get(self.config.damai_url)
            time.sleep(1)
            for cookie in cookies:
                # NOTE(review): sameSite is dropped before re-adding, presumably
                # because the stored value can be rejected by add_cookie - confirm.
                cookie.pop("sameSite", None)
                try:
                    self.driver.add_cookie(cookie)
                except Exception as exc:
                    # Skip cookies the browser refuses; the rest still apply.
                    logger.debug(f"跳过无法添加的 Cookie {cookie.get('name')}: {exc}")
                    continue
            logger.info("Cookie 已加载")
            return True
        except Exception as e:
            logger.warning(f"加载 Cookie 失败: {e}")
            return False

    # ── Login ──
    def login(self) -> bool:
        """Log in to damai, via saved cookies or by letting the user scan a QR code.

        Returns:
            True when logged in (``self.state`` becomes LOGGED_IN); False when
            the login button cannot be found or the scan is not completed
            within two minutes.
        """
        logger.info("开始登录流程...")
        self.driver.get(self.config.damai_url)
        time.sleep(2)
        # Try the saved cookies first.
        if self._load_cookies():
            self.driver.get(self.config.target_url)
            time.sleep(2)
            # Check whether the cookies produced a logged-in session.
            if self._is_logged_in():
                logger.info("Cookie 登录成功")
                self.state = State.LOGGED_IN
                return True
        # Cookies missing or no longer valid: fall back to QR-code login.
        logger.info("请扫码登录...")
        self.driver.get(self.config.damai_url)
        time.sleep(1)
        try:
            login_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.CLASS_NAME, "login-user"))
            )
            login_btn.click()
        except TimeoutException:
            logger.error("找不到登录按钮")
            return False
        # Wait for the user to finish scanning, detected through a title change.
        logger.info("等待扫码登录...")
        # NOTE(review): the title is captured right after the click; whether the
        # login page has replaced the home page by then is not visible here.
        original_title = self.driver.title
        timeout = 120  # wait at most two minutes
        start = time.time()
        while time.time() - start < timeout:
            title = self.driver.title
            if title != original_title and "登录" not in title:
                break
            time.sleep(1)
        else:
            # Loop ended without break: the title never changed in time.
            logger.error("扫码超时")
            return False
        self._save_cookies()
        self.state = State.LOGGED_IN
        logger.info("登录成功")
        return True

    def _is_logged_in(self) -> bool:
        """Return True when the current page has no ``login-user`` element."""
        try:
            self.driver.find_element(By.CLASS_NAME, "login-user")
            return False
        except NoSuchElementException:
            return True

    # ── Ticket selection ──
    def enter_target_page(self):
        """Open the target show page and wait for its ``#app`` root element.

        Raises:
            RuntimeError: when ``#app`` does not appear within 15 seconds.
        """
        logger.info(f"打开目标页面: {self.config.target_url}")
        self.driver.get(self.config.target_url)
        try:
            WebDriverWait(self.driver, 15).until(
                EC.presence_of_element_located((By.ID, "app"))
            )
        except TimeoutException as exc:
            raise RuntimeError("页面加载超时") from exc
        self.state = State.PAGE_LOADED

    def _select_date(self, container: WebElement) -> bool:
        """Click the first configured date that exists in the calendar.

        Args:
            container: accepted for interface compatibility; the lookup goes
                through ``self.driver`` and does not use it.

        Returns:
            True when a date was clicked or the page has no calendar at all;
            False when the calendar has no day boxes or none of the
            configured numbers addresses one.
        """
        try:
            date_section = WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CLASS_NAME, "bui-dm-sku-calendar")
                )
            )
        except TimeoutException:
            logger.debug("没有日期选择器,跳过")
            return True
        date_items = find_children(
            date_section, By.CLASS_NAME, "bui-calendar-day-box"
        )
        if not date_items:
            logger.warning("未找到日期选项")
            return False
        candidates = self._in_range(self.config.date, len(date_items))
        if not candidates:
            return False
        idx = candidates[0]
        safe_click(self.driver, date_items[idx - 1])
        logger.info(f"选择日期: 第{idx}")
        time.sleep(0.3)
        return True

    def _select_session(self) -> bool:
        """Click the first configured session that is not tagged as sold out.

        Returns:
            True when a session was clicked or the page has no session card;
            False when no session items exist or every configured one is
            out of range or tagged "无票".
        """
        try:
            session_section = WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CLASS_NAME, "sku-times-card")
                )
            )
        except TimeoutException:
            logger.debug("没有场次选择器")
            return True
        session_items = find_children(
            session_section, By.CLASS_NAME, "bui-dm-sku-card-item"
        )
        if not session_items:
            logger.warning("未找到场次选项")
            return False
        for idx in self._in_range(self.config.sess, len(session_items)):
            item = session_items[idx - 1]
            tag = find_child(item, By.CLASS_NAME, "item-tag")
            if tag is not None and tag.text.strip() == "无票":
                logger.debug(f"场次{idx}: 无票,跳过")
                continue
            # Any other tag (or no tag) is treated as selectable.
            safe_click(self.driver, item)
            logger.info(f"选择场次: 第{idx}")
            time.sleep(0.3)
            return True
        logger.warning("所有场次均无票")
        return False

    def _select_price(self) -> bool:
        """Click the first configured price tier that is not tagged out of stock.

        Returns:
            True when a tier was clicked; False when the price card or its
            items are missing, or every configured tier is out of range or
            tagged "缺货登记" / "notticket".
        """
        try:
            price_section = WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CLASS_NAME, "sku-tickets-card")
                )
            )
        except TimeoutException:
            logger.warning("找不到票价选择区域")
            return False
        price_items = find_children(
            price_section, By.CLASS_NAME, "bui-dm-sku-card-item"
        )
        if not price_items:
            logger.warning("未找到票价选项")
            return False
        for idx in self._in_range(self.config.price, len(price_items)):
            item = price_items[idx - 1]
            tag = find_child(item, By.CLASS_NAME, "item-tag")
            if tag is not None and tag.text.strip() in ("缺货登记", "notticket"):
                logger.debug(f"票价{idx}: 缺货,跳过")
                continue
            safe_click(self.driver, item)
            logger.info(f"选择票价: 第{idx}")
            time.sleep(0.3)
            return True
        logger.warning("所有票价档位均缺货")
        return False

    def _click_buy_button(self) -> str:
        """Dismiss the real-name popup if shown, then click the buy button.

        Returns:
            The stripped text of the buy button that was clicked.

        Raises:
            RuntimeError: when ``#app`` or the buy button is missing, or the
                button text says the sale has not started ("即将开抢") or
                the show is out of stock ("缺货").
        """
        try:
            box = WebDriverWait(self.driver, 5).until(
                EC.presence_of_element_located((By.ID, "app"))
            )
        except TimeoutException as exc:
            raise RuntimeError("页面刷新失败") from exc
        # Close the real-name popup.  The XPath starts with "//", so it matches
        # anywhere in the document rather than only below `box`.
        popups = find_children(
            box, By.XPATH, "//div[@class='realname-popup']"
        )
        if popups:
            try:
                btn = popups[0].find_element(
                    By.XPATH, ".//div[@class='operate']//div[@class='button']"
                )
                btn.click()
                time.sleep(0.3)
                logger.info("已关闭实名弹窗")
            except NoSuchElementException:
                # Popup without the expected button: carry on regardless.
                logger.debug("实名弹窗中未找到确认按钮")
        try:
            buy_btn = box.find_element(By.CLASS_NAME, "buy__button")
            time.sleep(0.5)
            btn_text = buy_btn.text.strip()
        except NoSuchElementException as exc:
            raise RuntimeError("找不到购买按钮") from exc
        if "即将开抢" in btn_text:
            raise RuntimeError("尚未开售,刷新等待")
        if "缺货" in btn_text:
            raise RuntimeError("已缺货")
        buy_btn.click()
        logger.info(f"点击购买按钮: {btn_text}")
        return btn_text

    def _select_tickets_in_popup(self) -> str:
        """Pick date, session, price and quantity in the SKU popup and submit it.

        Returns:
            "选座" when the popup switched to seat selection, otherwise the
            text of the submit button that was clicked.

        Raises:
            RuntimeError: when the popup, the ticket counter or the submit
                button is missing, when date / session / price selection
                fails, when the tickets are sold out, or when the submit
                button shows empty or unknown text.
        """
        try:
            box = WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, ".sku-pop-wrapper")
                )
            )
        except TimeoutException as exc:
            raise RuntimeError("选票弹窗未出现") from exc
        # Abort this attempt as soon as one selection step reports failure,
        # instead of continuing with an option the configuration did not choose.
        if not self._select_date(box):
            raise RuntimeError("日期选择失败")
        if not self._select_session():
            raise RuntimeError("场次选择失败")
        if not self._select_price():
            raise RuntimeError("票价选择失败")
        # Wait for the quantity counter to appear.
        try:
            WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.CLASS_NAME, "bui-dm-sku-counter")
                )
            )
        except TimeoutException as exc:
            raise RuntimeError("票数选择器未出现") from exc
        # Locate the final submit button.
        try:
            submit_btn = box.find_element(
                By.CLASS_NAME, "sku-footer-buy-button"
            )
            submit_text = submit_btn.text.strip()
        except NoSuchElementException as exc:
            raise RuntimeError("找不到提交按钮") from exc
        if not submit_text:
            raise RuntimeError("提交按钮文字为空,请调整重试间隔")
        if submit_text == "选座购买":
            submit_btn.click()
            logger.info("进入选座模式,请手动选择座位")
            return "选座"
        if submit_text == "提交缺货登记":
            raise RuntimeError("票已售罄")
        if submit_text in ("立即预订", "立即购买", "确定"):
            # Raise the quantity from the initial one to ticket_num.
            if self.config.ticket_num > 1:
                plus_btn = find_child(box, By.CLASS_NAME, "plus-enable")
                if plus_btn is None:
                    logger.warning("未找到加号按钮,无法增加购票数量")
                else:
                    for _ in range(self.config.ticket_num - 1):
                        plus_btn.click()
                        time.sleep(0.05)
                    logger.info(f"设置购票数量: {self.config.ticket_num}")
            time.sleep(1.0)
            submit_btn.click()
            logger.info(f"点击: {submit_text}")
            return submit_text
        raise RuntimeError(f"未知按钮文字: {submit_text}")

    # ── Order confirmation ──
    def _confirm_order(self) -> bool:
        """Tick the configured viewers on the confirmation page and submit the order.

        Returns:
            True when the submit button was clicked; False when the viewer
            list or the submit button does not appear in time, or none of
            the configured viewer numbers is valid.
        """
        viewer_xpath = '//*[@id="dmViewerBlock_DmViewerBlock"]/div[2]/div/div'
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, viewer_xpath))
            )
        except TimeoutException:
            logger.error("订单确认页面加载超时")
            return False
        # Select the viewers.
        people = self.driver.find_elements(By.XPATH, viewer_xpath)
        time.sleep(0.3)
        selected = 0
        for idx in self.config.viewer_person:
            # Reject numbers below 1 too: people[idx - 1] would otherwise wrap
            # around and tick a viewer counted from the end of the list.
            if not 1 <= idx <= len(people):
                logger.warning(f"观影人序号{idx}超出范围(共{len(people)}人)")
                continue
            people[idx - 1].click()
            selected += 1
            time.sleep(0.05)
        if selected == 0:
            logger.error("未选中任何观影人")
            return False
        logger.info(f"已选择 {selected} 位观影人")
        # Submit the order.
        try:
            submit_btn = WebDriverWait(self.driver, 5).until(
                EC.element_to_be_clickable(
                    (
                        By.XPATH,
                        '//*[@id="dmOrderSubmitBlock_DmOrderSubmitBlock"]/div[2]/div/div[2]/div[3]/div[2]',
                    )
                )
            )
            time.sleep(0.5)
            submit_btn.click()
            logger.info("已提交订单")
        except TimeoutException:
            logger.error("找不到提交订单按钮")
            return False
        return True

    def _wait_payment(self, timeout: int = 30) -> bool:
        """Poll the page title once per second until it mentions payment.

        Args:
            timeout: maximum number of seconds to wait.

        Returns:
            True when the title contains "支付" within *timeout*, else False.
        """
        logger.info("等待跳转支付页面...")
        start = time.time()
        while time.time() - start < timeout:
            # "支付" also matches "支付宝", so a single substring test is enough.
            if "支付" in self.driver.title:
                logger.info("已跳转到支付页面")
                return True
            time.sleep(1)
        logger.warning(f"等待支付页面超时({timeout}s),当前标题: {self.driver.title}")
        return False

    # ── Main loop ──
    def choose_and_buy(self) -> bool:
        """Run one complete attempt: open page, pick tickets, confirm the order.

        Returns:
            True when the order was submitted (``self.state`` becomes
            SUCCESS); False when any step failed and the attempt should be
            repeated.  RuntimeError and WebDriverException are handled here
            and never propagate.
        """
        self.attempts += 1
        try:
            self.enter_target_page()
            self._click_buy_button()
            self._select_tickets_in_popup()
            # Wait for the jump to the confirmation page.
            try:
                WebDriverWait(self.driver, 5).until(EC.title_contains("确认"))
            except TimeoutException:
                # Not fatal by itself: _confirm_order() below performs its own
                # wait for the viewer list and returns False if it never shows.
                logger.warning("未跳转到确认页,重试...")
            self.state = State.TICKET_SELECTED
            if self._confirm_order():
                self.state = State.ORDER_CONFIRMED
                # Reaching the payment page is informational only; the order
                # has already been submitted at this point.
                self._wait_payment()
                self.state = State.SUCCESS
                return True
        except RuntimeError as e:
            logger.warning(f"{self.attempts}次尝试失败: {e}")
            # No reload here: the next attempt starts with enter_target_page(),
            # which loads the target URL itself.  Reloading in both places
            # doubled the page loads per retry.
            return False
        except WebDriverException as e:
            logger.error(f"浏览器异常: {e}")
            return False
        return False

    def run(self):
        """Run the whole flow: start browser, log in, retry purchases, clean up.

        The browser is always closed on exit.  After a successful purchase
        the method blocks on ``input()`` so the user can pay in the open
        browser first.
        """
        self.start_time = time.time()
        try:
            self.start_browser()
            if not self.login():
                logger.error("登录失败,退出")
                self.state = State.FAILED
                return
            logger.info(
                f"开始抢票 | 最大重试: {self.config.max_retries}次 | "
                f"重试间隔: {self.config.retry_delay}s"
            )
            for _ in range(self.config.max_retries):
                if self.choose_and_buy():
                    self.end_time = time.time()
                    elapsed = round(self.end_time - self.start_time, 1)
                    logger.info(
                        f"🎉 抢票成功!共 {self.attempts} 轮,耗时 {elapsed}s"
                    )
                    self.state = State.SUCCESS
                    # Keep the browser open so the user can pay.
                    try:
                        input("按回车键关闭浏览器...")
                    except (EOFError, KeyboardInterrupt):
                        # No usable stdin, or Ctrl+C at the prompt: the purchase
                        # already succeeded, so do not let the handlers below
                        # overwrite SUCCESS with FAILED.
                        pass
                    return
                time.sleep(self.config.retry_delay)
            logger.error(f"达到最大重试次数 ({self.config.max_retries}),抢票失败")
            self.state = State.FAILED
        except KeyboardInterrupt:
            logger.info("用户中断")
            self.state = State.FAILED
        except Exception as e:
            logger.error(f"未预期的错误: {e}", exc_info=True)
            self.state = State.FAILED
        finally:
            self.close_browser()
# ── 入口 ─────────────────────────────────────────────────────────────────
def load_config(path: str = "config.json") -> TicketConfig:
    """Build a TicketConfig from a UTF-8 JSON file.

    Required keys: ``date``, ``sess``, ``price``, ``ticket_num``,
    ``viewer_person``, ``nick_name``, ``damai_url``, ``target_url``,
    ``driver_path``.  Optional keys (``cookies_file``, ``max_retries``,
    ``retry_delay``, ``headless``, ``mobile_emulation``) are forwarded when
    present; otherwise the TicketConfig defaults apply.

    Args:
        path: location of the JSON configuration file.

    Returns:
        The populated TicketConfig.

    Raises:
        KeyError: when required keys are missing; the message names all of them.
        OSError / json.JSONDecodeError: when the file cannot be read or parsed.
    """
    required = (
        "date",
        "sess",
        "price",
        "ticket_num",
        "viewer_person",
        "nick_name",
        "damai_url",
        "target_url",
        "driver_path",
    )
    optional = (
        "cookies_file",
        "max_retries",
        "retry_delay",
        "headless",
        "mobile_emulation",
    )
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    # Report every missing key at once instead of failing on the first one.
    missing = [key for key in required if key not in data]
    if missing:
        raise KeyError(f"配置文件 {path} 缺少必填项: {', '.join(missing)}")
    # Absent optional keys are left out so the dataclass defaults are used.
    settings = {key: data[key] for key in required + optional if key in data}
    return TicketConfig(**settings)
if __name__ == "__main__":
    # Script entry point: timestamped INFO-level logging, then a single bot run.
    logging.basicConfig(
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
        level=logging.INFO,
    )
    # The first command-line argument, when given, replaces the default path.
    config_path = sys.argv[1] if sys.argv[1:] else "config.json"
    config = load_config(config_path)
    bot = TicketBot(config)
    bot.run()