diff --git a/server/services/snatcher.py b/server/services/snatcher.py index e047993..6bc8957 100644 --- a/server/services/snatcher.py +++ b/server/services/snatcher.py @@ -1,11 +1,10 @@ """ -抢购核心服务 — 优化版 -优化点: - 1. 提前预热:开售前就打开页面并保持连接 - 2. 提前刷新:开售前 500ms 发起 reload,用 commit 级别不等渲染 - 3. 并发多 tab:同时开 2 个 tab 竞争抢购,谁先成功算谁的 - 4. 极速点击:不等 sleep,用 waitForSelector 替代固定等待 - 5. 重试间隔极短:50ms 级别快速重试 +抢购核心服务 — 优化版 v2 +策略:两阶段重试 + 阶段1: 商品详情页直接抢购(5次快速重试) + 阶段2: 若阶段1失败,切换到购物车入口下单(5次重试) + 从购物车勾选商品 → 结算 → 提交订单 + 这样避免单一页面刷新过多触发风控 """ import asyncio from playwright.async_api import async_playwright @@ -15,12 +14,11 @@ from server.services.auth_service import get_browser_context, has_auth from server.database import get_db from datetime import datetime -# 并发 tab 数量 CONCURRENT_TABS = 2 -# 最大重试次数(每个 tab) -MAX_RETRIES = 5 -# 购买按钮文案 +PHASE1_RETRIES = 5 # 商品页重试次数 +PHASE2_RETRIES = 5 # 购物车重试次数 BUY_TEXTS = ["立即抢购", "立即购买", "马上抢", "立即秒杀"] +CART_URL = "https://weidian.com/new-cart/index.php" async def run_snatch(task_id): @@ -46,6 +44,8 @@ async def run_snatch(task_id): timer = PrecisionTimer() timer.sync_time() + cart_item_id = task['item_id'] or '' # 购物车商品ID,用于阶段2 + try: async with async_playwright() as p: browser, context = await get_browser_context( @@ -75,13 +75,13 @@ async def run_snatch(task_id): if snatch_time: _update_task(db, task_id, 'running', f'等待开售: {snatch_time}') - # 提前 500ms 触发,因为 reload 本身需要时间 await timer.wait_until_early(snatch_time, early_ms=500) - # ── 3. 并发抢购 ── - _update_task(db, task_id, 'running', '开始抢购...') + # ══════════════════════════════════════ + # 阶段1: 商品详情页直接抢购 + # ══════════════════════════════════════ + _update_task(db, task_id, 'running', '阶段1: 商品页抢购...') - # 创建多个 tab 并发竞争 pages = [page] for _ in range(CONCURRENT_TABS - 1): try: @@ -93,27 +93,34 @@ async def run_snatch(task_id): except Exception: pass - # 所有 tab 并发执行抢购 - tasks_coro = [_do_purchase_fast(pg, i) for i, pg in + tasks_coro = [_phase1_purchase(pg, i) for i, pg in enumerate(pages)] results = await asyncio.gather(*tasks_coro, return_exceptions=True) - # 取第一个成功的结果 - result = None - for r in results: - if isinstance(r, str) and ('已提交' in r or '已发送' in r): - result = r - break - if not result: - # 没有成功的,取最后一个非异常结果 - for r in results: - if isinstance(r, str): - result = r - if not result: - result = f"抢购失败: {results}" + result = _pick_success(results) - if '已提交' in result or '已发送' in result: + # ══════════════════════════════════════ + # 阶段2: 购物车入口下单(降级策略) + # ══════════════════════════════════════ + if not _is_success(result): + _update_task(db, task_id, 'running', + f'阶段1失败({result}),切换购物车下单...') + + # 关闭之前的 tab,开新 tab 去购物车 + for pg in pages: + try: + await pg.close() + except Exception: + pass + + cart_page = await context.new_page() + await stealth_async(cart_page) + result = await _phase2_cart_purchase( + cart_page, cart_item_id) + + # ── 记录结果 ── + if _is_success(result): _update_task(db, task_id, 'completed', result) db.execute( 'INSERT INTO orders (task_id, account_id, status, detail)' @@ -136,22 +143,16 @@ async def run_snatch(task_id): db.close() -async def _do_purchase_fast(page, tab_index=0): - """ - 极速购买流程(单个 tab): - 1. reload 用 commit 级别,不等完整渲染 - 2. 用 locator.wait_for 替代固定 sleep - 3. 重试间隔极短 - """ - for attempt in range(MAX_RETRIES): +# ───────────────────────────────────────────── +# 阶段1: 商品详情页快速抢购 +# ───────────────────────────────────────────── +async def _phase1_purchase(page, tab_index=0): + """商品详情页极速购买""" + for attempt in range(PHASE1_RETRIES): try: - # ── 刷新页面 ── - # 用 commit 级别:收到第一个字节就继续,不等 DOM 完整加载 await page.reload(wait_until='commit', timeout=8000) - # 短暂等待让关键 DOM 出现(比 networkidle 快很多) await asyncio.sleep(0.3) - # ── 点击购买按钮 ── buy_btn = None for text in BUY_TEXTS: loc = page.get_by_text(text, exact=False) @@ -163,46 +164,162 @@ async def _do_purchase_fast(page, tab_index=0): continue if not buy_btn: - # 按钮没出现,可能页面还没加载完或还没开售 - if attempt < MAX_RETRIES - 1: - await asyncio.sleep(0.05) # 50ms 后重试 + if attempt < PHASE1_RETRIES - 1: + await asyncio.sleep(0.05) continue - return f"tab{tab_index}: 未找到购买按钮" + return f"P1-tab{tab_index}: 未找到购买按钮" await buy_btn.click(timeout=2000) - # ── 处理 SKU 弹窗 ── - try: - confirm_btn = page.get_by_text("确定", exact=True) - await confirm_btn.first.wait_for(state="visible", - timeout=1500) - # 选第一个可用 SKU - sku_sel = ('.sku-item:not(.disabled), ' - '.sku_item:not(.disabled), ' - '[class*="sku"] [class*="item"]' - ':not([class*="disabled"])') - sku_items = page.locator(sku_sel) - if await sku_items.count() > 0: - await sku_items.first.click() - await asyncio.sleep(0.1) - await confirm_btn.first.click(timeout=2000) - except Exception: - # 没有 SKU 弹窗,直接继续 - pass + # SKU 弹窗 + await _handle_sku(page) - # ── 提交订单 ── + # 提交订单 submit_btn = page.get_by_text("提交订单") await submit_btn.wait_for(state="visible", timeout=6000) await submit_btn.click() - return f"tab{tab_index}: 抢购请求已提交" + return f"P1-tab{tab_index}: 抢购请求已提交" except Exception as e: - if attempt < MAX_RETRIES - 1: + if attempt < PHASE1_RETRIES - 1: await asyncio.sleep(0.05) continue - return f"tab{tab_index}: 抢购失败: {e}" + return f"P1-tab{tab_index}: {e}" - return f"tab{tab_index}: 重试次数用尽" + return f"P1-tab{tab_index}: 重试次数用尽" + + +# ───────────────────────────────────────────── +# 阶段2: 购物车入口下单 +# ───────────────────────────────────────────── +async def _phase2_cart_purchase(page, cart_item_id): + """ + 从购物车下单: + 1. 打开购物车页面 + 2. 找到目标商品并勾选 + 3. 点击结算 + 4. 提交订单 + """ + for attempt in range(PHASE2_RETRIES): + try: + # 打开购物车 + if attempt == 0: + await page.goto(CART_URL, wait_until='networkidle', + timeout=15000) + await asyncio.sleep(1) + else: + await page.reload(wait_until='domcontentloaded', + timeout=10000) + await asyncio.sleep(0.5) + + # 尝试勾选目标商品 + selected = False + if cart_item_id: + # 通过 cart_item_id 精确定位 + item_warp = page.locator(f'#\\3{cart_item_id[0]} {cart_item_id[1:]}' if len(cart_item_id) > 1 else f'#{cart_item_id}') + # 更可靠的方式:用 data-v + id 属性 + item_warp = page.locator(f'.item_warp[id="{cart_item_id}"]') + if await item_warp.count() > 0: + # 点击商品前面的勾选框 + checkbox = item_warp.locator('.checkbox').first + if await checkbox.count() > 0: + await checkbox.click() + selected = True + await asyncio.sleep(0.3) + + if not selected: + # 没有精确定位到,尝试全选 + try: + select_all = page.get_by_text("全选", exact=False) + if await select_all.count() > 0: + await select_all.first.click() + selected = True + await asyncio.sleep(0.3) + except Exception: + pass + + if not selected: + # 还是没选中,点击第一个商品的 checkbox + try: + first_cb = page.locator( + '.item_warp .checkbox').first + if await first_cb.count() > 0: + await first_cb.click() + selected = True + await asyncio.sleep(0.3) + except Exception: + pass + + # 点击结算按钮 + settle_btn = None + for text in ["结算", "去结算", "立即结算"]: + loc = page.get_by_text(text, exact=False) + try: + await loc.first.wait_for(state="visible", timeout=2000) + settle_btn = loc.first + break + except Exception: + continue + + if not settle_btn: + if attempt < PHASE2_RETRIES - 1: + await asyncio.sleep(0.1) + continue + return "P2: 未找到结算按钮" + + await settle_btn.click(timeout=2000) + + # 等待跳转到订单确认页 + await asyncio.sleep(1) + + # 提交订单 + submit_btn = page.get_by_text("提交订单") + await submit_btn.wait_for(state="visible", timeout=8000) + await submit_btn.click() + return "P2-购物车: 抢购请求已提交" + + except Exception as e: + if attempt < PHASE2_RETRIES - 1: + await asyncio.sleep(0.1) + continue + return f"P2-购物车: {e}" + + return "P2-购物车: 重试次数用尽" + + +# ───────────────────────────────────────────── +# 公共工具 +# ───────────────────────────────────────────── +async def _handle_sku(page): + """处理 SKU 选择弹窗""" + try: + confirm_btn = page.get_by_text("确定", exact=True) + await confirm_btn.first.wait_for(state="visible", timeout=1500) + sku_sel = ('.sku-item:not(.disabled), ' + '.sku_item:not(.disabled), ' + '[class*="sku"] [class*="item"]' + ':not([class*="disabled"])') + sku_items = page.locator(sku_sel) + if await sku_items.count() > 0: + await sku_items.first.click() + await asyncio.sleep(0.1) + await confirm_btn.first.click(timeout=2000) + except Exception: + pass + + +def _is_success(result): + return isinstance(result, str) and ('已提交' in result or '已发送' in result) + + +def _pick_success(results): + for r in results: + if _is_success(r): + return r + for r in results: + if isinstance(r, str): + return r + return f"全部失败: {results}" def _update_task(db, task_id, status, result):