Files
weidian/server/services/cart_service.py

239 lines
8.0 KiB
Python
Raw Normal View History

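"""
Shopping-cart presale item scraping service.

Opens the cart page with Playwright and extracts item information from the DOM.
For items whose real itemID cannot be read from the DOM, clicks the item image
and extracts the itemID from the URL that the click navigates to.
"""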
import asyncio
import re
from playwright.async_api import async_playwright
from utils.stealth import stealth_async
from server.services.auth_service import get_browser_context, has_auth
# URL of the shopping-cart page that the scraper opens and returns to.
CART_URL = "https://weidian.com/new-cart/index.php"
# JavaScript arrow function, passed to ``page.evaluate``, that extracts the
# basic information of every cart item from the DOM.
#
# It walks each ``div.shop_warp`` under ``div.shop_info.cart_content`` and each
# ``.item_warp`` inside it, and returns a list of plain objects with the keys
# shop_name, cart_item_id, item_id, title, sku_name, price, is_presale,
# countdown_text, sale_time, presale_type and img_src.
#
# ``item_id`` is looked up, in order, from the element's ``__vue__`` data /
# props / ``item`` object, then from ``data-*`` attributes, then from an inner
# ``<a href*="itemID">`` link; it stays '' when none of these yields a value.
#
# An item is flagged ``is_presale`` when its ``.item_desc .title`` text matches
# "定时 开售" (written as \u escapes below). ``presale_type`` is then 'countdown'
# when ``.desc`` contains "距离开售还剩", 'scheduled' when it contains "开售时间",
# and 'unknown' otherwise; the ``.warn_msg`` text is stored in
# ``countdown_text`` or ``sale_time`` accordingly.
#
# NOTE: this is a normal (non-raw) Python string, so every ``\\`` below reaches
# the browser as a single backslash (``\\d`` -> ``\d``, ``\\u5b9a`` -> ``\u5b9a``).
EXTRACT_JS = """() => {
const R = [];
const sws = document.querySelectorAll(
'div.shop_info.cart_content div.shop_warp'
);
for (const sw of sws) {
const sn = (sw.querySelector('.shop_name') || {}).textContent || '';
const iws = sw.querySelectorAll('.item_warp');
for (const iw of iws) {
const o = {
shop_name: sn.trim(),
cart_item_id: iw.id,
item_id: '',
title: '', sku_name: '', price: '',
is_presale: false, countdown_text: '',
sale_time: '', presale_type: ''
};
// 尝试多种方式提取 itemID
// 1. Vue 组件数据
try {
const vue = iw.__vue__;
if (vue) {
const d = vue.$data || vue._data || vue;
o.item_id = String(d.itemID || d.itemId || d.item_id
|| d.goodsId || d.goods_id || '');
// 也检查 props
if (!o.item_id && vue.$props) {
const pp = vue.$props;
o.item_id = String(pp.itemID || pp.itemId
|| pp.item_id || pp.goodsId || '');
}
// 检查 item 对象
if (!o.item_id && d.item) {
o.item_id = String(d.item.itemID || d.item.itemId
|| d.item.item_id || '');
}
}
} catch(e) {}
// 2. data-* 属性
if (!o.item_id) {
o.item_id = iw.dataset.itemId || iw.dataset.itemid
|| iw.dataset.goodsId || iw.dataset.id || '';
}
// 3. 内部链接
if (!o.item_id) {
const a = iw.querySelector('a[href*="itemID"]');
if (a) {
const m = a.href.match(/itemID=(\\d+)/);
if (m) o.item_id = m[1];
}
}
// 4. 图片 URL 中可能有商品信息备用
const img = iw.querySelector('.item_img img');
o.img_src = img ? (img.src || img.dataset.src || '') : '';
const te = iw.querySelector('.item_title');
if (te) o.title = te.textContent.trim();
const sk = iw.querySelector('.item_sku');
if (sk) o.sku_name = sk.textContent.trim();
const pr = iw.querySelector('.item_prices');
if (pr) o.price = pr.textContent.replace(/[^\\d.]/g, '');
const de = iw.querySelector('.item_desc');
if (de) {
const dt = de.querySelector('.title');
const dd = de.querySelector('.desc');
const wm = de.querySelector('.warn_msg');
if (dt && /\\u5b9a\\u65f6\\s*\\u5f00\\u552e/.test(dt.textContent)) {
o.is_presale = true;
const d = dd ? dd.textContent.trim() : '';
const w = wm ? wm.textContent.trim() : '';
if (d.includes('\\u8ddd\\u79bb\\u5f00\\u552e\\u8fd8\\u5269')) {
o.presale_type = 'countdown';
o.countdown_text = w;
} else if (d.includes('\\u5f00\\u552e\\u65f6\\u95f4')) {
o.presale_type = 'scheduled';
o.sale_time = w;
} else {
o.presale_type = 'unknown';
o.countdown_text = w;
}
}
}
R.push(o);
}
}
return R;
}"""
async def fetch_cart_presale_items(account_id):
    """
    Fetch the presale items in the shopping cart of the given account.

    Opens the cart page in a headless browser, extracts every cart item with
    ``EXTRACT_JS`` and keeps those flagged ``is_presale``. For presale items
    whose ``item_id`` is still empty, the item image is clicked and the id is
    taken from the URL the click leads to; those items also receive a ``url``
    key pointing at the item page.

    Args:
        account_id: Account identifier, passed to ``has_auth`` and
            ``get_browser_context``.

    Returns:
        ``(True, items)`` on success, where ``items`` is the list of presale
        item dicts, or ``(False, message)`` when the account has no stored
        login, the page lands on a login/error URL, or the cart page fails
        to open.

    Raises:
        Exception: Errors raised while creating the browser context or while
            extracting items propagate to the caller; the browser is closed
            first.
    """
    if not has_auth(account_id):
        return False, "账号未登录"
    async with async_playwright() as p:
        browser, context = await get_browser_context(
            p, account_id, headless=True
        )
        # One ``finally`` owns browser shutdown, so every exit path --
        # including an exception during extraction or clicking -- closes it.
        try:
            page = await context.new_page()
            await stealth_async(page)
            try:
                await page.goto(
                    CART_URL, wait_until="networkidle", timeout=20000
                )
                # Extra settle time after "networkidle"; presumably lets the
                # cart finish rendering -- TODO confirm it is still needed.
                await asyncio.sleep(3)
                landed_url = page.url.lower()
                # Ending up on a URL containing "login" is treated as an
                # expired session.
                if "login" in landed_url:
                    return False, "登录态已过期,请重新登录"
                if "error" in landed_url:
                    return False, "购物车页面加载失败"
            except Exception as e:
                return False, f"打开购物车失败: {e}"
            # Extract the basic information of every cart item.
            raw_items = await page.evaluate(EXTRACT_JS)
            # Keep presale items only.
            presale = [it for it in raw_items if it.get("is_presale")]
            # Resolve missing item ids by clicking the item image.
            # NOTE(review): ``url`` is only set for ids resolved by click,
            # not for ids read from the DOM -- confirm that is intended.
            for item in presale:
                if item.get('item_id'):
                    continue
                cid = item.get('cart_item_id', '')
                if not cid:
                    continue
                item_id = await _get_item_id_by_click(page, context, cid)
                if item_id:
                    item['item_id'] = item_id
                    item['url'] = f'https://weidian.com/item.html?itemID={item_id}'
            return True, presale
        finally:
            await browser.close()
async def _get_item_id_by_click(page, context, cart_item_id):
    """
    Resolve an item's itemID by clicking its image in the cart.

    The click is expected to open a new tab; the itemID is parsed from that
    tab's URL and the tab is closed again. If no new tab appears (or any step
    fails), the current page's URL is checked instead, in case the click
    navigated in place, and the page is sent back to the cart.

    Args:
        page: Playwright page currently showing the cart.
        context: Browser context that owns ``page``; used to wait for the
            new tab.
        cart_item_id: DOM ``id`` of the item's ``.item_warp`` element.

    Returns:
        The itemID as a string, or ``None`` when the image cannot be found or
        no id can be extracted.
    """
    try:
        # Locate the item's image.
        img_locator = page.locator(
            f'.item_warp[id="{cart_item_id}"] .item_img')
        if await img_locator.count() == 0:
            return None
        # Wait for the tab opened by the click.
        async with context.expect_page(timeout=5000) as new_page_info:
            await img_locator.first.click()
        new_page = await new_page_info.value
        try:
            # Give the new tab a moment to reach its final URL.
            await asyncio.sleep(1)
            return _extract_item_id_from_url(new_page.url)
        finally:
            # Always close the extra tab. Closing is best-effort, so a
            # failure here must not discard an id that was already found.
            try:
                await new_page.close()
            except Exception:
                pass
    except Exception:
        # If expect_page timed out, the click may have navigated the current
        # page instead, so look at its URL.
        item_id = None
        try:
            item_id = _extract_item_id_from_url(page.url)
            # Return to the cart so later items can still be clicked.
            if item_id or 'new-cart' not in page.url:
                await page.goto(CART_URL, wait_until="networkidle",
                                timeout=15000)
                await asyncio.sleep(2)
        except Exception:
            # Best-effort recovery: keep an id that was already extracted
            # even when navigating back to the cart fails.
            pass
        return item_id
def _extract_item_id_from_url(url):
"""从 URL 中提取 itemID"""
if not url:
return None
# https://weidian.com/item.html?itemID=123456
m = re.search(r'itemID=(\d+)', url, re.IGNORECASE)
if m:
return m.group(1)
# https://shop.weidian.com/item/123456
m = re.search(r'/item/(\d+)', url)
if m:
return m.group(1)
# https://weidian.com/...?id=123456
m = re.search(r'[?&]id=(\d+)', url)
if m:
return m.group(1)
return None