commit 7aea2ca2a8cbff45cf62f0ffa08e37467fabc9dd Author: Zhaojie Date: Mon Feb 2 09:27:49 2026 +0800 Update markdown and initial code diff --git a/README.md b/README.md new file mode 100644 index 0000000..edde67d --- /dev/null +++ b/README.md @@ -0,0 +1,43 @@ +# Weidian Snatch (微店抢购脚本) + +这是一个基于 [Playwright](https://playwright.dev/) 的微店自动抢购工具,支持精确计时、自动登录、隐身模式等功能。 + +## 功能特性 + +- **自动登录**:支持保存和加载登录状态 (`auth_state.json`)。 +- **精确计时**:内置 `PrecisionTimer` 进行时间同步和倒计时等待。 +- **隐身模式**:使用 stealth 脚本隐藏自动化特征,降低防爬虫检测风险。 +- **自动抢购**:自动执行点击购买、确认规格(SKU)、提交订单的流程。 + +## 文件结构 + +- `main.py`: 主程序入口,包含抢购的核心逻辑。 +- `config.yaml`: 配置文件,设置商品链接、抢购时间、浏览器模式等。 +- `resolve_url.py`: URL 解析工具(如果有)。 +- `utils/`: + - `auth.py`: 处理用户认证和 Session 管理。 + - `stealth.py`: 反爬虫隐身处理。 + - `timer.py`: 时间同步与控制。 + +## 使用方法 + +1. **安装依赖** + 请确保已安装 Python,并安装所需的库: + ```bash + pip install playwright pyyaml + playwright install + ``` + +2. **配置 config.yaml** + 修改 `config.yaml` 文件,填入目标商品 URL (`target_url`) 和抢购时间 (`snatch_time`)。 + +3. **运行脚本** + ```bash + python main.py + ``` + 如果是首次运行且无登录状态,脚本会提示登录。请手动登录后,脚本会自动保存状态供下次使用。 + +## 注意事项 + +- 请确保网络畅通,以保证时间同步和抢购请求的及时发送。 +- 抢购成功率受多种因素影响(网络延迟、库存数量、平台风控等),本脚本仅辅助操作,不保证 100% 成功。 diff --git a/auth_state.json b/auth_state.json new file mode 100644 index 0000000..974c3f8 --- /dev/null +++ b/auth_state.json @@ -0,0 +1 @@ +{"cookies": [{"name": "wdtoken", "value": "ac506552", "domain": ".weidian.com", "path": "/", "expires": -1, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "__spider__visitorid", "value": "5dbb14b15c5ee844", "domain": ".weidian.com", "path": "/", "expires": 1804428790.96553, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "v-components/clean-up-advert@private_domain", "value": "1711911458", "domain": ".weidian.com", "path": "/", "expires": -1, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "v-components/clean-up-advert@wx_app", "value": "1711911458", "domain": ".weidian.com", "path": "/", "expires": -1, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "v-components/tencent-live-plugin@wfr", "value": "BuyercopyURL", "domain": ".weidian.com", "path": "/", "expires": -1, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "__spider__sessionid", "value": "b1d5cfc65d7b5426", "domain": ".weidian.com", "path": "/", "expires": 1769870594, "httpOnly": false, "secure": false, "sameSite": "Lax"}, {"name": "visitor_id", "value": "f3e575ae-e31b-4f84-abcc-375b58566172", "domain": ".weidian.com", "path": "/", "expires": 1772460791.264752, "httpOnly": false, "secure": false, "sameSite": "Lax"}], "origins": [{"origin": "https://shop1711911458.v.weidian.com", "localStorage": [{"name": "__kernel__owl_visit", "value": "{\"timestamp\":1769868790968,\"caches\":[{\"c\":1,\"h\":\"87b0e9a2b49f7c87\"}]}"}]}]} \ No newline at end of file diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..96194f5 --- /dev/null +++ b/config.yaml @@ -0,0 +1,9 @@ +target_url: https://weidian.com/fastorder.html?itemID=7565508011&wfr=BuyercopyURL +item_id: '7565508011' +shop_id: '' +sku_id: '' +snatch_time: '2026-01-31 22:21:00' +headless: false +use_stealth: true +browser_type: chromium +auth_file: auth_state.json diff --git a/main.py b/main.py new file mode 100644 index 0000000..e56620b --- /dev/null +++ b/main.py @@ -0,0 +1,91 @@ +import asyncio +import yaml +import time +from playwright.async_api import async_playwright +from utils.stealth import stealth_async +from utils.auth import Authenticator +from utils.timer import PrecisionTimer + +async def snatch(config): + auth = Authenticator(config.get("auth_file", "auth_state.json")) + timer = PrecisionTimer() + timer.sync_time() + + async with async_playwright() as p: + browser, context = await auth.get_context(p, headless=config.get("headless", False)) + page = await context.new_page() + await stealth_async(page) + + target_url = config.get("target_url") + if not target_url: + print("错误: 未配置 target_url") + return + + # 1. 预热:先打开页面 + print(f"正在打开商品页面: {target_url}") + await page.goto(target_url) + + # 如果未登录,可能需要在这里处理扫码 + if not auth.has_auth(): + print("未发现登录状态,请手动操作并将状态保存...") + # 注意:login 会启动一个新的浏览器窗口 + await auth.login(target_url) + + # 登录完成后,我们需要关闭当前空的 context 并重新加载带有 cookie 的 context + await context.close() + await browser.close() + browser, context = await auth.get_context(p, headless=config.get("headless", False)) + page = await context.new_page() + await stealth_async(page) + + print("已重新加载登录状态,正在打开商品页面...") + await page.goto(target_url) + + # 2. 等待抢购时间 + snatch_time = config.get("snatch_time") + if snatch_time: + await timer.wait_until(snatch_time) + + # 3. 抢购核心逻辑 (H5 流程) + # 注意:这里的选择器需要根据微店 H5 实际页面结构进行微调 + # 这里演示一般的微店抢购流程:点击购买 -> 选择规格 -> 确定 -> 提交订单 + try: + # 刷新页面以获取最新状态(可选,视具体页面倒计时逻辑而定) + # await page.reload() + + # 点击“立即购买”按钮 + # 微店 H5 常见的购买按钮类名类似于 .buy-btn, .footer-buy + # 我们尝试使用 text 匹配 + buy_button = page.get_by_text("立即购买") + await buy_button.click() + print("点击立即购买") + + # 处理 SKU 选择(如果弹出 SKU 选择框) + # 这里简单起见,如果弹出了规格选择,点击第一个选项并确定 + # 实际需要根据 config 中的 sku_id 进行精准点击 + confirm_btn = page.get_by_text("确定") + if await confirm_btn.is_visible(): + await confirm_btn.click() + print("点击确定(SKU)") + + # 进入确认订单页面后,点击“提交订单” + # 提交订单按钮通常在底部,文字为“提交订单” + submit_btn = page.get_by_text("提交订单") + await submit_btn.wait_for(state="visible", timeout=5000) + await submit_btn.click() + print("点击提交订单!抢购请求已发送!") + + except Exception as e: + print(f"抢购过程中发生错误: {e}") + + # 保持浏览器打开一段时间查看结果 + await asyncio.sleep(10) + await browser.close() + +def load_config(): + with open("config.yaml", "r", encoding="utf-8") as f: + return yaml.safe_load(f) + +if __name__ == "__main__": + config = load_config() + asyncio.run(snatch(config)) diff --git a/resolve_url.py b/resolve_url.py new file mode 100644 index 0000000..3c446b4 --- /dev/null +++ b/resolve_url.py @@ -0,0 +1,83 @@ +import asyncio +from playwright.async_api import async_playwright +import re +import yaml + +async def resolve_url_and_extract(short_url): + print(f"正在解析链接: {short_url}") + async with async_playwright() as p: + # 使用 headless=True 和 禁用GPU + browser = await p.chromium.launch(headless=True, args=['--disable-gpu']) + # 模拟手机,有的跳转可能依赖UA + device = p.devices['iPhone 13'] + context = await browser.new_context(**device) + page = await context.new_page() + + try: + await page.goto(short_url) + # 等待跳转完成 + await page.wait_for_load_state('networkidle') + await asyncio.sleep(2) # 额外等待确保 URL 稳定 + + final_url = page.url + print(f"最终链接: {final_url}") + + item_id = "" + shop_id = "" + + # 尝试从 URL 提取 + # 常见的 param 是 itemID=xxx, shopId=xxx + # 或者路径中 /item.html?itemID=... + + item_id_match = re.search(r"[?&]itemID=(\d+)", final_url, re.IGNORECASE) + if item_id_match: + item_id = item_id_match.group(1) + + shop_id_match = re.search(r"[?&]shopId=(\d+)", final_url, re.IGNORECASE) + if shop_id_match: + shop_id = shop_id_match.group(1) + + # 如果 URL 里没有,尝试在页面内容里找(有时候是全局变量) + if not item_id or not shop_id: + content = await page.content() + if not item_id: + # 匹配 "itemID":"123123" 或 itemID = '123123' + m = re.search(r'["\']?itemID["\']?\s*[:=]\s*["\']?(\d+)["\']?', content, re.IGNORECASE) + if m: item_id = m.group(1) + + if not shop_id: + m = re.search(r'["\']?shopId["\']?\s*[:=]\s*["\']?(\d+)["\']?', content, re.IGNORECASE) + if m: shop_id = m.group(1) + + print(f"解析结果 -> itemID: {item_id}, shopId: {shop_id}") + return final_url, item_id, shop_id + + except Exception as e: + print(f"解析出错: {e}") + return None, None, None + finally: + await browser.close() + +def update_config(url, item_id, shop_id): + config_path = "config.yaml" + with open(config_path, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + + config['target_url'] = url + if item_id: + config['item_id'] = item_id + if shop_id: + config['shop_id'] = shop_id + + with open(config_path, "w", encoding="utf-8") as f: + yaml.dump(config, f, allow_unicode=True, sort_keys=False) + print("配置 config.yaml 已更新。") + +if __name__ == "__main__": + # URL to resolve + target = "https://k.youshop10.com/cTO2VL6s?a=b&p=iphone&wfr=BuyercopyURL&share_relation=c03c72974993c056_1767112998_1" + + final_url, item_id, shop_id = asyncio.run(resolve_url_and_extract(target)) + + if final_url: + update_config(final_url, item_id, shop_id) diff --git a/utils/__pycache__/auth.cpython-313.pyc b/utils/__pycache__/auth.cpython-313.pyc new file mode 100644 index 0000000..546f621 Binary files /dev/null and b/utils/__pycache__/auth.cpython-313.pyc differ diff --git a/utils/__pycache__/stealth.cpython-313.pyc b/utils/__pycache__/stealth.cpython-313.pyc new file mode 100644 index 0000000..b836b45 Binary files /dev/null and b/utils/__pycache__/stealth.cpython-313.pyc differ diff --git a/utils/__pycache__/timer.cpython-313.pyc b/utils/__pycache__/timer.cpython-313.pyc new file mode 100644 index 0000000..cf14d4d Binary files /dev/null and b/utils/__pycache__/timer.cpython-313.pyc differ diff --git a/utils/auth.py b/utils/auth.py new file mode 100644 index 0000000..f75b2dd --- /dev/null +++ b/utils/auth.py @@ -0,0 +1,57 @@ +import asyncio +import json +import os +from playwright.async_api import async_playwright +from utils.stealth import stealth_async + +class Authenticator: + def __init__(self, auth_file="auth_state.json"): + self.auth_file = auth_file + + async def login(self, initial_url="https://weidian.com/"): + """ + 打开浏览器由用户手动扫码登录。 + 登录成功后保存 session state。 + """ + async with async_playwright() as p: + browser = await p.chromium.launch(headless=False, args=['--disable-gpu']) + # 模拟 iPhone 13 配置以确保显示手机版界面 + device = p.devices['iPhone 13'] + context = await browser.new_context(**device) + page = await context.new_page() + await stealth_async(page) + + await page.goto(initial_url) + print("请在浏览器中完成登录(扫码等操作)...") + + # 等待用户手动登录,直到 cookie 中出现登录凭证或 URL 变化 + # 这里简单处理,等待用户按回车确认已登录 + await asyncio.get_event_loop().run_in_executor(None, input, "登录完成后请在这里按回车控制台继续...") + + # 保存状态 + storage = await context.storage_state(path=self.auth_file) + print(f"登录状态已保存至 {self.auth_file}") + + await browser.close() + + def has_auth(self): + return os.path.exists(self.auth_file) + + async def get_context(self, playwright_instance, headless=False): + """ + 创建一个带有已保存状态的 context,并模拟手机环境 + """ + browser = await playwright_instance.chromium.launch(headless=headless, args=['--disable-gpu']) + + # 模拟 iPhone 13 配置 + device = playwright_instance.devices['iPhone 13'] + + if self.has_auth(): + context = await browser.new_context( + **device, + storage_state=self.auth_file + ) + else: + context = await browser.new_context(**device) + + return browser, context diff --git a/utils/stealth.py b/utils/stealth.py new file mode 100644 index 0000000..6aa1977 --- /dev/null +++ b/utils/stealth.py @@ -0,0 +1,9 @@ +from playwright_stealth import Stealth + +async def stealth_async(page): + """ + Apply stealth settings to the page. + Wrapper around Stealth().apply_stealth_async(page) + """ + stealth = Stealth() + await stealth.apply_stealth_async(page) diff --git a/utils/timer.py b/utils/timer.py new file mode 100644 index 0000000..95e6bc4 --- /dev/null +++ b/utils/timer.py @@ -0,0 +1,49 @@ +import time +import asyncio +import ntplib +from datetime import datetime + +class PrecisionTimer: + def __init__(self): + self.offset = 0 # 服务器时间 - 本地时间 + + def sync_time(self): + """ + 同步 NTP 时间,计算偏移量 + """ + try: + client = ntplib.NTPClient() + response = client.request('pool.ntp.org', version=3) + self.offset = response.tx_time - time.time() + print(f"时间同步完成,偏移量: {self.offset:.3f}s") + except Exception as e: + print(f"NTP同步失败: {e},将使用系统时间") + + def get_server_time(self): + return time.time() + self.offset + + async def wait_until(self, target_time_str): + """ + 等待直到目标时间 (格式: 2026-02-01 10:00:00) + """ + target_dt = datetime.strptime(target_time_str, "%Y-%m-%d %H:%M:%S") + target_timestamp = target_dt.timestamp() + + print(f"正在等待目标时间: {target_time_str}") + + while True: + current_time = self.get_server_time() + remaining = target_timestamp - current_time + + if remaining <= 0: + print("目标时间已到!触发抢购!") + break + + # 动态调整调整休眠时间以节省 CPU 并保持精度 + if remaining > 1: + await asyncio.sleep(remaining - 0.5) + elif remaining > 0.1: + await asyncio.sleep(0.01) + else: + # 最后一刻进入忙等以获取最高精度 + pass