Files
weidian/weidian_sso_login_v4.py
openclaw def06c6360 feat: 短信验证码登录 + v4 人机协作方案
核心改动:
- weidian_sso_login_v4.py: 全新人机协作登录方案
  - Playwright 打开页面 + 自动填手机号
  - 人拖滑块(唯一需要人做的事)
  - 脚本自动拦截 ticket → 发短信
  - 人输入验证码 → 自动提交 → 保存 auth
  - 反检测: 隐藏 webdriver 标记、模拟 iPhone 设备、逐字输入
  - 多 selector 兼容(微店不同版本 DOM 结构)
  - 自动截图 debug(失败时)

- auth_service.py: 重写,集成 v4 方案
  - login_with_password(): 密码登录(全自动)
  - login_with_sms(): 短信登录(人机协作)
  - 保存 Playwright storage_state + 精简 cookies JSON

- accounts.py 路由: 新增 /login_sms/<id> 接口
  - 密码登录和短信登录两条路径
  - 状态轮询支持新的交互状态

- accounts.html 模板:
  - 新增「短信登录」按钮
  - 确认弹窗提醒用户需要浏览器交互
2026-03-31 15:18:02 +08:00

545 lines
20 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
微店 SSO 登录 v4 —— 以"人机协作"为核心的最佳实践
=================================================
核心思路:腾讯滑块验证码无法被程序绕过,必须有人参与。
本方案将"人的操作"缩到最少(只需拖一次滑块、输入一次验证码),
其余全自动化。
流程:
1. Playwright 打开登录页,自动填手机号
2. 点击"获取验证码",触发腾讯滑块
3. 👆 用户在浏览器窗口拖动滑块(唯一需要人做的事)
4. 脚本自动拦截 ticket → 自动发短信
5. 👆 用户在终端输入 6 位短信验证码
6. 脚本自动提交登录 → 保存 auth_state → 导出 cookies
7. 后续业务 API 直接用 cookies不需要再开浏览器
两种使用方式:
- CLI 模式python weidian_sso_login_v4.py
- 代码调用from weidian_sso_login_v4 import WeidianLoginV4
login = WeidianLoginV4()
result = login.login("13800138000")
依赖:
pip install playwright
playwright install chromium
"""
import json
import os
import sys
import time
from datetime import datetime
from urllib.parse import quote
from typing import Optional
from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page
# ── 配置 ─────────────────────────────────────────────
SSO_LOGIN_URL = "https://sso.weidian.com/login/index.php"
SEND_SMS_URL = "https://thor.weidian.com/passport/get.vcode/2.0"
LOGIN_BY_VCODE_URL = "https://sso.weidian.com/user/loginbyvcode"
SYNC_LOGIN_URL = "https://sso.weidian.com/user/synclogin"
CAPTCHA_APPID = "2003473469"
# 保存目录
AUTH_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "auth")
class WeidianLoginV4:
"""微店 SSO 登录 v4 — 人机协作"""
def __init__(self, headless: bool = False, auth_dir: str = AUTH_DIR):
"""
Args:
headless: 是否无头模式(登录时必须 False因为要人拖滑块
auth_dir: auth 状态保存目录
"""
if headless:
print("⚠️ 登录需要人拖滑块,强制 headless=False")
headless = False
self.headless = headless
self.auth_dir = auth_dir
os.makedirs(auth_dir, exist_ok=True)
# ── 核心登录流程 ────────────────────────────────
def login(self, phone: str, country_code: str = "86",
account_id: int = 0) -> dict:
"""
完整登录流程。
Args:
phone: 手机号
country_code: 国家代码
account_id: 账号 ID用于保存 auth 文件名0 则自动生成
Returns:
{
"success": bool,
"uid": str,
"cookies": dict,
"auth_file": str,
"error": str (失败时)
}
"""
if not account_id:
account_id = int(time.time()) % 100000
print(f"\n{'='*50}")
print(f" 微店 SSO 登录 v4")
print(f" 手机号: +{country_code} {phone}")
print(f"{'='*50}\n")
pw = sync_playwright().start()
browser = pw.chromium.launch(
headless=False, # 必须有窗口,人要拖滑块
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
],
)
# 模拟真实设备,降低被识别概率
ctx = browser.new_context(
user_agent=(
"Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/18.0 Mobile/15E148 Safari/604.1"
),
viewport={"width": 390, "height": 844}, # iPhone 14 尺寸
device_scale_factor=3,
is_mobile=True,
has_touch=True,
)
# 注入反检测脚本
ctx.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.chrome = { runtime: {} };
""")
page = ctx.new_page()
# ── 拦截器 ──
captcha_state = {
"ticket": None, "randstr": None,
"sms_sent": False, "sms_error": None,
"login_success": False, "login_error": None,
}
def on_response(response):
url = response.url
# 拦截腾讯验证码验证结果
if "cap_union_new_verify" in url:
try:
data = response.json()
if data.get("errorCode") == "0":
captcha_state["ticket"] = data.get("ticket", "")
captcha_state["randstr"] = data.get("randstr", "")
print(" ✅ 滑块验证通过!")
except Exception:
pass
# 拦截短信发送结果
elif "get.vcode" in url:
try:
data = response.json()
code = str(data.get("status", {}).get("code", ""))
if code == "0":
captcha_state["sms_sent"] = True
print(" ✅ 短信验证码已发送!")
elif code not in ("", "0"):
captcha_state["sms_error"] = data.get("status", {}).get("msg", f"code={code}")
except Exception:
pass
# 拦截登录结果
elif "user/loginbyvcode" in url:
try:
data = response.json()
sc = str(data.get("status", {}).get("status_code",
data.get("status", {}).get("code", "")))
if sc == "0":
captcha_state["login_success"] = True
print(" ✅ 登录 API 返回成功!")
else:
captcha_state["login_error"] = data.get("status", {}).get("status_reason", f"code={sc}")
except Exception:
pass
page.on("response", on_response)
try:
return self._do_login_flow(page, ctx, browser, pw, phone, country_code,
account_id, captcha_state)
except Exception as e:
return {"success": False, "error": f"登录异常: {e}"}
finally:
try:
browser.close()
pw.stop()
except Exception:
pass
def _do_login_flow(self, page: Page, ctx: BrowserContext,
browser: Browser, pw, phone: str, country_code: str,
account_id: int, captcha_state: dict) -> dict:
"""内部登录流程"""
# ── Step 1: 打开登录页 ──
print("📡 Step 1: 打开微店登录页...")
page.goto(SSO_LOGIN_URL, wait_until="load", timeout=20000)
time.sleep(3)
# 点击"登录"进入表单(有些版本需要先点一下)
try:
login_btn = page.locator("#login_init_by_login")
if login_btn.count() > 0 and login_btn.is_visible():
login_btn.click()
time.sleep(2)
except Exception:
pass
# ── Step 2: 定位手机号输入框 ──
print(f"📱 Step 2: 填写手机号 +{country_code} {phone}")
# 尝试多种 selector微店不同版本 DOM 结构不同)
tele_input = None
selectors = [
"#login_autoRegiTele_input",
'input[placeholder*="手机号"]',
'input[name="phone"]',
'input[type="tel"]',
'#login_tele_input',
]
for sel in selectors:
loc = page.locator(sel)
if loc.count() > 0:
tele_input = loc.first
break
if not tele_input:
# 可能默认在密码登录 tab切换到快捷登录
try:
quick_tab = page.locator('text=短信验证登录')
if quick_tab.count() > 0:
quick_tab.click()
time.sleep(1)
else:
quick_tab = page.locator('[data-quick="1"]')
if quick_tab.count() > 0:
quick_tab.click()
time.sleep(1)
except Exception:
pass
# 再试一次
for sel in selectors:
loc = page.locator(sel)
if loc.count() > 0:
tele_input = loc.first
break
if not tele_input:
page.screenshot(path="debug_login_no_input.png")
return {"success": False, "error": "找不到手机号输入框,已截图 debug_login_no_input.png"}
tele_input.click()
tele_input.fill("")
page.keyboard.type(phone, delay=80) # 模拟逐字输入
time.sleep(0.5)
# ── Step 3: 点击获取验证码,触发滑块 ──
print("🔘 Step 3: 点击 '获取验证码'...")
code_btn = None
btn_selectors = [
"#login_quickCode_right",
"text=获取验证码",
"text=发送验证码",
'button:has-text("验证码")',
]
for sel in btn_selectors:
loc = page.locator(sel)
if loc.count() > 0 and loc.first.is_visible():
code_btn = loc.first
break
if not code_btn:
page.screenshot(path="debug_login_no_btn.png")
return {"success": False, "error": "找不到'获取验证码'按钮"}
code_btn.click()
# ── Step 4: 等待人完成滑块 ──
print()
print(" ┌──────────────────────────────────────────┐")
print(" │ 👆 请在浏览器窗口中完成滑块验证 │")
print(" │ 拖动滑块到缺口位置即可 │")
print(" │ 完成后会自动继续 │")
print(" └──────────────────────────────────────────┘")
print()
captcha_done = self._wait_for_captcha(page, captcha_state, timeout=180)
if not captcha_done:
return {"success": False, "error": "滑块验证超时180秒"}
# 滑块通过后,页面通常会自动发短信
# 等一下看看短信是否自动发送
print("⏳ 等待短信发送...")
for _ in range(15):
time.sleep(1)
if captcha_state["sms_sent"]:
break
if captcha_state["sms_error"]:
return {"success": False, "error": f"短信发送失败: {captcha_state['sms_error']}"}
if not captcha_state["sms_sent"]:
# 滑块通过但短信没自动发,手动调 API 发送
print(" ⚠️ 短信未自动发送,手动触发...")
# 用页面已有的 cookie 直接在页面上再点一次
try:
code_btn.click()
time.sleep(5)
except Exception:
pass
if not captcha_state["sms_sent"]:
print(" ⚠️ 尝试通过页面 JS 发送...")
try:
page.evaluate(f"""
fetch('{SEND_SMS_URL}', {{
method: 'POST',
headers: {{'Content-Type': 'application/x-www-form-urlencoded'}},
body: 'param=' + encodeURIComponent(JSON.stringify({{
phone: '{phone}',
countryCode: '{country_code}',
action: 'weidian',
scene: 'H5Login',
forceGraph: false
}}))
}}).then(r => r.json()).then(d => console.log('sms:', JSON.stringify(d)))
""")
time.sleep(5)
except Exception:
pass
# ── Step 5: 用户输入短信验证码 ──
print()
vcode = self._input_vcode()
if not vcode:
return {"success": False, "error": "未输入验证码"}
# ── Step 6: 填写验证码并提交 ──
print("📝 Step 6: 提交登录...")
vcode_input = None
vcode_selectors = [
"#login_quick_input",
'input[placeholder*="验证码"]',
'input[name="vcode"]',
'#login_code_input',
]
for sel in vcode_selectors:
loc = page.locator(sel)
if loc.count() > 0:
vcode_input = loc.first
break
if vcode_input:
vcode_input.fill(vcode)
time.sleep(0.3)
else:
# 直接通过 API 提交
print(" ⚠️ 找不到验证码输入框,通过 API 提交...")
page.evaluate(f"""
fetch('{LOGIN_BY_VCODE_URL}', {{
method: 'POST',
credentials: 'include',
headers: {{'Content-Type': 'application/x-www-form-urlencoded'}},
body: 'phone={phone}&countryCode={country_code}&vcode={vcode}'
}}).then(r => r.json()).then(d => console.log('login:', JSON.stringify(d)))
""")
time.sleep(8)
# 点击提交按钮
submit_btn = None
submit_selectors = [
"#login_quick_submit",
'button:has-text("登录")',
'button[type="submit"]',
]
for sel in submit_selectors:
loc = page.locator(sel)
if loc.count() > 0 and loc.first.is_visible():
submit_btn = loc.first
break
if submit_btn:
submit_btn.click()
# ── Step 7: 等待登录完成 ──
print("⏳ Step 7: 等待登录完成...")
result = self._wait_for_login(page, ctx, captcha_state, timeout=30)
if result["success"]:
# 保存 auth 状态
auth_file = self._save_auth(ctx, account_id, result)
result["auth_file"] = auth_file
print(f"\n✅ 登录成功uid={result.get('uid', '?')}")
print(f" Auth 已保存: {auth_file}")
else:
# 截图方便调试
page.screenshot(path="debug_login_failed.png")
print(f"\n❌ 登录失败: {result.get('error')}")
print(" 已截图: debug_login_failed.png")
return result
# ── 辅助方法 ────────────────────────────────────
def _wait_for_captcha(self, page: Page, state: dict, timeout: int = 180) -> bool:
"""等待滑块验证完成"""
for i in range(timeout):
time.sleep(1)
if state["ticket"]:
return True
# 检查是否短信已经直接发送成功(有些情况下不需要滑块)
if state["sms_sent"]:
print(" 无需滑块,短信已直接发送")
return True
if i == 59:
print(" ⏳ 已等待 60 秒...")
if i == 119:
print(" ⏳ 已等待 120 秒...")
return False
def _input_vcode(self) -> Optional[str]:
"""让用户在终端输入验证码,支持超时"""
try:
vcode = input(" 📨 请输入 6 位短信验证码120秒内: ").strip()
if len(vcode) == 6 and vcode.isdigit():
return vcode
if vcode:
print(f" ⚠️ 无效验证码: {vcode}需要6位数字")
return None
except (EOFError, KeyboardInterrupt):
return None
def _wait_for_login(self, page: Page, ctx: BrowserContext,
state: dict, timeout: int = 30) -> dict:
"""等待登录成功(检查 cookie 或 API 响应)"""
for i in range(timeout):
time.sleep(1)
# 方法 1: 检查 API 拦截结果
if state["login_success"]:
break
# 方法 2: 检查 cookie
cookies = {c["name"]: c["value"] for c in ctx.cookies()}
if cookies.get("is_login") == "true" and cookies.get("uid"):
state["login_success"] = True
break
# 方法 3: 检查 URL 跳转
if "weidian.com" in page.url and "login" not in page.url:
state["login_success"] = True
break
if state["login_error"]:
return {"success": False, "error": state["login_error"]}
if not state["login_success"]:
return {"success": False, "error": "登录超时,未检测到登录状态"}
# 提取 cookies
cookies = {c["name"]: c["value"] for c in ctx.cookies()}
uid = cookies.get("uid", "")
return {
"success": True,
"uid": uid,
"cookies": cookies,
"url": page.url,
}
def _save_auth(self, ctx: BrowserContext, account_id: int, result: dict) -> str:
"""保存 Playwright storage_state + 关键 cookies"""
auth_file = os.path.join(self.auth_dir, f"auth_state_{account_id}.json")
# 保存 Playwright storage_state包含所有 cookie + localStorage
ctx.storage_state(path=auth_file)
# 额外保存一份精简版 cookies给 requests 用)
cookies_file = os.path.join(self.auth_dir, f"cookies_{account_id}.json")
with open(cookies_file, "w", encoding="utf-8") as f:
json.dump({
"uid": result.get("uid", ""),
"cookies": result.get("cookies", {}),
"saved_at": datetime.now().isoformat(),
}, f, ensure_ascii=False, indent=2)
return auth_file
# ── 工具方法 ────────────────────────────────────
@staticmethod
def get_auth_file(account_id: int, auth_dir: str = AUTH_DIR) -> str:
return os.path.join(auth_dir, f"auth_state_{account_id}.json")
@staticmethod
def has_auth(account_id: int, auth_dir: str = AUTH_DIR) -> bool:
path = os.path.join(auth_dir, f"auth_state_{account_id}.json")
return os.path.exists(path) and os.path.getsize(path) > 10
@staticmethod
def get_cookies_for_requests(account_id: int, auth_dir: str = AUTH_DIR) -> dict:
"""获取给 requests 库使用的 cookies"""
path = os.path.join(auth_dir, f"cookies_{account_id}.json")
if os.path.exists(path):
with open(path, "r") as f:
data = json.load(f)
return data.get("cookies", {})
return {}
# ── CLI 入口 ─────────────────────────────────────────
def main():
print("=" * 50)
print(" 微店 SSO 登录 v4 — 人机协作")
print("=" * 50)
phone = input("\n📱 请输入手机号: ").strip()
if not phone:
print("❌ 手机号不能为空")
sys.exit(1)
account_id_input = input("📋 账号 ID可选回车跳过: ").strip()
account_id = int(account_id_input) if account_id_input.isdigit() else 0
login = WeidianLoginV4()
result = login.login(phone, account_id=account_id)
print("\n" + "=" * 50)
if result["success"]:
print(" ✅ 登录成功!")
print(f" UID: {result.get('uid', '?')}")
print(f" Auth: {result.get('auth_file', '?')}")
important = {k: v for k, v in result.get("cookies", {}).items()
if k in ("uid", "is_login", "login_source", "smart_login_type")}
print(f" 关键 Cookie: {json.dumps(important, ensure_ascii=False)}")
else:
print(f" ❌ 失败: {result.get('error', '未知')}")
print("=" * 50)
if __name__ == "__main__":
main()