Files
weidian/server/services/snatcher.py
Jeason 4dc918becd perf: 两阶段重试机制 - 商品页失败后自动切购物车下单
阶段1: 商品详情页快速抢购(5次,50ms间隔)
阶段2: 若阶段1全部失败,自动切换到购物车入口
       打开购物车勾选商品结算提交订单(5次重试)
避免单一页面刷新过多触发风控
2026-04-01 14:45:34 +08:00

331 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
抢购核心服务 — 优化版 v2
策略:两阶段重试
阶段1: 商品详情页直接抢购5次快速重试
阶段2: 若阶段1失败切换到购物车入口下单5次重试
从购物车勾选商品 → 结算 → 提交订单
这样避免单一页面刷新过多触发风控
"""
import asyncio
from playwright.async_api import async_playwright
from utils.stealth import stealth_async
from utils.timer import PrecisionTimer
from server.services.auth_service import get_browser_context, has_auth
from server.database import get_db
from datetime import datetime
CONCURRENT_TABS = 2
PHASE1_RETRIES = 5 # 商品页重试次数
PHASE2_RETRIES = 5 # 购物车重试次数
BUY_TEXTS = ["立即抢购", "立即购买", "马上抢", "立即秒杀"]
CART_URL = "https://weidian.com/new-cart/index.php"
async def run_snatch(task_id):
"""执行单个抢购任务"""
db = get_db()
task = db.execute('SELECT * FROM tasks WHERE id = ?', (task_id,)).fetchone()
if not task:
return
account_id = task['account_id']
if not has_auth(account_id):
_update_task(db, task_id, 'failed', '账号未登录')
return
target_url = task['target_url']
if not target_url or not target_url.strip():
_update_task(db, task_id, 'failed',
'商品链接为空,请检查购物车同步是否获取到了 itemID')
return
_update_task(db, task_id, 'running', '正在准备...')
timer = PrecisionTimer()
timer.sync_time()
cart_item_id = task['item_id'] or '' # 购物车商品ID用于阶段2
try:
async with async_playwright() as p:
browser, context = await get_browser_context(
p, account_id, headless=True)
# ── 1. 预热:打开商品页面 ──
_update_task(db, task_id, 'running', '预热:打开商品页面...')
page = await context.new_page()
await stealth_async(page)
await page.goto(target_url, wait_until='networkidle',
timeout=20000)
# 检查页面状态
if 'login' in page.url.lower():
_update_task(db, task_id, 'failed', '登录态已过期')
await browser.close()
return
body_text = await page.locator('body').text_content()
if '商品不存在' in (body_text or '') or '已下架' in (body_text or ''):
_update_task(db, task_id, 'failed', '商品不存在或已下架')
await browser.close()
return
# ── 2. 等待抢购时间 ──
snatch_time = task['snatch_time']
if snatch_time:
_update_task(db, task_id, 'running',
f'等待开售: {snatch_time}')
await timer.wait_until_early(snatch_time, early_ms=500)
# ══════════════════════════════════════
# 阶段1: 商品详情页直接抢购
# ══════════════════════════════════════
_update_task(db, task_id, 'running', '阶段1: 商品页抢购...')
pages = [page]
for _ in range(CONCURRENT_TABS - 1):
try:
p2 = await context.new_page()
await stealth_async(p2)
await p2.goto(target_url, wait_until='commit',
timeout=10000)
pages.append(p2)
except Exception:
pass
tasks_coro = [_phase1_purchase(pg, i) for i, pg in
enumerate(pages)]
results = await asyncio.gather(*tasks_coro,
return_exceptions=True)
result = _pick_success(results)
# ══════════════════════════════════════
# 阶段2: 购物车入口下单(降级策略)
# ══════════════════════════════════════
if not _is_success(result):
_update_task(db, task_id, 'running',
f'阶段1失败({result}),切换购物车下单...')
# 关闭之前的 tab开新 tab 去购物车
for pg in pages:
try:
await pg.close()
except Exception:
pass
cart_page = await context.new_page()
await stealth_async(cart_page)
result = await _phase2_cart_purchase(
cart_page, cart_item_id)
# ── 记录结果 ──
if _is_success(result):
_update_task(db, task_id, 'completed', result)
db.execute(
'INSERT INTO orders (task_id, account_id, status, detail)'
' VALUES (?, ?, ?, ?)',
(task_id, account_id, 'submitted', result))
else:
_update_task(db, task_id, 'failed', result)
db.execute(
'INSERT INTO orders (task_id, account_id, status, detail)'
' VALUES (?, ?, ?, ?)',
(task_id, account_id, 'failed', result))
db.commit()
await asyncio.sleep(3)
await browser.close()
except Exception as e:
_update_task(db, task_id, 'failed', str(e))
finally:
db.close()
# ─────────────────────────────────────────────
# 阶段1: 商品详情页快速抢购
# ─────────────────────────────────────────────
async def _phase1_purchase(page, tab_index=0):
"""商品详情页极速购买"""
for attempt in range(PHASE1_RETRIES):
try:
await page.reload(wait_until='commit', timeout=8000)
await asyncio.sleep(0.3)
buy_btn = None
for text in BUY_TEXTS:
loc = page.get_by_text(text, exact=False)
try:
await loc.first.wait_for(state="visible", timeout=1500)
buy_btn = loc.first
break
except Exception:
continue
if not buy_btn:
if attempt < PHASE1_RETRIES - 1:
await asyncio.sleep(0.05)
continue
return f"P1-tab{tab_index}: 未找到购买按钮"
await buy_btn.click(timeout=2000)
# SKU 弹窗
await _handle_sku(page)
# 提交订单
submit_btn = page.get_by_text("提交订单")
await submit_btn.wait_for(state="visible", timeout=6000)
await submit_btn.click()
return f"P1-tab{tab_index}: 抢购请求已提交"
except Exception as e:
if attempt < PHASE1_RETRIES - 1:
await asyncio.sleep(0.05)
continue
return f"P1-tab{tab_index}: {e}"
return f"P1-tab{tab_index}: 重试次数用尽"
# ─────────────────────────────────────────────
# 阶段2: 购物车入口下单
# ─────────────────────────────────────────────
async def _phase2_cart_purchase(page, cart_item_id):
"""
从购物车下单:
1. 打开购物车页面
2. 找到目标商品并勾选
3. 点击结算
4. 提交订单
"""
for attempt in range(PHASE2_RETRIES):
try:
# 打开购物车
if attempt == 0:
await page.goto(CART_URL, wait_until='networkidle',
timeout=15000)
await asyncio.sleep(1)
else:
await page.reload(wait_until='domcontentloaded',
timeout=10000)
await asyncio.sleep(0.5)
# 尝试勾选目标商品
selected = False
if cart_item_id:
# 通过 cart_item_id 精确定位
item_warp = page.locator(f'#\\3{cart_item_id[0]} {cart_item_id[1:]}' if len(cart_item_id) > 1 else f'#{cart_item_id}')
# 更可靠的方式:用 data-v + id 属性
item_warp = page.locator(f'.item_warp[id="{cart_item_id}"]')
if await item_warp.count() > 0:
# 点击商品前面的勾选框
checkbox = item_warp.locator('.checkbox').first
if await checkbox.count() > 0:
await checkbox.click()
selected = True
await asyncio.sleep(0.3)
if not selected:
# 没有精确定位到,尝试全选
try:
select_all = page.get_by_text("全选", exact=False)
if await select_all.count() > 0:
await select_all.first.click()
selected = True
await asyncio.sleep(0.3)
except Exception:
pass
if not selected:
# 还是没选中,点击第一个商品的 checkbox
try:
first_cb = page.locator(
'.item_warp .checkbox').first
if await first_cb.count() > 0:
await first_cb.click()
selected = True
await asyncio.sleep(0.3)
except Exception:
pass
# 点击结算按钮
settle_btn = None
for text in ["结算", "去结算", "立即结算"]:
loc = page.get_by_text(text, exact=False)
try:
await loc.first.wait_for(state="visible", timeout=2000)
settle_btn = loc.first
break
except Exception:
continue
if not settle_btn:
if attempt < PHASE2_RETRIES - 1:
await asyncio.sleep(0.1)
continue
return "P2: 未找到结算按钮"
await settle_btn.click(timeout=2000)
# 等待跳转到订单确认页
await asyncio.sleep(1)
# 提交订单
submit_btn = page.get_by_text("提交订单")
await submit_btn.wait_for(state="visible", timeout=8000)
await submit_btn.click()
return "P2-购物车: 抢购请求已提交"
except Exception as e:
if attempt < PHASE2_RETRIES - 1:
await asyncio.sleep(0.1)
continue
return f"P2-购物车: {e}"
return "P2-购物车: 重试次数用尽"
# ─────────────────────────────────────────────
# 公共工具
# ─────────────────────────────────────────────
async def _handle_sku(page):
"""处理 SKU 选择弹窗"""
try:
confirm_btn = page.get_by_text("确定", exact=True)
await confirm_btn.first.wait_for(state="visible", timeout=1500)
sku_sel = ('.sku-item:not(.disabled), '
'.sku_item:not(.disabled), '
'[class*="sku"] [class*="item"]'
':not([class*="disabled"])')
sku_items = page.locator(sku_sel)
if await sku_items.count() > 0:
await sku_items.first.click()
await asyncio.sleep(0.1)
await confirm_btn.first.click(timeout=2000)
except Exception:
pass
def _is_success(result):
return isinstance(result, str) and ('已提交' in result or '已发送' in result)
def _pick_success(results):
for r in results:
if _is_success(r):
return r
for r in results:
if isinstance(r, str):
return r
return f"全部失败: {results}"
def _update_task(db, task_id, status, result):
db.execute(
"UPDATE tasks SET status = ?, result = ?, updated_at = ? WHERE id = ?",
(status, result, datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
task_id))
db.commit()