- cart_service: 拦截购物车API提取真实itemID映射 - cart_service: 从Vue组件/data属性/window全局变量多路提取itemID - tasks: 区分item_id和cart_item_id,只有真实itemID才拼URL - snatcher: 增加商品不存在/已下架检测,增加空URL检测
158 lines
5.9 KiB
Python
158 lines
5.9 KiB
Python
import asyncio
|
|
from playwright.async_api import async_playwright
|
|
from utils.stealth import stealth_async
|
|
from utils.timer import PrecisionTimer
|
|
from server.services.auth_service import get_browser_context, has_auth
|
|
from server.database import get_db
|
|
from datetime import datetime
|
|
|
|
|
|
async def run_snatch(task_id):
|
|
"""执行单个抢购任务"""
|
|
db = get_db()
|
|
task = db.execute('SELECT * FROM tasks WHERE id = ?', (task_id,)).fetchone()
|
|
if not task:
|
|
return
|
|
|
|
account_id = task['account_id']
|
|
if not has_auth(account_id):
|
|
_update_task(db, task_id, 'failed', '账号未登录')
|
|
return
|
|
|
|
_update_task(db, task_id, 'running', '正在准备...')
|
|
|
|
timer = PrecisionTimer()
|
|
timer.sync_time()
|
|
|
|
try:
|
|
async with async_playwright() as p:
|
|
browser, context = await get_browser_context(p, account_id, headless=True)
|
|
page = await context.new_page()
|
|
await stealth_async(page)
|
|
|
|
target_url = task['target_url']
|
|
|
|
# 1. 预热:先打开商品页面
|
|
_update_task(db, task_id, 'running', '正在打开商品页面...')
|
|
await page.goto(target_url, wait_until='networkidle', timeout=20000)
|
|
|
|
# 检查是否被重定向到登录页
|
|
if 'login' in page.url.lower():
|
|
_update_task(db, task_id, 'failed', '登录态已过期')
|
|
await browser.close()
|
|
return
|
|
|
|
# 检查商品是否存在
|
|
page_text = await page.locator('body').text_content()
|
|
if '商品不存在' in (page_text or '') or '已下架' in (page_text or ''):
|
|
_update_task(db, task_id, 'failed', f'商品不存在或已下架 (URL: {target_url})')
|
|
await browser.close()
|
|
return
|
|
|
|
if not target_url or target_url.strip() == '':
|
|
_update_task(db, task_id, 'failed', '商品链接为空,请检查购物车同步是否获取到了 itemID')
|
|
await browser.close()
|
|
return
|
|
|
|
# 2. 等待抢购时间
|
|
snatch_time = task['snatch_time']
|
|
if snatch_time:
|
|
_update_task(db, task_id, 'running', f'等待抢购时间: {snatch_time}')
|
|
await timer.wait_until(snatch_time)
|
|
|
|
# 3. 抢购核心逻辑(与 main.py 一致)
|
|
_update_task(db, task_id, 'running', '开始抢购...')
|
|
result = await _do_purchase(page)
|
|
|
|
if '已提交' in result or '已发送' in result:
|
|
_update_task(db, task_id, 'completed', result)
|
|
db.execute(
|
|
'INSERT INTO orders (task_id, account_id, status, detail) VALUES (?, ?, ?, ?)',
|
|
(task_id, account_id, 'submitted', result)
|
|
)
|
|
else:
|
|
_update_task(db, task_id, 'failed', result)
|
|
db.execute(
|
|
'INSERT INTO orders (task_id, account_id, status, detail) VALUES (?, ?, ?, ?)',
|
|
(task_id, account_id, 'failed', result)
|
|
)
|
|
db.commit()
|
|
|
|
await asyncio.sleep(3)
|
|
await browser.close()
|
|
|
|
except Exception as e:
|
|
_update_task(db, task_id, 'failed', str(e))
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
async def _do_purchase(page):
|
|
"""
|
|
执行购买流程:
|
|
1. 刷新页面(预售商品需要刷新才能出现购买按钮)
|
|
2. 点击"立即购买"/"立即抢购"
|
|
3. 处理 SKU 选择 -> 点击"确定"
|
|
4. 进入订单确认页 -> 点击"提交订单"
|
|
支持多次重试
|
|
"""
|
|
max_retries = 3
|
|
for attempt in range(max_retries):
|
|
try:
|
|
# 刷新页面,让预售按钮变为可点击
|
|
if attempt > 0:
|
|
await asyncio.sleep(0.3)
|
|
await page.reload(wait_until='domcontentloaded', timeout=10000)
|
|
await asyncio.sleep(0.5)
|
|
|
|
# 点击购买按钮(兼容多种文案)
|
|
buy_btn = None
|
|
for text in ["立即抢购", "立即购买", "马上抢", "立即秒杀"]:
|
|
loc = page.get_by_text(text, exact=False)
|
|
if await loc.count() > 0:
|
|
buy_btn = loc.first
|
|
break
|
|
|
|
if not buy_btn:
|
|
if attempt < max_retries - 1:
|
|
continue
|
|
return "抢购操作失败: 未找到购买按钮"
|
|
|
|
await buy_btn.click(timeout=3000)
|
|
|
|
# 处理 SKU 选择(如果弹出规格选择框)
|
|
await asyncio.sleep(0.5)
|
|
try:
|
|
# 检查是否有 SKU 弹窗
|
|
confirm_btn = page.get_by_text("确定", exact=True)
|
|
if await confirm_btn.count() > 0 and await confirm_btn.first.is_visible():
|
|
# 自动选择第一个可用的 SKU 选项
|
|
sku_items = page.locator('.sku-item:not(.disabled), .sku_item:not(.disabled), [class*="sku"] [class*="item"]:not([class*="disabled"])')
|
|
if await sku_items.count() > 0:
|
|
await sku_items.first.click()
|
|
await asyncio.sleep(0.3)
|
|
await confirm_btn.first.click(timeout=3000)
|
|
except Exception:
|
|
pass
|
|
|
|
# 等待进入订单确认页,点击"提交订单"
|
|
submit_btn = page.get_by_text("提交订单")
|
|
await submit_btn.wait_for(state="visible", timeout=8000)
|
|
await submit_btn.click()
|
|
return "抢购请求已提交"
|
|
|
|
except Exception as e:
|
|
if attempt < max_retries - 1:
|
|
continue
|
|
return f"抢购操作失败: {e}"
|
|
|
|
return "抢购操作失败: 重试次数用尽"
|
|
|
|
|
|
def _update_task(db, task_id, status, result):
|
|
db.execute(
|
|
"UPDATE tasks SET status = ?, result = ?, updated_at = ? WHERE id = ?",
|
|
(status, result, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), task_id)
|
|
)
|
|
db.commit()
|