v1.0: 重写核心代码,清理空壳脚本

- 重写 ticket_script.py: 状态机、完整的选票/下单流程、反检测
- 重写 scripts/: 验证码处理、定时调度、NTP校时、多账户管理
- 删除空壳 appium_simulator.py
- 清理配置文件中的硬编码密码
- 重写 README,去除虚假宣传
This commit is contained in:
zhaojie
2026-04-01 15:24:34 +08:00
commit b711b98dc8
11 changed files with 1371 additions and 0 deletions

71
README.md Normal file
View File

@@ -0,0 +1,71 @@
# 大麦网抢票工具
基于 Selenium 的大麦网自动抢票脚本,支持定时开抢、多账户并发、验证码处理。
## 功能
- **自动抢票**:自动选日期→场次→票价→提交订单
- **定时开抢**NTP 时间校准,精确到毫秒级触发
- **多账户**:支持多账号并发抢票
- **验证码**:自动检测和处理滑块验证码
- **Cookie 持久化**:扫码登录后自动保存/加载 Cookie
- **反检测**:隐藏 webdriver 痕迹,移动端模拟
## 快速开始
```bash
# 1. 安装依赖
pip install selenium ntplib Pillow
# 2. 下载 chromedriver匹配你的 Chrome 版本)
# https://googlechromelabs.github.io/chrome-for-testing/
# 3. 编辑配置
cp config/config.json config/my_config.json
# 修改 target_url、date、sess、price 等参数
# 4. 运行(首次需要扫码登录)
python ticket_script.py config/my_config.json
```
## 配置说明
```json
{
"date": [1, 2], // 日期优先级序号 (1-based)
"sess": [1, 2], // 场次优先级序号 (1-based)
"price": [1, 2, 3], // 票价优先级序号 (1-based)
"ticket_num": 2, // 购买张数
"viewer_person": [1, 2], // 观影人序号 (1-based)
"nick_name": "", // 用户昵称
"driver_path": "/usr/local/bin/chromedriver",
"damai_url": "https://www.damai.cn/",
"target_url": "https://m.damai.cn/damai/detail/item.html?itemId=xxx",
"max_retries": 180, // 最大重试次数
"retry_delay": 0.3 // 重试间隔(秒)
}
```
## 项目结构
```
├── ticket_script.py # 核心抢票逻辑
├── scripts/
│ ├── main.py # 主入口(单/多账户)
│ ├── selenium_driver.py # 浏览器驱动工厂
│ ├── captcha_solver.py # 验证码检测与处理
│ ├── scheduler.py # 定时调度与 NTP 校时
│ ├── multi_account_manager.py # 多账户管理
│ └── mock_dependency_manager.py # GUI 依赖模拟
├── GUI.py # Tkinter 配置界面
└── config/
├── config.json # 默认配置
└── demo_config.json # 示例配置
```
## 说明
- 首次运行会打开浏览器,需要手动扫码登录
- 登录成功后 Cookie 会保存到 `cookies.pkl`,下次自动加载
- 支持通过 GUI 界面编辑配置(`python GUI.py`
- 仅限个人学习研究使用

13
config/config.json Normal file
View File

@@ -0,0 +1,13 @@
{
"date": [1, 2, 3],
"sess": [1, 2],
"price": [1, 2, 3],
"ticket_num": 2,
"viewer_person": [1, 2],
"nick_name": "",
"driver_path": "/usr/local/bin/chromedriver",
"damai_url": "https://www.damai.cn/",
"target_url": "https://m.damai.cn/damai/detail/item.html?itemId=714001339730",
"max_retries": 180,
"retry_delay": 0.3
}

32
config/demo_config.json Normal file
View File

@@ -0,0 +1,32 @@
{
"version": "1.0",
"global": {
"log_level": "INFO",
"timezone": "Asia/Shanghai"
},
"accounts": {
"acc_primary": {
"platform": "taopiaopiao",
"credentials": {
"mobile": "138xxxxxxxx",
"password": ""
},
"target": {
"event_url": "https://example.com/detail/987654321",
"priorities": {
"date": [1, 2],
"session": [1],
"price_range": "lowest_to_highest"
},
"tickets": 2,
"viewers": [0, 1]
}
}
},
"strategy": {
"auto_strike": true,
"strike_time": "2026-01-25T12:00:00",
"preheat_stages": [5.0, 2.0, 0.5],
"max_retries": 180
}
}

View File

@@ -0,0 +1,20 @@
{
"platforms": {
"damai": {
"platform_name": "大麦网",
"login_url": "https://www.damai.cn/login",
"login_method": "scan",
"mobile_url": "https://m.damai.cn/"
},
"taopiaopiao": {
"platform_name": "淘票票",
"login_url": "https://m.taopiaopiao.com/",
"login_method": "sms"
},
"showstart": {
"platform_name": "秀动",
"login_url": "https://www.showstart.com/",
"login_method": "scan"
}
}
}

11
scripts/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
"""
damai.scripts - 抢票工具模块
模块说明:
- ticket_script 核心抢票逻辑 (主入口)
- selenium_driver 浏览器驱动工厂
- captcha_solver 验证码检测与处理
- scheduler 定时调度与 NTP 校时
- multi_account_manager 多账户管理
- mock_dependency_manager GUI 依赖模拟 (仅用于界面演示)
"""

160
scripts/captcha_solver.py Normal file
View File

@@ -0,0 +1,160 @@
# coding: utf-8
"""
验证码处理模块
支持:图像验证码、滑块验证码
依赖Pillow (PIL)
"""
import io
import logging
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional, Tuple
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
logger = logging.getLogger("damai.captcha")
class CaptchaType(Enum):
NONE = auto()
IMAGE = auto() # 图形验证码
SLIDER = auto() # 滑块验证码
CLICK = auto() # 点选验证码
UNKNOWN = auto()
@dataclass
class CaptchaResult:
success: bool
captcha_type: CaptchaType
message: str = ""
def detect_captcha(driver) -> CaptchaType:
"""检测当前页面是否存在验证码"""
# 检测滑块
slider_selectors = [
".captcha-slider",
".slide-verify",
"[class*='slider']",
".nc-lang-cn", # 阿里云盾滑块
]
for sel in slider_selectors:
try:
driver.find_element(By.CSS_SELECTOR, sel)
logger.info(f"检测到滑块验证码: {sel}")
return CaptchaType.SLIDER
except Exception:
continue
# 检测图形验证码
image_selectors = [
".captcha-img",
"img[src*='captcha']",
"img[src*='verify']",
"#captcha_img",
]
for sel in image_selectors:
try:
driver.find_element(By.CSS_SELECTOR, sel)
logger.info(f"检测到图形验证码: {sel}")
return CaptchaType.IMAGE
except Exception:
continue
# 检测弹窗中是否有验证码关键词
try:
page_source = driver.page_source.lower()
if "captcha" in page_source or "验证码" in page_source:
return CaptchaType.UNKNOWN
except Exception:
pass
return CaptchaType.NONE
def solve_slider(driver, slider_element: Optional[WebElement] = None) -> CaptchaResult:
"""
处理滑块验证码
简单实现:拖动滑块从左到右
更精确的方案需要图像处理来识别缺口位置
"""
from selenium.webdriver.common.action_chains import ActionChains
selectors = [
".captcha-slider .slide-btn",
".slide-verify-slider-mask-item",
".nc-lang-cn .btn_slide",
"[class*='slider'] [class*='btn']",
]
slider = slider_element
if not slider:
for sel in selectors:
try:
slider = driver.find_element(By.CSS_SELECTOR, sel)
break
except Exception:
continue
if not slider:
return CaptchaResult(False, CaptchaType.SLIDER, "找不到滑块元素")
try:
# 获取滑块轨道宽度
track = slider.find_element(By.XPATH, "..")
track_width = track.size["width"]
slider_width = slider.size["width"]
distance = track_width - slider_width
if distance <= 0:
distance = 300 # fallback
actions = ActionChains(driver)
actions.click_and_hold(slider)
actions.pause(0.1)
# 模拟人类拖动:分段加速减速
steps = 20
for i in range(steps):
progress = (i + 1) / steps
# 加速-减速曲线
if progress < 0.3:
offset = distance * progress * 0.5
elif progress < 0.7:
offset = distance * progress
else:
offset = distance * progress * 0.95
move = int(offset - (distance * (i / steps) if i > 0 else 0))
actions.move_by_offset(move, 0)
actions.pause(0.02)
actions.release()
actions.perform()
logger.info("滑块拖动完成")
return CaptchaResult(True, CaptchaType.SLIDER)
except Exception as e:
logger.error(f"滑块处理失败: {e}")
return CaptchaResult(False, CaptchaType.SLIDER, str(e))
def handle_captcha(driver) -> CaptchaResult:
"""自动检测并处理验证码"""
captcha_type = detect_captcha(driver)
if captcha_type == CaptchaType.NONE:
return CaptchaResult(True, CaptchaType.NONE)
if captcha_type == CaptchaType.SLIDER:
return solve_slider(driver)
if captcha_type == CaptchaType.IMAGE:
logger.warning("图形验证码需要人工处理或接入 OCR 服务")
return CaptchaResult(False, CaptchaType.IMAGE, "需要人工处理")
logger.warning(f"未知验证码类型,需要人工处理")
return CaptchaResult(False, captcha_type, "未知类型")

88
scripts/main.py Normal file
View File

@@ -0,0 +1,88 @@
# coding: utf-8
"""
主入口模块
支持单账户和多账户模式
"""
import json
import logging
import sys
from pathlib import Path
from ticket_script import TicketBot, TicketConfig
logger = logging.getLogger("damai")
def load_config(path: str = "config/config.json") -> dict:
config_path = Path(path)
if not config_path.exists():
logger.error(f"配置文件不存在: {path}")
sys.exit(1)
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)
def run_single_account(config: dict):
"""单账户模式"""
tc = TicketConfig(
date=config["date"],
sess=config["sess"],
price=config["price"],
ticket_num=config.get("ticket_num", 2),
viewer_person=config.get("viewer_person", [1]),
nick_name=config.get("nick_name", ""),
damai_url=config.get("damai_url", "https://www.damai.cn/"),
target_url=config["target_url"],
driver_path=config["driver_path"],
max_retries=config.get("max_retries", 180),
retry_delay=config.get("retry_delay", 0.3),
)
bot = TicketBot(tc)
bot.run()
def run_multi_account(config: dict):
"""多账户模式"""
from scripts.multi_account_manager import load_accounts_from_config, run_parallel
accounts = load_accounts_from_config(config)
def account_task(acc_id, acc_info):
tc = TicketConfig(
date=config.get("date", [1]),
sess=config.get("sess", [1]),
price=config.get("price", [1]),
ticket_num=config.get("ticket_num", 2),
viewer_person=acc_info.viewer_person or [1],
nick_name=acc_info.username,
damai_url=config.get("damai_url", "https://www.damai.cn/"),
target_url=acc_info.target_url or config.get("target_url", ""),
driver_path=config.get("driver_path", ""),
max_retries=config.get("max_retries", 180),
retry_delay=config.get("retry_delay", 0.3),
)
bot = TicketBot(tc)
bot.run()
threads = run_parallel(accounts, account_task, max_workers=3)
for t in threads:
t.join()
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
config_path = sys.argv[1] if len(sys.argv) > 1 else "config/config.json"
config = load_config(config_path)
if isinstance(config.get("accounts"), (list, dict)):
run_multi_account(config)
else:
run_single_account(config)

View File

@@ -0,0 +1,85 @@
# coding: utf-8
"""
多账户管理模块
支持:并发抢票、账户轮换
"""
import logging
import threading
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
logger = logging.getLogger("damai.accounts")
@dataclass
class AccountInfo:
username: str
password: str
target_url: str
auto_buy_time: str = ""
viewer_person: List[int] = None
def __post_init__(self):
if self.viewer_person is None:
self.viewer_person = [1]
def load_accounts_from_config(config: dict) -> Dict[str, AccountInfo]:
"""从配置字典加载账户列表"""
accounts = {}
raw = config.get("accounts", [])
if isinstance(raw, dict):
items = raw.items()
elif isinstance(raw, list):
items = enumerate(raw)
else:
return accounts
for key, info in items:
acc_id = str(key)
accounts[acc_id] = AccountInfo(
username=info.get("username", ""),
password=info.get("password", ""),
target_url=info.get("target_url", ""),
auto_buy_time=info.get("auto_buy_time", ""),
viewer_person=info.get("viewer_person", [1]),
)
logger.info(f"加载了 {len(accounts)} 个账户")
return accounts
def run_parallel(
accounts: Dict[str, AccountInfo],
task_fn: Callable[[str, AccountInfo], None],
max_workers: int = 3,
) -> List[threading.Thread]:
"""
并行执行多个账户的抢票任务
task_fn: 接受 (account_id, account_info) 的回调函数
max_workers: 最大并发数
"""
threads: List[threading.Thread] = []
for acc_id, acc_info in accounts.items():
if len(threads) >= max_workers:
logger.warning(f"达到最大并发数 {max_workers},跳过账户 {acc_id}")
continue
def run_task(aid=acc_id, ainfo=acc_info):
try:
logger.info(f"账户 {aid} 任务启动")
task_fn(aid, ainfo)
logger.info(f"账户 {aid} 任务完成")
except Exception as e:
logger.error(f"账户 {aid} 任务异常: {e}")
t = threading.Thread(target=run_task, daemon=True, name=f"account-{acc_id}")
t.start()
threads.append(t)
logger.info(f"已启动线程: account-{acc_id}")
return threads

133
scripts/scheduler.py Normal file
View File

@@ -0,0 +1,133 @@
# coding: utf-8
"""
定时调度模块
支持定时开抢、NTP 时间校准
"""
import logging
import time
from datetime import datetime, timezone, timedelta
from typing import Optional
logger = logging.getLogger("damai.scheduler")
CST = timezone(timedelta(hours=8))
def parse_strike_time(time_str: str) -> datetime:
"""
解析开抢时间字符串
支持格式2026-01-25T12:00:00 或 2026-01-25 12:00:00
"""
for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(time_str, fmt)
return dt.replace(tzinfo=CST)
except ValueError:
continue
raise ValueError(f"无法解析时间: {time_str}")
def get_ntp_time() -> Optional[datetime]:
"""从 NTP 服务器获取精确时间"""
try:
import ntplib
client = ntplib.NTPClient()
servers = ["time.google.com", "ntp.aliyun.com", "pool.ntp.org"]
for server in servers:
try:
response = client.request(server, timeout=3)
ntp_time = datetime.fromtimestamp(response.tx_time, tz=CST)
return ntp_time
except Exception:
continue
except ImportError:
logger.debug("ntplib 未安装,使用本地时间")
except Exception:
pass
return None
def wait_until_strike(strike_time_str: str, preheat_stages: list = None) -> dict:
"""
等待到开抢时间
preheat_stages: 预热阶段列表,如 [5.0, 2.0, 0.5] 表示提前 5s/2s/0.5s 触发预热回调
返回: {"strike_time": datetime, "ntp_offset": float}
"""
strike_time = parse_strike_time(strike_time_str)
if preheat_stages is None:
preheat_stages = [5.0, 2.0, 0.5]
# 尝试 NTP 校时
ntp_time = get_ntp_time()
if ntp_time:
local_time = datetime.now(CST)
offset = (ntp_time - local_time).total_seconds()
logger.info(f"NTP 校时成功 | 偏差: {offset:+.3f}s")
else:
offset = 0.0
logger.warning("NTP 校时失败,使用本地时间")
now = datetime.now(CST)
remaining = (strike_time - now).total_seconds()
if remaining < 0:
logger.warning(f"开抢时间已过 {-remaining:.1f}s立即执行")
return {"strike_time": strike_time, "ntp_offset": offset}
logger.info(
f"距离开抢: {remaining:.1f}s | "
f"开抢时间: {strike_time.strftime('%H:%M:%S')} | "
f"预热阶段: {preheat_stages}"
)
# 排序预热阶段(降序)
stages = sorted(preheat_stages, reverse=True)
for stage_sec in stages:
sleep_until = remaining - stage_sec
if sleep_until <= 0:
continue
logger.info(f"等待 {sleep_until:.1f}s 到预热阶段 (提前 {stage_sec}s)...")
time.sleep(sleep_until)
remaining = (strike_time - datetime.now(CST)).total_seconds()
# 最后精确等待
while True:
now = datetime.now(CST)
remaining = (strike_time - now).total_seconds()
if remaining <= 0.005: # 5ms 精度
break
if remaining > 0.1:
time.sleep(remaining * 0.5)
else:
# 忙等待最后阶段
pass
logger.info("⏰ 到达开抢时间!")
return {"strike_time": strike_time, "ntp_offset": offset}
def format_countdown(strike_time_str: str) -> str:
"""格式化倒计时显示"""
strike_time = parse_strike_time(strike_time_str)
now = datetime.now(CST)
remaining = (strike_time - now).total_seconds()
if remaining < 0:
return f"已开抢 {-remaining:.0f}s"
hours = int(remaining // 3600)
minutes = int((remaining % 3600) // 60)
seconds = int(remaining % 60)
if hours > 0:
return f"{hours}h {minutes}m {seconds}s"
elif minutes > 0:
return f"{minutes}m {seconds}s"
else:
return f"{seconds}s"

View File

@@ -0,0 +1,93 @@
# coding: utf-8
"""
Selenium 驱动工厂
提供统一的浏览器驱动创建接口
"""
import logging
import sys
from typing import Optional
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
logger = logging.getLogger("damai.driver")
def create_driver(
driver_path: str,
mobile_emulation: bool = True,
headless: bool = False,
disable_images: bool = True,
page_load_strategy: str = "eager",
) -> webdriver.Chrome:
"""
创建配置好的 Chrome 驱动
Args:
driver_path: chromedriver 可执行文件路径
mobile_emulation: 是否模拟移动设备
headless: 是否无头模式
disable_images: 是否禁用图片加载
page_load_strategy: 页面加载策略 (normal/eager/none)
Returns:
配置好的 Chrome WebDriver 实例
"""
options = Options()
# 移动端模拟
if mobile_emulation:
options.add_experimental_option(
"mobileEmulation", {"deviceName": "Nexus 6"}
)
# 反检测
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
# 性能优化
if disable_images:
prefs = {
"profile.managed_default_content_settings.images": 2,
"profile.managed_default_content_settings.fonts": 2,
}
options.add_experimental_option("prefs", prefs)
# 页面加载策略
options.page_load_strategy = page_load_strategy
# 无头模式
if headless:
options.add_argument("--headless=new")
# Linux 环境
if sys.platform != "win32":
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
service = Service(executable_path=driver_path)
driver = webdriver.Chrome(service=service, options=options)
# 隐藏 webdriver 痕迹
driver.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.navigator.chrome = {runtime: {}};
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5],
});
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'en-US', 'en'],
});
"""
},
)
logger.info(f"Chrome 驱动已创建 | mobile={mobile_emulation} headless={headless}")
return driver

665
ticket_script.py Normal file
View File

@@ -0,0 +1,665 @@
# coding: utf-8
"""
大麦网抢票核心脚本
支持:大麦网 (damai.cn) 移动端页面
"""
import json
import logging
import pickle
import sys
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from os.path import exists
from pathlib import Path
from typing import List, Optional
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import (
NoSuchElementException,
TimeoutException,
WebDriverException,
)
logger = logging.getLogger("damai")
# ── 状态机 ───────────────────────────────────────────────────────────────
class State(Enum):
INIT = auto()
LOGGED_IN = auto()
PAGE_LOADED = auto()
TICKET_SELECTED = auto()
ORDER_CONFIRMED = auto()
SUCCESS = auto()
FAILED = auto()
# ── 配置 ─────────────────────────────────────────────────────────────────
@dataclass
class TicketConfig:
date: List[int] # 日期优先级序号 (1-based)
sess: List[int] # 场次优先级序号 (1-based)
price: List[int] # 票价优先级序号 (1-based)
ticket_num: int # 购买张数
viewer_person: List[int] # 观影人序号 (1-based)
nick_name: str # 用户昵称
damai_url: str # 大麦首页
target_url: str # 目标演出 URL
driver_path: str # chromedriver 路径
cookies_file: str = "cookies.pkl"
max_retries: int = 180
retry_delay: float = 0.3
headless: bool = False
mobile_emulation: bool = True
# ── 工具函数 ─────────────────────────────────────────────────────────────
def safe_click(driver, element: WebElement, delay: float = 0.05):
"""带滚动和延迟的安全点击"""
driver.execute_script("arguments[0].scrollIntoView({block:'center'});", element)
time.sleep(delay)
element.click()
def has_class(element: WebElement, class_name: str) -> bool:
"""检查元素是否包含指定 class"""
classes = element.get_attribute("class") or ""
return class_name in classes.split()
def find_child(element: WebElement, by: str, value: str) -> Optional[WebElement]:
"""安全查找子元素,找不到返回 None"""
try:
return element.find_element(by=by, value=value)
except NoSuchElementException:
return None
def find_children(element: WebElement, by: str, value: str) -> List[WebElement]:
"""安全查找子元素列表"""
try:
return element.find_elements(by=by, value=value)
except NoSuchElementException:
return []
# ── 核心类 ───────────────────────────────────────────────────────────────
class TicketBot:
def __init__(self, config: TicketConfig):
self.config = config
self.driver: Optional[webdriver.Chrome] = None
self.state = State.INIT
self.attempts = 0
self.start_time = 0.0
self.end_time = 0.0
# ── 浏览器管理 ──
def _build_driver(self) -> webdriver.Chrome:
options = Options()
if self.config.mobile_emulation:
options.add_experimental_option(
"mobileEmulation", {"deviceName": "Nexus 6"}
)
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
# 性能优化:不加载图片
prefs = {
"profile.managed_default_content_settings.images": 2,
"profile.managed_default_content_settings.fonts": 2,
}
options.add_experimental_option("prefs", prefs)
# 页面加载策略:不等完全加载
options.page_load_strategy = "eager"
if self.config.headless:
options.add_argument("--headless=new")
if sys.platform != "win32":
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
service = Service(executable_path=self.config.driver_path)
driver = webdriver.Chrome(service=service, options=options)
# 隐藏 webdriver 特征
driver.execute_cdp_cmd(
"Page.addScriptToEvaluateOnNewDocument",
{
"source": """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.navigator.chrome = {runtime: {}};
Object.defineProperty(navigator, 'plugins', {get: () => [1,2,3,4,5]});
Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN','en-US','en']});
"""
},
)
return driver
def start_browser(self):
"""启动浏览器"""
logger.info("启动浏览器...")
self.driver = self._build_driver()
def close_browser(self):
"""关闭浏览器"""
if self.driver:
try:
self.driver.quit()
except Exception:
pass
self.driver = None
# ── Cookie 管理 ──
def _save_cookies(self):
cookies_path = Path(self.config.cookies_file)
with open(cookies_path, "wb") as f:
pickle.dump(self.driver.get_cookies(), f)
logger.info(f"Cookie 已保存到 {cookies_path}")
def _load_cookies(self) -> bool:
cookies_path = Path(self.config.cookies_file)
if not cookies_path.exists():
return False
try:
with open(cookies_path, "rb") as f:
cookies = pickle.load(f)
self.driver.get(self.config.damai_url)
time.sleep(1)
for cookie in cookies:
# selenium 要求 domain 不能带当前域之外的 cookie
cookie.pop("sameSite", None)
try:
self.driver.add_cookie(cookie)
except Exception:
continue
logger.info("Cookie 已加载")
return True
except Exception as e:
logger.warning(f"加载 Cookie 失败: {e}")
return False
# ── 登录 ──
def login(self) -> bool:
"""登录大麦网,需要用户扫码"""
logger.info("开始登录流程...")
self.driver.get(self.config.damai_url)
time.sleep(2)
# 尝试加载已有 cookie
if self._load_cookies():
self.driver.get(self.config.target_url)
time.sleep(2)
# 检查是否已登录
if self._is_logged_in():
logger.info("Cookie 登录成功")
self.state = State.LOGGED_IN
return True
# Cookie 失效,需要扫码登录
logger.info("请扫码登录...")
self.driver.get(self.config.damai_url)
time.sleep(1)
try:
login_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.CLASS_NAME, "login-user"))
)
login_btn.click()
except TimeoutException:
logger.error("找不到登录按钮")
return False
# 等待用户扫码完成(标题变化)
logger.info("等待扫码登录...")
original_title = self.driver.title
timeout = 120 # 最多等 2 分钟
start = time.time()
while time.time() - start < timeout:
if self.driver.title != original_title and "登录" not in self.driver.title:
break
time.sleep(1)
else:
logger.error("扫码超时")
return False
self._save_cookies()
self.state = State.LOGGED_IN
logger.info("登录成功")
return True
def _is_logged_in(self) -> bool:
"""检查是否已登录"""
try:
self.driver.find_element(By.CLASS_NAME, "login-user")
return False
except NoSuchElementException:
return True
# ── 选票核心逻辑 ──
def enter_target_page(self):
"""进入目标演出页面"""
logger.info(f"打开目标页面: {self.config.target_url}")
self.driver.get(self.config.target_url)
try:
WebDriverWait(self.driver, 15).until(
EC.presence_of_element_located((By.ID, "app"))
)
except TimeoutException:
raise RuntimeError("页面加载超时")
self.state = State.PAGE_LOADED
def _select_date(self, container: WebElement) -> bool:
"""选择日期"""
try:
date_section = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located(
(By.CLASS_NAME, "bui-dm-sku-calendar")
)
)
except TimeoutException:
logger.debug("没有日期选择器,跳过")
return True
date_items = find_children(
date_section, By.CLASS_NAME, "bui-calendar-day-box"
)
if not date_items:
logger.warning("未找到日期选项")
return False
for idx in self.config.date:
if idx <= len(date_items):
safe_click(self.driver, date_items[idx - 1])
logger.info(f"选择日期: 第{idx}")
time.sleep(0.3)
return True
return False
def _select_session(self) -> bool:
"""选择场次"""
try:
session_section = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located(
(By.CLASS_NAME, "sku-times-card")
)
)
except TimeoutException:
logger.debug("没有场次选择器")
return True
session_items = find_children(
session_section, By.CLASS_NAME, "bui-dm-sku-card-item"
)
if not session_items:
logger.warning("未找到场次选项")
return False
for idx in self.config.sess:
if idx > len(session_items):
continue
item = session_items[idx - 1]
tag = find_child(item, By.CLASS_NAME, "item-tag")
if tag:
tag_text = tag.text.strip()
if tag_text == "无票":
logger.debug(f"场次{idx}: 无票,跳过")
continue
# 预售/惠/有票 → 可选
safe_click(self.driver, item)
logger.info(f"选择场次: 第{idx}")
time.sleep(0.3)
return True
logger.warning("所有场次均无票")
return False
def _select_price(self) -> bool:
"""选择票价档位"""
try:
price_section = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located(
(By.CLASS_NAME, "sku-tickets-card")
)
)
except TimeoutException:
logger.warning("找不到票价选择区域")
return False
price_items = find_children(
price_section, By.CLASS_NAME, "bui-dm-sku-card-item"
)
if not price_items:
logger.warning("未找到票价选项")
return False
for idx in self.config.price:
if idx > len(price_items):
continue
item = price_items[idx - 1]
tag = find_child(item, By.CLASS_NAME, "item-tag")
if tag and tag.text.strip() in ("缺货登记", "notticket"):
logger.debug(f"票价{idx}: 缺货,跳过")
continue
safe_click(self.driver, item)
logger.info(f"选择票价: 第{idx}")
time.sleep(0.3)
return True
logger.warning("所有票价档位均缺货")
return False
def _click_buy_button(self) -> str:
"""点击购买按钮,返回按钮文字"""
try:
box = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.ID, "app"))
)
except TimeoutException:
raise RuntimeError("页面刷新失败")
# 关闭实名弹窗
popups = find_children(
box, By.XPATH, "//div[@class='realname-popup']"
)
if popups:
try:
btn = popups[0].find_element(
By.XPATH, ".//div[@class='operate']//div[@class='button']"
)
btn.click()
time.sleep(0.3)
logger.info("已关闭实名弹窗")
except NoSuchElementException:
pass
try:
buy_btn = box.find_element(By.CLASS_NAME, "buy__button")
time.sleep(0.5)
btn_text = buy_btn.text.strip()
except NoSuchElementException:
raise RuntimeError("找不到购买按钮")
if "即将开抢" in btn_text:
raise RuntimeError("尚未开售,刷新等待")
if "缺货" in btn_text:
raise RuntimeError("已缺货")
buy_btn.click()
logger.info(f"点击购买按钮: {btn_text}")
return btn_text
def _select_tickets_in_popup(self) -> str:
"""在弹窗中选票,返回最终按钮文字"""
try:
box = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR, ".sku-pop-wrapper")
)
)
except TimeoutException:
raise RuntimeError("选票弹窗未出现")
self._select_date(box)
self._select_session()
self._select_price()
# 等待计数器出现
try:
WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located(
(By.CLASS_NAME, "bui-dm-sku-counter")
)
)
except TimeoutException:
raise RuntimeError("票数选择器未出现")
# 查找最终提交按钮
try:
submit_btn = box.find_element(
By.CLASS_NAME, "sku-footer-buy-button"
)
submit_text = submit_btn.text.strip()
except NoSuchElementException:
raise RuntimeError("找不到提交按钮")
if not submit_text:
raise RuntimeError("提交按钮文字为空,请调整重试间隔")
if submit_text == "选座购买":
submit_btn.click()
logger.info("进入选座模式,请手动选择座位")
return "选座"
if submit_text == "提交缺货登记":
raise RuntimeError("票已售罄")
if submit_text in ("立即预订", "立即购买", "确定"):
# 增加票数
plus_btn = find_child(box, By.CLASS_NAME, "plus-enable")
if plus_btn and self.config.ticket_num > 1:
for _ in range(self.config.ticket_num - 1):
plus_btn.click()
time.sleep(0.05)
logger.info(f"设置购票数量: {self.config.ticket_num}")
time.sleep(1.0)
submit_btn.click()
logger.info(f"点击: {submit_text}")
return submit_text
raise RuntimeError(f"未知按钮文字: {submit_text}")
# ── 订单确认 ──
def _confirm_order(self) -> bool:
"""选择观影人并提交订单"""
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located(
(
By.XPATH,
'//*[@id="dmViewerBlock_DmViewerBlock"]/div[2]/div/div',
)
)
)
except TimeoutException:
logger.error("订单确认页面加载超时")
return False
# 选择观影人
people = self.driver.find_elements(
By.XPATH, '//*[@id="dmViewerBlock_DmViewerBlock"]/div[2]/div/div'
)
time.sleep(0.3)
selected = 0
for idx in self.config.viewer_person:
if idx > len(people):
logger.warning(f"观影人序号{idx}超出范围(共{len(people)}人)")
continue
people[idx - 1].click()
selected += 1
time.sleep(0.05)
if selected == 0:
logger.error("未选中任何观影人")
return False
logger.info(f"已选择 {selected} 位观影人")
# 提交订单
try:
submit_btn = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable(
(
By.XPATH,
'//*[@id="dmOrderSubmitBlock_DmOrderSubmitBlock"]/div[2]/div/div[2]/div[3]/div[2]',
)
)
)
time.sleep(0.5)
submit_btn.click()
logger.info("已提交订单")
except TimeoutException:
logger.error("找不到提交订单按钮")
return False
return True
def _wait_payment(self, timeout: int = 30) -> bool:
"""等待跳转到支付页面"""
logger.info("等待跳转支付页面...")
start = time.time()
while time.time() - start < timeout:
title = self.driver.title
if "支付宝" in title or "支付" in title:
logger.info("已跳转到支付页面")
return True
time.sleep(1)
logger.warning(f"等待支付页面超时({timeout}s),当前标题: {self.driver.title}")
return False
# ── 主循环 ──
def choose_and_buy(self) -> bool:
"""选票 + 下单流程"""
self.attempts += 1
try:
self.enter_target_page()
self._click_buy_button()
self._select_tickets_in_popup()
# 等待跳转确认页
try:
WebDriverWait(self.driver, 5).until(EC.title_contains("确认"))
except TimeoutException:
logger.warning("未跳转到确认页,重试...")
self.state = State.TICKET_SELECTED
if self._confirm_order():
self.state = State.ORDER_CONFIRMED
self._wait_payment()
self.state = State.SUCCESS
return True
except RuntimeError as e:
logger.warning(f"{self.attempts}次尝试失败: {e}")
# 返回目标页重试
try:
self.driver.get(self.config.target_url)
except WebDriverException:
pass
return False
except WebDriverException as e:
logger.error(f"浏览器异常: {e}")
return False
return False
def run(self):
"""运行抢票主流程"""
self.start_time = time.time()
try:
self.start_browser()
if not self.login():
logger.error("登录失败,退出")
self.state = State.FAILED
return
logger.info(
f"开始抢票 | 最大重试: {self.config.max_retries}次 | "
f"重试间隔: {self.config.retry_delay}s"
)
for _ in range(self.config.max_retries):
if self.choose_and_buy():
self.end_time = time.time()
elapsed = round(self.end_time - self.start_time, 1)
logger.info(
f"🎉 抢票成功!共 {self.attempts} 轮,耗时 {elapsed}s"
)
self.state = State.SUCCESS
# 不关闭浏览器,让用户自行支付
input("按回车键关闭浏览器...")
return
time.sleep(self.config.retry_delay)
logger.error(f"达到最大重试次数 ({self.config.max_retries}),抢票失败")
self.state = State.FAILED
except KeyboardInterrupt:
logger.info("用户中断")
self.state = State.FAILED
except Exception as e:
logger.error(f"未预期的错误: {e}", exc_info=True)
self.state = State.FAILED
finally:
self.close_browser()
# ── 入口 ─────────────────────────────────────────────────────────────────
def load_config(path: str = "config.json") -> TicketConfig:
"""从 JSON 配置文件加载"""
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return TicketConfig(
date=data["date"],
sess=data["sess"],
price=data["price"],
ticket_num=data["ticket_num"],
viewer_person=data["viewer_person"],
nick_name=data["nick_name"],
damai_url=data["damai_url"],
target_url=data["target_url"],
driver_path=data["driver_path"],
max_retries=data.get("max_retries", 180),
retry_delay=data.get("retry_delay", 0.3),
)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
config_path = sys.argv[1] if len(sys.argv) > 1 else "config.json"
config = load_config(config_path)
bot = TicketBot(config)
bot.run()