Update markdown and initial code

This commit is contained in:
2026-02-02 09:27:49 +08:00
commit 7aea2ca2a8
11 changed files with 342 additions and 0 deletions

43
README.md Normal file
View File

@@ -0,0 +1,43 @@
# Weidian Snatch (微店抢购脚本)
这是一个基于 [Playwright](https://playwright.dev/) 的微店自动抢购工具,支持精确计时、自动登录、隐身模式等功能。
## 功能特性
- **自动登录**:支持保存和加载登录状态 (`auth_state.json`)。
- **精确计时**:内置 `PrecisionTimer` 进行时间同步和倒计时等待。
- **隐身模式**:使用 stealth 脚本隐藏自动化特征,降低防爬虫检测风险。
- **自动抢购**自动执行点击购买、确认规格SKU、提交订单的流程。
## 文件结构
- `main.py`: 主程序入口,包含抢购的核心逻辑。
- `config.yaml`: 配置文件,设置商品链接、抢购时间、浏览器模式等。
- `resolve_url.py`: URL 解析工具(如果有)。
- `utils/`:
- `auth.py`: 处理用户认证和 Session 管理。
- `stealth.py`: 反爬虫隐身处理。
- `timer.py`: 时间同步与控制。
## 使用方法
1. **安装依赖**
请确保已安装 Python并安装所需的库
```bash
pip install playwright pyyaml
playwright install
```
2. **配置 config.yaml**
修改 `config.yaml` 文件,填入目标商品 URL (`target_url`) 和抢购时间 (`snatch_time`)。
3. **运行脚本**
```bash
python main.py
```
如果是首次运行且无登录状态,脚本会提示登录。请手动登录后,脚本会自动保存状态供下次使用。
## 注意事项
- 请确保网络畅通,以保证时间同步和抢购请求的及时发送。
- 抢购成功率受多种因素影响(网络延迟、库存数量、平台风控等),本脚本仅辅助操作,不保证 100% 成功。

1
auth_state.json Normal file
View File

@@ -0,0 +1 @@
{"cookies": [{"name": "wdtoken", "value": "ac506552", "domain": ".weidian.com", "path": "/", "expires": -1, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "__spider__visitorid", "value": "5dbb14b15c5ee844", "domain": ".weidian.com", "path": "/", "expires": 1804428790.96553, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "v-components/clean-up-advert@private_domain", "value": "1711911458", "domain": ".weidian.com", "path": "/", "expires": -1, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "v-components/clean-up-advert@wx_app", "value": "1711911458", "domain": ".weidian.com", "path": "/", "expires": -1, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "v-components/tencent-live-plugin@wfr", "value": "BuyercopyURL", "domain": ".weidian.com", "path": "/", "expires": -1, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "__spider__sessionid", "value": "b1d5cfc65d7b5426", "domain": ".weidian.com", "path": "/", "expires": 1769870594, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "visitor_id", "value": "f3e575ae-e31b-4f84-abcc-375b58566172", "domain": ".weidian.com", "path": "/", "expires": 1772460791.264752, "httpOnly": false, "secure": false, "sameSite": "Lax"}], "origins": [{"origin": "https://shop1711911458.v.weidian.com", "localStorage": [{"name": "__kernel__owl_visit", "value": "{\"timestamp\":1769868790968,\"caches\":[{\"c\":1,\"h\":\"87b0e9a2b49f7c87\"}]}"}]}]}

9
config.yaml Normal file
View File

@@ -0,0 +1,9 @@
target_url: https://weidian.com/fastorder.html?itemID=7565508011&wfr=BuyercopyURL
item_id: '7565508011'
shop_id: ''
sku_id: ''
snatch_time: '2026-01-31 22:21:00'
headless: false
use_stealth: true
browser_type: chromium
auth_file: auth_state.json

91
main.py Normal file
View File

@@ -0,0 +1,91 @@
import asyncio
import yaml
import time
from playwright.async_api import async_playwright
from utils.stealth import stealth_async
from utils.auth import Authenticator
from utils.timer import PrecisionTimer
async def snatch(config):
auth = Authenticator(config.get("auth_file", "auth_state.json"))
timer = PrecisionTimer()
timer.sync_time()
async with async_playwright() as p:
browser, context = await auth.get_context(p, headless=config.get("headless", False))
page = await context.new_page()
await stealth_async(page)
target_url = config.get("target_url")
if not target_url:
print("错误: 未配置 target_url")
return
# 1. 预热:先打开页面
print(f"正在打开商品页面: {target_url}")
await page.goto(target_url)
# 如果未登录,可能需要在这里处理扫码
if not auth.has_auth():
print("未发现登录状态,请手动操作并将状态保存...")
# 注意login 会启动一个新的浏览器窗口
await auth.login(target_url)
# 登录完成后,我们需要关闭当前空的 context 并重新加载带有 cookie 的 context
await context.close()
await browser.close()
browser, context = await auth.get_context(p, headless=config.get("headless", False))
page = await context.new_page()
await stealth_async(page)
print("已重新加载登录状态,正在打开商品页面...")
await page.goto(target_url)
# 2. 等待抢购时间
snatch_time = config.get("snatch_time")
if snatch_time:
await timer.wait_until(snatch_time)
# 3. 抢购核心逻辑 (H5 流程)
# 注意:这里的选择器需要根据微店 H5 实际页面结构进行微调
# 这里演示一般的微店抢购流程:点击购买 -> 选择规格 -> 确定 -> 提交订单
try:
# 刷新页面以获取最新状态(可选,视具体页面倒计时逻辑而定)
# await page.reload()
# 点击“立即购买”按钮
# 微店 H5 常见的购买按钮类名类似于 .buy-btn, .footer-buy
# 我们尝试使用 text 匹配
buy_button = page.get_by_text("立即购买")
await buy_button.click()
print("点击立即购买")
# 处理 SKU 选择(如果弹出 SKU 选择框)
# 这里简单起见,如果弹出了规格选择,点击第一个选项并确定
# 实际需要根据 config 中的 sku_id 进行精准点击
confirm_btn = page.get_by_text("确定")
if await confirm_btn.is_visible():
await confirm_btn.click()
print("点击确定SKU")
# 进入确认订单页面后,点击“提交订单”
# 提交订单按钮通常在底部,文字为“提交订单”
submit_btn = page.get_by_text("提交订单")
await submit_btn.wait_for(state="visible", timeout=5000)
await submit_btn.click()
print("点击提交订单!抢购请求已发送!")
except Exception as e:
print(f"抢购过程中发生错误: {e}")
# 保持浏览器打开一段时间查看结果
await asyncio.sleep(10)
await browser.close()
def load_config():
with open("config.yaml", "r", encoding="utf-8") as f:
return yaml.safe_load(f)
if __name__ == "__main__":
config = load_config()
asyncio.run(snatch(config))

83
resolve_url.py Normal file
View File

@@ -0,0 +1,83 @@
import asyncio
from playwright.async_api import async_playwright
import re
import yaml
async def resolve_url_and_extract(short_url):
print(f"正在解析链接: {short_url}")
async with async_playwright() as p:
# 使用 headless=True 和 禁用GPU
browser = await p.chromium.launch(headless=True, args=['--disable-gpu'])
# 模拟手机有的跳转可能依赖UA
device = p.devices['iPhone 13']
context = await browser.new_context(**device)
page = await context.new_page()
try:
await page.goto(short_url)
# 等待跳转完成
await page.wait_for_load_state('networkidle')
await asyncio.sleep(2) # 额外等待确保 URL 稳定
final_url = page.url
print(f"最终链接: {final_url}")
item_id = ""
shop_id = ""
# 尝试从 URL 提取
# 常见的 param 是 itemID=xxx, shopId=xxx
# 或者路径中 /item.html?itemID=...
item_id_match = re.search(r"[?&]itemID=(\d+)", final_url, re.IGNORECASE)
if item_id_match:
item_id = item_id_match.group(1)
shop_id_match = re.search(r"[?&]shopId=(\d+)", final_url, re.IGNORECASE)
if shop_id_match:
shop_id = shop_id_match.group(1)
# 如果 URL 里没有,尝试在页面内容里找(有时候是全局变量)
if not item_id or not shop_id:
content = await page.content()
if not item_id:
# 匹配 "itemID":"123123" 或 itemID = '123123'
m = re.search(r'["\']?itemID["\']?\s*[:=]\s*["\']?(\d+)["\']?', content, re.IGNORECASE)
if m: item_id = m.group(1)
if not shop_id:
m = re.search(r'["\']?shopId["\']?\s*[:=]\s*["\']?(\d+)["\']?', content, re.IGNORECASE)
if m: shop_id = m.group(1)
print(f"解析结果 -> itemID: {item_id}, shopId: {shop_id}")
return final_url, item_id, shop_id
except Exception as e:
print(f"解析出错: {e}")
return None, None, None
finally:
await browser.close()
def update_config(url, item_id, shop_id):
config_path = "config.yaml"
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
config['target_url'] = url
if item_id:
config['item_id'] = item_id
if shop_id:
config['shop_id'] = shop_id
with open(config_path, "w", encoding="utf-8") as f:
yaml.dump(config, f, allow_unicode=True, sort_keys=False)
print("配置 config.yaml 已更新。")
if __name__ == "__main__":
# URL to resolve
target = "https://k.youshop10.com/cTO2VL6s?a=b&p=iphone&wfr=BuyercopyURL&share_relation=c03c72974993c056_1767112998_1"
final_url, item_id, shop_id = asyncio.run(resolve_url_and_extract(target))
if final_url:
update_config(final_url, item_id, shop_id)

Binary file not shown.

Binary file not shown.

Binary file not shown.

57
utils/auth.py Normal file
View File

@@ -0,0 +1,57 @@
import asyncio
import json
import os
from playwright.async_api import async_playwright
from utils.stealth import stealth_async
class Authenticator:
def __init__(self, auth_file="auth_state.json"):
self.auth_file = auth_file
async def login(self, initial_url="https://weidian.com/"):
"""
打开浏览器由用户手动扫码登录。
登录成功后保存 session state。
"""
async with async_playwright() as p:
browser = await p.chromium.launch(headless=False, args=['--disable-gpu'])
# 模拟 iPhone 13 配置以确保显示手机版界面
device = p.devices['iPhone 13']
context = await browser.new_context(**device)
page = await context.new_page()
await stealth_async(page)
await page.goto(initial_url)
print("请在浏览器中完成登录(扫码等操作)...")
# 等待用户手动登录,直到 cookie 中出现登录凭证或 URL 变化
# 这里简单处理,等待用户按回车确认已登录
await asyncio.get_event_loop().run_in_executor(None, input, "登录完成后请在这里按回车控制台继续...")
# 保存状态
storage = await context.storage_state(path=self.auth_file)
print(f"登录状态已保存至 {self.auth_file}")
await browser.close()
def has_auth(self):
return os.path.exists(self.auth_file)
async def get_context(self, playwright_instance, headless=False):
"""
创建一个带有已保存状态的 context并模拟手机环境
"""
browser = await playwright_instance.chromium.launch(headless=headless, args=['--disable-gpu'])
# 模拟 iPhone 13 配置
device = playwright_instance.devices['iPhone 13']
if self.has_auth():
context = await browser.new_context(
**device,
storage_state=self.auth_file
)
else:
context = await browser.new_context(**device)
return browser, context

9
utils/stealth.py Normal file
View File

@@ -0,0 +1,9 @@
from playwright_stealth import Stealth
async def stealth_async(page):
"""
Apply stealth settings to the page.
Wrapper around Stealth().apply_stealth_async(page)
"""
stealth = Stealth()
await stealth.apply_stealth_async(page)

49
utils/timer.py Normal file
View File

@@ -0,0 +1,49 @@
import time
import asyncio
import ntplib
from datetime import datetime
class PrecisionTimer:
def __init__(self):
self.offset = 0 # 服务器时间 - 本地时间
def sync_time(self):
"""
同步 NTP 时间,计算偏移量
"""
try:
client = ntplib.NTPClient()
response = client.request('pool.ntp.org', version=3)
self.offset = response.tx_time - time.time()
print(f"时间同步完成,偏移量: {self.offset:.3f}s")
except Exception as e:
print(f"NTP同步失败: {e},将使用系统时间")
def get_server_time(self):
return time.time() + self.offset
async def wait_until(self, target_time_str):
"""
等待直到目标时间 (格式: 2026-02-01 10:00:00)
"""
target_dt = datetime.strptime(target_time_str, "%Y-%m-%d %H:%M:%S")
target_timestamp = target_dt.timestamp()
print(f"正在等待目标时间: {target_time_str}")
while True:
current_time = self.get_server_time()
remaining = target_timestamp - current_time
if remaining <= 0:
print("目标时间已到!触发抢购!")
break
# 动态调整调整休眠时间以节省 CPU 并保持精度
if remaining > 1:
await asyncio.sleep(remaining - 0.5)
elif remaining > 0.1:
await asyncio.sleep(0.01)
else:
# 最后一刻进入忙等以获取最高精度
pass