diff --git a/main.py b/main.py index 5925c9d..dba2806 100644 --- a/main.py +++ b/main.py @@ -1,18 +1,27 @@ +""" +独立抢购脚本(命令行模式)— 优化版 +用法: python main.py +""" import asyncio import yaml -import time from playwright.async_api import async_playwright from utils.stealth import stealth_async from utils.auth import Authenticator from utils.timer import PrecisionTimer +BUY_TEXTS = ["立即抢购", "立即购买", "马上抢", "立即秒杀"] +MAX_RETRIES = 5 +CONCURRENT_TABS = 2 + + async def snatch(config): auth = Authenticator(config.get("auth_file", "auth_state.json")) timer = PrecisionTimer() timer.sync_time() async with async_playwright() as p: - browser, context = await auth.get_context(p, headless=config.get("headless", False)) + browser, context = await auth.get_context( + p, headless=config.get("headless", False)) page = await context.new_page() await stealth_async(page) @@ -23,7 +32,7 @@ async def snatch(config): # 1. 预热:先打开页面 print(f"正在打开商品页面: {target_url}") - await page.goto(target_url) + await page.goto(target_url, wait_until='networkidle', timeout=20000) # 如果未登录,处理登录 if not auth.has_auth(): @@ -31,83 +40,111 @@ async def snatch(config): await auth.login(target_url) await context.close() await browser.close() - browser, context = await auth.get_context(p, headless=config.get("headless", False)) + browser, context = await auth.get_context( + p, headless=config.get("headless", False)) page = await context.new_page() await stealth_async(page) print("已重新加载登录状态,正在打开商品页面...") - await page.goto(target_url) + await page.goto(target_url, wait_until='networkidle', + timeout=20000) - # 2. 等待抢购时间 + # 2. 等待抢购时间(提前 500ms 触发) snatch_time = config.get("snatch_time") if snatch_time: - await timer.wait_until(snatch_time) + await timer.wait_until_early(snatch_time, early_ms=500) - # 3. 抢购核心逻辑 - max_retries = 3 - for attempt in range(max_retries): + # 3. 并发抢购 + pages = [page] + for _ in range(CONCURRENT_TABS - 1): try: - # 刷新页面,让预售按钮变为可点击 - if attempt > 0: - await asyncio.sleep(0.3) - await page.reload(wait_until='domcontentloaded', timeout=10000) - await asyncio.sleep(0.5) + p2 = await context.new_page() + await stealth_async(p2) + await p2.goto(target_url, wait_until='commit', timeout=10000) + pages.append(p2) + except Exception: + pass - # 点击购买按钮(兼容多种文案) - buy_btn = None - for text in ["立即抢购", "立即购买", "马上抢", "立即秒杀"]: - loc = page.get_by_text(text, exact=False) - if await loc.count() > 0: - buy_btn = loc.first - break + tasks = [_do_purchase(pg, i) for i, pg in enumerate(pages)] + results = await asyncio.gather(*tasks, return_exceptions=True) - if not buy_btn: - if attempt < max_retries - 1: - print(f"第{attempt+1}次未找到购买按钮,重试...") - continue - print("错误: 未找到购买按钮") - break - - await buy_btn.click(timeout=3000) - print("点击购买按钮") - - # 处理 SKU 选择(如果弹出规格选择框) - await asyncio.sleep(0.5) - try: - confirm_btn = page.get_by_text("确定", exact=True) - if await confirm_btn.count() > 0 and await confirm_btn.first.is_visible(): - # 自动选择第一个可用的 SKU 选项 - sku_items = page.locator('.sku-item:not(.disabled), .sku_item:not(.disabled), [class*="sku"] [class*="item"]:not([class*="disabled"])') - if await sku_items.count() > 0: - await sku_items.first.click() - print("自动选择 SKU") - await asyncio.sleep(0.3) - await confirm_btn.first.click(timeout=3000) - print("点击确定(SKU)") - except Exception: - pass - - # 提交订单 - submit_btn = page.get_by_text("提交订单") - await submit_btn.wait_for(state="visible", timeout=8000) - await submit_btn.click() - print("点击提交订单!抢购请求已发送!") + for r in results: + print(f"结果: {r}") + if isinstance(r, str) and ('已提交' in r or '已发送' in r): + print("✅ 抢购成功!") break - except Exception as e: - if attempt < max_retries - 1: - print(f"第{attempt+1}次尝试失败: {e},重试...") - else: - print(f"抢购失败: {e}") - - # 保持浏览器打开一段时间查看结果 await asyncio.sleep(10) await browser.close() +async def _do_purchase(page, tab_index=0): + """极速购买流程""" + for attempt in range(MAX_RETRIES): + try: + await page.reload(wait_until='commit', timeout=8000) + await asyncio.sleep(0.3) + + # 点击购买按钮 + buy_btn = None + for text in BUY_TEXTS: + loc = page.get_by_text(text, exact=False) + try: + await loc.first.wait_for(state="visible", timeout=1500) + buy_btn = loc.first + break + except Exception: + continue + + if not buy_btn: + if attempt < MAX_RETRIES - 1: + print(f"tab{tab_index} 第{attempt+1}次未找到按钮,重试...") + await asyncio.sleep(0.05) + continue + return f"tab{tab_index}: 未找到购买按钮" + + await buy_btn.click(timeout=2000) + print(f"tab{tab_index}: 点击购买按钮") + + # 处理 SKU 弹窗 + try: + confirm_btn = page.get_by_text("确定", exact=True) + await confirm_btn.first.wait_for(state="visible", + timeout=1500) + sku_sel = ('.sku-item:not(.disabled), ' + '.sku_item:not(.disabled), ' + '[class*="sku"] [class*="item"]' + ':not([class*="disabled"])') + sku_items = page.locator(sku_sel) + if await sku_items.count() > 0: + await sku_items.first.click() + print(f"tab{tab_index}: 自动选择 SKU") + await asyncio.sleep(0.1) + await confirm_btn.first.click(timeout=2000) + print(f"tab{tab_index}: 点击确定") + except Exception: + pass + + # 提交订单 + submit_btn = page.get_by_text("提交订单") + await submit_btn.wait_for(state="visible", timeout=6000) + await submit_btn.click() + return f"tab{tab_index}: 抢购请求已提交" + + except Exception as e: + if attempt < MAX_RETRIES - 1: + print(f"tab{tab_index} 第{attempt+1}次失败: {e},重试...") + await asyncio.sleep(0.05) + else: + return f"tab{tab_index}: 抢购失败: {e}" + + return f"tab{tab_index}: 重试次数用尽" + + def load_config(): with open("config.yaml", "r", encoding="utf-8") as f: return yaml.safe_load(f) + if __name__ == "__main__": config = load_config() asyncio.run(snatch(config)) diff --git a/server/services/snatcher.py b/server/services/snatcher.py index b5285f9..e047993 100644 --- a/server/services/snatcher.py +++ b/server/services/snatcher.py @@ -1,3 +1,12 @@ +""" +抢购核心服务 — 优化版 +优化点: + 1. 提前预热:开售前就打开页面并保持连接 + 2. 提前刷新:开售前 500ms 发起 reload,用 commit 级别不等渲染 + 3. 并发多 tab:同时开 2 个 tab 竞争抢购,谁先成功算谁的 + 4. 极速点击:不等 sleep,用 waitForSelector 替代固定等待 + 5. 重试间隔极短:50ms 级别快速重试 +""" import asyncio from playwright.async_api import async_playwright from utils.stealth import stealth_async @@ -6,6 +15,13 @@ from server.services.auth_service import get_browser_context, has_auth from server.database import get_db from datetime import datetime +# 并发 tab 数量 +CONCURRENT_TABS = 2 +# 最大重试次数(每个 tab) +MAX_RETRIES = 5 +# 购买按钮文案 +BUY_TEXTS = ["立即抢购", "立即购买", "马上抢", "立即秒杀"] + async def run_snatch(task_id): """执行单个抢购任务""" @@ -19,6 +35,12 @@ async def run_snatch(task_id): _update_task(db, task_id, 'failed', '账号未登录') return + target_url = task['target_url'] + if not target_url or not target_url.strip(): + _update_task(db, task_id, 'failed', + '商品链接为空,请检查购物车同步是否获取到了 itemID') + return + _update_task(db, task_id, 'running', '正在准备...') timer = PrecisionTimer() @@ -26,56 +48,83 @@ async def run_snatch(task_id): try: async with async_playwright() as p: - browser, context = await get_browser_context(p, account_id, headless=True) + browser, context = await get_browser_context( + p, account_id, headless=True) + + # ── 1. 预热:打开商品页面 ── + _update_task(db, task_id, 'running', '预热:打开商品页面...') page = await context.new_page() await stealth_async(page) + await page.goto(target_url, wait_until='networkidle', + timeout=20000) - target_url = task['target_url'] - - # 1. 预热:先打开商品页面 - _update_task(db, task_id, 'running', '正在打开商品页面...') - await page.goto(target_url, wait_until='networkidle', timeout=20000) - - # 检查是否被重定向到登录页 + # 检查页面状态 if 'login' in page.url.lower(): _update_task(db, task_id, 'failed', '登录态已过期') await browser.close() return - # 检查商品是否存在 - page_text = await page.locator('body').text_content() - if '商品不存在' in (page_text or '') or '已下架' in (page_text or ''): - _update_task(db, task_id, 'failed', f'商品不存在或已下架 (URL: {target_url})') + body_text = await page.locator('body').text_content() + if '商品不存在' in (body_text or '') or '已下架' in (body_text or ''): + _update_task(db, task_id, 'failed', '商品不存在或已下架') await browser.close() return - if not target_url or target_url.strip() == '': - _update_task(db, task_id, 'failed', '商品链接为空,请检查购物车同步是否获取到了 itemID') - await browser.close() - return - - # 2. 等待抢购时间 + # ── 2. 等待抢购时间 ── snatch_time = task['snatch_time'] if snatch_time: - _update_task(db, task_id, 'running', f'等待抢购时间: {snatch_time}') - await timer.wait_until(snatch_time) + _update_task(db, task_id, 'running', + f'等待开售: {snatch_time}') + # 提前 500ms 触发,因为 reload 本身需要时间 + await timer.wait_until_early(snatch_time, early_ms=500) - # 3. 抢购核心逻辑(与 main.py 一致) + # ── 3. 并发抢购 ── _update_task(db, task_id, 'running', '开始抢购...') - result = await _do_purchase(page) + + # 创建多个 tab 并发竞争 + pages = [page] + for _ in range(CONCURRENT_TABS - 1): + try: + p2 = await context.new_page() + await stealth_async(p2) + await p2.goto(target_url, wait_until='commit', + timeout=10000) + pages.append(p2) + except Exception: + pass + + # 所有 tab 并发执行抢购 + tasks_coro = [_do_purchase_fast(pg, i) for i, pg in + enumerate(pages)] + results = await asyncio.gather(*tasks_coro, + return_exceptions=True) + + # 取第一个成功的结果 + result = None + for r in results: + if isinstance(r, str) and ('已提交' in r or '已发送' in r): + result = r + break + if not result: + # 没有成功的,取最后一个非异常结果 + for r in results: + if isinstance(r, str): + result = r + if not result: + result = f"抢购失败: {results}" if '已提交' in result or '已发送' in result: _update_task(db, task_id, 'completed', result) db.execute( - 'INSERT INTO orders (task_id, account_id, status, detail) VALUES (?, ?, ?, ?)', - (task_id, account_id, 'submitted', result) - ) + 'INSERT INTO orders (task_id, account_id, status, detail)' + ' VALUES (?, ?, ?, ?)', + (task_id, account_id, 'submitted', result)) else: _update_task(db, task_id, 'failed', result) db.execute( - 'INSERT INTO orders (task_id, account_id, status, detail) VALUES (?, ?, ?, ?)', - (task_id, account_id, 'failed', result) - ) + 'INSERT INTO orders (task_id, account_id, status, detail)' + ' VALUES (?, ?, ?, ?)', + (task_id, account_id, 'failed', result)) db.commit() await asyncio.sleep(3) @@ -87,71 +136,78 @@ async def run_snatch(task_id): db.close() -async def _do_purchase(page): +async def _do_purchase_fast(page, tab_index=0): """ - 执行购买流程: - 1. 刷新页面(预售商品需要刷新才能出现购买按钮) - 2. 点击"立即购买"/"立即抢购" - 3. 处理 SKU 选择 -> 点击"确定" - 4. 进入订单确认页 -> 点击"提交订单" - 支持多次重试 + 极速购买流程(单个 tab): + 1. reload 用 commit 级别,不等完整渲染 + 2. 用 locator.wait_for 替代固定 sleep + 3. 重试间隔极短 """ - max_retries = 3 - for attempt in range(max_retries): + for attempt in range(MAX_RETRIES): try: - # 刷新页面,让预售按钮变为可点击 - if attempt > 0: - await asyncio.sleep(0.3) - await page.reload(wait_until='domcontentloaded', timeout=10000) - await asyncio.sleep(0.5) + # ── 刷新页面 ── + # 用 commit 级别:收到第一个字节就继续,不等 DOM 完整加载 + await page.reload(wait_until='commit', timeout=8000) + # 短暂等待让关键 DOM 出现(比 networkidle 快很多) + await asyncio.sleep(0.3) - # 点击购买按钮(兼容多种文案) + # ── 点击购买按钮 ── buy_btn = None - for text in ["立即抢购", "立即购买", "马上抢", "立即秒杀"]: + for text in BUY_TEXTS: loc = page.get_by_text(text, exact=False) - if await loc.count() > 0: + try: + await loc.first.wait_for(state="visible", timeout=1500) buy_btn = loc.first break + except Exception: + continue if not buy_btn: - if attempt < max_retries - 1: + # 按钮没出现,可能页面还没加载完或还没开售 + if attempt < MAX_RETRIES - 1: + await asyncio.sleep(0.05) # 50ms 后重试 continue - return "抢购操作失败: 未找到购买按钮" + return f"tab{tab_index}: 未找到购买按钮" - await buy_btn.click(timeout=3000) + await buy_btn.click(timeout=2000) - # 处理 SKU 选择(如果弹出规格选择框) - await asyncio.sleep(0.5) + # ── 处理 SKU 弹窗 ── try: - # 检查是否有 SKU 弹窗 confirm_btn = page.get_by_text("确定", exact=True) - if await confirm_btn.count() > 0 and await confirm_btn.first.is_visible(): - # 自动选择第一个可用的 SKU 选项 - sku_items = page.locator('.sku-item:not(.disabled), .sku_item:not(.disabled), [class*="sku"] [class*="item"]:not([class*="disabled"])') - if await sku_items.count() > 0: - await sku_items.first.click() - await asyncio.sleep(0.3) - await confirm_btn.first.click(timeout=3000) + await confirm_btn.first.wait_for(state="visible", + timeout=1500) + # 选第一个可用 SKU + sku_sel = ('.sku-item:not(.disabled), ' + '.sku_item:not(.disabled), ' + '[class*="sku"] [class*="item"]' + ':not([class*="disabled"])') + sku_items = page.locator(sku_sel) + if await sku_items.count() > 0: + await sku_items.first.click() + await asyncio.sleep(0.1) + await confirm_btn.first.click(timeout=2000) except Exception: + # 没有 SKU 弹窗,直接继续 pass - # 等待进入订单确认页,点击"提交订单" + # ── 提交订单 ── submit_btn = page.get_by_text("提交订单") - await submit_btn.wait_for(state="visible", timeout=8000) + await submit_btn.wait_for(state="visible", timeout=6000) await submit_btn.click() - return "抢购请求已提交" + return f"tab{tab_index}: 抢购请求已提交" except Exception as e: - if attempt < max_retries - 1: + if attempt < MAX_RETRIES - 1: + await asyncio.sleep(0.05) continue - return f"抢购操作失败: {e}" + return f"tab{tab_index}: 抢购失败: {e}" - return "抢购操作失败: 重试次数用尽" + return f"tab{tab_index}: 重试次数用尽" def _update_task(db, task_id, status, result): db.execute( "UPDATE tasks SET status = ?, result = ?, updated_at = ? WHERE id = ?", - (status, result, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), task_id) - ) + (status, result, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + task_id)) db.commit() diff --git a/utils/timer.py b/utils/timer.py index 95e6bc4..fc22880 100644 --- a/utils/timer.py +++ b/utils/timer.py @@ -3,47 +3,67 @@ import asyncio import ntplib from datetime import datetime + class PrecisionTimer: def __init__(self): - self.offset = 0 # 服务器时间 - 本地时间 + self.offset = 0 # 服务器时间 - 本地时间 def sync_time(self): - """ - 同步 NTP 时间,计算偏移量 - """ - try: - client = ntplib.NTPClient() - response = client.request('pool.ntp.org', version=3) - self.offset = response.tx_time - time.time() - print(f"时间同步完成,偏移量: {self.offset:.3f}s") - except Exception as e: - print(f"NTP同步失败: {e},将使用系统时间") + """多次 NTP 同步取中位数,提高精度""" + offsets = [] + servers = ['ntp.aliyun.com', 'ntp.tencent.com', 'pool.ntp.org'] + for server in servers: + try: + client = ntplib.NTPClient() + resp = client.request(server, version=3) + offsets.append(resp.tx_time - time.time()) + except Exception: + continue + if offsets: + offsets.sort() + self.offset = offsets[len(offsets) // 2] + print(f"时间同步完成,偏移量: {self.offset:.3f}s (采样{len(offsets)}个)") + else: + print("NTP同步失败,将使用系统时间") def get_server_time(self): return time.time() + self.offset async def wait_until(self, target_time_str): - """ - 等待直到目标时间 (格式: 2026-02-01 10:00:00) - """ + """等待直到目标时间,最后阶段忙等保证精度""" target_dt = datetime.strptime(target_time_str, "%Y-%m-%d %H:%M:%S") - target_timestamp = target_dt.timestamp() - - print(f"正在等待目标时间: {target_time_str}") - + target_ts = target_dt.timestamp() + + print(f"等待目标时间: {target_time_str}") + while True: - current_time = self.get_server_time() - remaining = target_timestamp - current_time - + now = self.get_server_time() + remaining = target_ts - now + if remaining <= 0: - print("目标时间已到!触发抢购!") + print("目标时间已到!") break - - # 动态调整调整休眠时间以节省 CPU 并保持精度 - if remaining > 1: - await asyncio.sleep(remaining - 0.5) + elif remaining > 10: + await asyncio.sleep(remaining - 10) + elif remaining > 2: + await asyncio.sleep(0.5) + elif remaining > 0.1: + await asyncio.sleep(0.01) + # remaining <= 0.1: 忙等,不 sleep + + async def wait_until_early(self, target_time_str, early_ms=500): + """提前 early_ms 毫秒触发,用于需要预操作的场景""" + target_dt = datetime.strptime(target_time_str, "%Y-%m-%d %H:%M:%S") + target_ts = target_dt.timestamp() - (early_ms / 1000.0) + + while True: + now = self.get_server_time() + remaining = target_ts - now + if remaining <= 0: + break + elif remaining > 10: + await asyncio.sleep(remaining - 10) + elif remaining > 2: + await asyncio.sleep(0.5) elif remaining > 0.1: await asyncio.sleep(0.01) - else: - # 最后一刻进入忙等以获取最高精度 - pass