Files
weidian/server/services/auth_service.py
Jeason fa788bf3fc fix: _save_cookies_file改用async版本 + 加eventlet支持WebSocket
- 密码登录保存cookie时用await _save_cookies_file_async替代同步版
- 加eventlet依赖解决WebSocket 500错误
2026-04-02 13:07:25 +08:00

419 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
登录态管理服务 —— 基于 v4 人机协作登录
支持两种登录方式:
1. 短信验证码登录(需要人拖滑块 + 输入验证码)
2. 账号密码登录(纯 Playwright 自动化,无需人干预)
"""
import asyncio
import json
import logging
import os
import time
from datetime import datetime

from playwright.async_api import async_playwright

from utils.stealth import stealth_async
# Directory for per-account login artifacts: <this file's dir>/../../data/auth.
# The path is not normalised; it is created on demand by the path helpers.
AUTH_DIR = os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'auth')
# ── Weidian SSO endpoints ──
# Login page opened by both login flows.
SSO_LOGIN_URL = "https://sso.weidian.com/login/index.php"
# The three endpoint URLs below are not referenced directly in the visible
# code; the response listeners match on URL substrings instead.
SEND_SMS_URL = "https://thor.weidian.com/passport/get.vcode/2.0"
LOGIN_BY_VCODE_URL = "https://sso.weidian.com/user/loginbyvcode"
LOGIN_BY_PWD_URL = "https://sso.weidian.com/user/login"
# NOTE(review): presumably the app id of the slider captcha — confirm; unused in the visible code.
CAPTCHA_APPID = "2003473469"
def get_auth_path(account_id):
    """Return the path of the Playwright storage-state file for an account.

    The auth directory is created first if it does not exist yet.
    """
    os.makedirs(AUTH_DIR, exist_ok=True)
    state_file = f'auth_state_{account_id}.json'
    return os.path.join(AUTH_DIR, state_file)
def get_cookies_path(account_id):
    """Return the path of the simplified cookies JSON file for an account.

    The auth directory is created first if it does not exist yet.
    """
    os.makedirs(AUTH_DIR, exist_ok=True)
    cookies_file = f'cookies_{account_id}.json'
    return os.path.join(AUTH_DIR, cookies_file)
def has_auth(account_id):
    """Return True when a non-trivial saved auth state exists for the account.

    A file of 10 bytes or fewer is treated as "no auth"
    (presumably to reject empty or placeholder JSON — confirm).
    """
    path = get_auth_path(account_id)
    try:
        # A single stat call: avoids the exists()/getsize() race where the
        # file is removed between the two checks and getsize() raises.
        return os.path.getsize(path) > 10
    except OSError:
        # Missing or inaccessible file means there is no usable auth state.
        return False
def get_cookies_for_requests(account_id) -> dict:
    """Return the saved cookie map (name -> value) for requests/aiohttp.

    Returns an empty dict when no cookies file exists, when the file is not
    valid UTF-8 JSON, or when it does not have the expected
    ``{"cookies": {...}}`` layout.
    """
    path = get_cookies_path(account_id)
    try:
        # The file is written as UTF-8 with ensure_ascii=False, so it must be
        # read back as UTF-8 rather than with the platform default encoding.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError as exc:
        # Covers json.JSONDecodeError and UnicodeDecodeError (corrupt file).
        logging.getLogger(__name__).warning(
            "Ignoring unreadable cookies file %s: %s", path, exc
        )
        return {}
    if not isinstance(data, dict):
        return {}
    cookies = data.get("cookies", {})
    return cookies if isinstance(cookies, dict) else {}
async def login_with_password(account_id, phone, password):
    """Log in with phone number and password, fully automated via Playwright.

    Flow:
      1. Open the login page, click "登录", switch to the password tab.
      2. Type the phone number and password, then submit.
      3. Listen for the ``user/login`` response and extract its cookies.
      4. Save the storage state and the simplified cookies file.

    Args:
        account_id: Identifier used to name the saved auth/cookie files.
        phone: Phone number typed into the login form.
        password: Password typed into the login form.

    Returns:
        ``(success, msg)`` where ``success`` is a bool and ``msg`` a
        human-readable result string.

    Errors during the login flow itself are reported through the return
    value; errors while starting the browser propagate to the caller, but
    the browser and driver are always shut down.
    """
    login_result = {'success': False, 'msg': '登录超时', 'cookies': []}

    async def on_response(response):
        """Record the outcome of the password-login API call."""
        # Substring match: any URL containing 'user/login' is inspected.
        if 'user/login' not in response.url or response.status != 200:
            return
        try:
            data = await response.json()
            status = data.get('status', {})
            # Compare as strings so that both 0 and "0" count as success,
            # matching how the SMS login flow interprets the status code.
            if str(status.get('status_code')) == '0':
                cookies = (data.get('result') or {}).get('cookie') or []
                login_result['success'] = True
                login_result['msg'] = '登录成功'
                login_result['cookies'] = cookies
            else:
                login_result['msg'] = f"登录失败: {status.get('status_reason', '未知错误')}"
        except Exception as e:
            login_result['msg'] = f"解析登录响应失败: {e}"

    p = await async_playwright().start()
    browser = None
    try:
        # Setup lives inside the outer try so a failure here cannot leak the
        # browser process or the Playwright driver.
        browser = await p.chromium.launch(
            headless=True,
            args=[
                '--disable-gpu', '--no-sandbox',
                '--disable-blink-features=AutomationControlled',
            ],
        )
        device = p.devices['iPhone 13']
        context = await browser.new_context(**device)
        page = await context.new_page()
        await stealth_async(page)
        # Anti-detection: hide navigator.webdriver.
        await page.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        """)
        page.on("response", on_response)

        try:
            await page.goto(SSO_LOGIN_URL, wait_until='networkidle', timeout=15000)
            await asyncio.sleep(1)

            # Click "登录" to reveal the form; best-effort, the button may be absent.
            try:
                await page.locator('#login_init_by_login').click(timeout=5000)
                await asyncio.sleep(1.5)
            except Exception:
                pass

            # Switch to the "账号密码登录" tab; best-effort, it may already be active.
            try:
                await page.locator(
                    'h4.login_content_h4 span', has_text="账号密码登录"
                ).click(timeout=3000)
                await asyncio.sleep(0.5)
            except Exception:
                pass

            # Phone number: focus, clear, then type with a per-key delay.
            phone_input = page.locator('input[placeholder*="手机号"]').first
            await phone_input.click()
            await phone_input.fill("")
            await page.keyboard.type(phone, delay=50)

            # Password: same approach.
            pwd_input = page.locator(
                'input[placeholder*="登录密码"], input[type="password"]'
            ).first
            await pwd_input.click()
            await pwd_input.fill("")
            await page.keyboard.type(password, delay=50)
            await asyncio.sleep(0.5)

            # Submit and give the login API time to respond.
            await page.locator('#login_pwd_submit').click(timeout=5000)
            await asyncio.sleep(5)

            if not login_result['success']:
                return False, login_result['msg']
            if not login_result['cookies']:
                # Previously this path returned (False, '登录成功'), which is
                # contradictory; report the real reason instead.
                return False, "登录成功但响应中未返回 cookie"

            # Inject the cookies returned by the API into the context in one call.
            await context.add_cookies([
                {
                    "name": c.get("name", ""),
                    "value": c.get("value", ""),
                    "domain": c.get("domain", ".weidian.com"),
                    "path": c.get("path", "/"),
                    "httpOnly": c.get("httpOnly", False),
                    "secure": c.get("secure", False),
                    "sameSite": "Lax",
                }
                for c in login_result['cookies']
            ])

            # Persist the storage state plus the simplified cookies file.
            auth_path = get_auth_path(account_id)
            await context.storage_state(path=auth_path)
            await _save_cookies_file_async(context, account_id)
            return True, "登录成功"
        except Exception as e:
            return False, f"登录过程出错: {e}"
    finally:
        # Best-effort shutdown: a failing close() must neither skip stop()
        # nor replace the result being returned.
        if browser is not None:
            try:
                await browser.close()
            except Exception:
                pass
        try:
            await p.stop()
        except Exception:
            pass
async def login_with_sms(phone, account_id=0):
    """Log in with an SMS verification code (human-in-the-loop).

    Flow:
      1. Playwright opens the login page and types the phone number.
      2. Clicking "获取验证码" triggers the slider captcha.
      3. The user drags the slider in the browser window.
      4. Response listeners record the captcha ticket and the SMS result.
      5. The user types the 6-digit code in the terminal.
      6. The script submits the login form.
      7. The auth state and simplified cookies are saved.

    This opens a visible browser window and reads from stdin, so it needs
    a person at both the terminal and the browser.

    Args:
        phone: Phone number typed into the login form.
        account_id: Identifier used to name the saved files. When falsy, one
            is derived from the current time (``int(time.time()) % 100000``).

    Returns:
        ``(success, msg)`` where ``success`` is a bool and ``msg`` a
        human-readable result string.
    """
    if not account_id:
        account_id = int(time.time()) % 100000

    captcha_state = {
        "ticket": None, "randstr": None,
        "sms_sent": False, "sms_error": None,
        "login_success": False, "login_error": None,
    }

    async def on_response(response):
        """Track captcha, SMS-send and login responses by URL substring."""
        url = response.url
        if "cap_union_new_verify" in url:
            try:
                data = await response.json()
                if data.get("errorCode") == "0":
                    captcha_state["ticket"] = data.get("ticket", "")
                    captcha_state["randstr"] = data.get("randstr", "")
                    print(" ✅ 滑块验证通过!")
            except Exception:
                pass  # non-JSON body: ignore
        elif "get.vcode" in url:
            try:
                data = await response.json()
                status = data.get("status", {})
                code = str(status.get("code", ""))
                if code == "0":
                    captcha_state["sms_sent"] = True
                    print(" ✅ 短信验证码已发送!")
                elif code:
                    captcha_state["sms_error"] = status.get("msg", f"code={code}")
            except Exception:
                pass  # non-JSON body: ignore
        elif "user/loginbyvcode" in url:
            try:
                data = await response.json()
                status = data.get("status", {})
                # The status code may arrive under either key.
                sc = str(status.get("status_code", status.get("code", "")))
                if sc == "0":
                    captcha_state["login_success"] = True
                    print(" ✅ 登录成功!")
                else:
                    captcha_state["login_error"] = status.get("status_reason", f"code={sc}")
            except Exception:
                pass  # non-JSON body: ignore

    p = await async_playwright().start()
    browser = None
    try:
        # Setup lives inside the outer try so a failure here cannot leak the
        # browser process or the Playwright driver.
        browser = await p.chromium.launch(
            headless=False,  # a visible window is required for the slider
            args=[
                '--no-sandbox',
                '--disable-blink-features=AutomationControlled',
            ],
        )
        context = await browser.new_context(
            user_agent=(
                "Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) "
                "AppleWebKit/605.1.15 (KHTML, like Gecko) "
                "Version/18.0 Mobile/15E148 Safari/604.1"
            ),
            viewport={"width": 390, "height": 844},
            device_scale_factor=3,
            is_mobile=True,
            has_touch=True,
        )
        await context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        """)
        page = await context.new_page()
        page.on("response", on_response)

        try:
            print(f"\n{'='*50}")
            print(" 微店短信验证码登录")
            print(f" 手机号: {phone}")
            print(f"{'='*50}\n")

            # Open the login page.
            print("📡 打开登录页...")
            await page.goto(SSO_LOGIN_URL, wait_until="load", timeout=20000)
            await asyncio.sleep(3)

            # Click "登录" if the button is present; best-effort.
            try:
                btn = page.locator("#login_init_by_login")
                if await btn.count() > 0 and await btn.is_visible():
                    await btn.click()
                    await asyncio.sleep(2)
            except Exception:
                pass

            # Type the phone number.
            print("📱 填写手机号...")
            tele = page.locator('input[placeholder*="手机号"]').first
            await tele.click()
            await tele.fill("")
            await page.keyboard.type(phone, delay=80)
            await asyncio.sleep(0.5)

            # Request the verification code. The `text=` engine cannot be
            # combined with a CSS selector list, so the text match uses the
            # :text() CSS pseudo-class instead.
            print("🔘 点击 '获取验证码'...")
            code_btn = page.locator(
                '#login_quickCode_right, :text("获取验证码")'
            ).first
            await code_btn.click()

            # Wait (up to 180 s) for the user to solve the slider.
            print()
            print(" ┌──────────────────────────────────────────┐")
            print(" │ 👆 请在浏览器窗口中完成滑块验证 │")
            print(" │ 拖动滑块到缺口位置即可 │")
            print(" └──────────────────────────────────────────┘")
            print()
            for i in range(180):
                await asyncio.sleep(1)
                if captcha_state["ticket"] or captcha_state["sms_sent"]:
                    break
                if captcha_state["sms_error"]:
                    return False, f"短信发送失败: {captcha_state['sms_error']}"
                if i == 59:
                    print(" ⏳ 已等待 60 秒...")
                if i == 119:
                    print(" ⏳ 已等待 120 秒...")
            else:
                return False, "滑块验证超时180秒"

            # Wait up to 10 s for the SMS-send confirmation. A reported send
            # failure aborts here instead of prompting for a code that will
            # never arrive; with no confirmation at all we still proceed.
            print("⏳ 等待短信发送...")
            for _ in range(10):
                await asyncio.sleep(1)
                if captcha_state["sms_sent"]:
                    break
                if captcha_state["sms_error"]:
                    return False, f"短信发送失败: {captcha_state['sms_error']}"

            # Read the code from stdin in a worker thread so the event loop
            # (and the response listeners) keep running.
            print()
            try:
                loop = asyncio.get_running_loop()
                vcode = await loop.run_in_executor(
                    None, lambda: input(" 📨 请输入 6 位短信验证码: ").strip()
                )
            except (EOFError, KeyboardInterrupt):
                return False, "用户取消"
            if len(vcode) != 6 or not vcode.isdigit():
                return False, "无效验证码需要6位数字"

            # Fill in the code and submit.
            print("📝 提交登录...")
            vcode_input = page.locator(
                "#login_quick_input, input[placeholder*='验证码']"
            ).first
            await vcode_input.fill(vcode)
            await asyncio.sleep(0.3)
            submit = page.locator(
                "#login_quick_submit, button:has-text('登录')"
            ).first
            await submit.click()

            # Wait up to 30 s for the login to be confirmed, either by the
            # API response or by the login cookies appearing.
            print("⏳ 等待登录...")
            for _ in range(30):
                await asyncio.sleep(1)
                if captcha_state["login_success"]:
                    break
                if captcha_state["login_error"]:
                    return False, captcha_state["login_error"]
                cookies = await context.cookies()
                cookie_map = {c["name"]: c["value"] for c in cookies}
                if cookie_map.get("is_login") == "true" and cookie_map.get("uid"):
                    captcha_state["login_success"] = True
                    break
            else:
                return False, "登录超时"

            # Persist the storage state plus the simplified cookies file.
            auth_path = get_auth_path(account_id)
            await context.storage_state(path=auth_path)
            await _save_cookies_file_async(context, account_id)
            print(f"\n✅ 登录成功Auth 已保存: {auth_path}")
            return True, "登录成功"
        except Exception as e:
            return False, f"登录异常: {e}"
    finally:
        # Best-effort shutdown: a failing close() must not skip stop().
        if browser is not None:
            try:
                await browser.close()
            except Exception:
                pass
        try:
            await p.stop()
        except Exception:
            pass
def _save_cookies_file(account_id, context):
    """Synchronous variant: save a simplified cookies file for the account.

    Writes ``{"uid", "cookies", "saved_at"}`` as UTF-8 JSON to the account's
    cookies path. Saving is best-effort: failures are logged as warnings and
    never raised.

    Args:
        account_id: Identifier used to locate the cookies file.
        context: Object whose ``cookies()`` returns a list of cookie dicts
            synchronously. An object without ``cookies`` yields an empty map.
    """
    log = logging.getLogger(__name__)
    try:
        cookies = context.cookies() if hasattr(context, 'cookies') else []
        if asyncio.iscoroutine(cookies):
            # An async context was passed: the coroutine cannot be awaited
            # here, so close it (avoids a "never awaited" warning) and bail.
            cookies.close()
            log.warning(
                "Cookies for account %s not saved: async context passed to "
                "the synchronous saver",
                account_id,
            )
            return
        cookie_map = {c["name"]: c["value"] for c in cookies}
        path = get_cookies_path(account_id)
        with open(path, "w", encoding="utf-8") as f:
            json.dump({
                "uid": cookie_map.get("uid", ""),
                "cookies": cookie_map,
                "saved_at": datetime.now().isoformat(),
            }, f, ensure_ascii=False, indent=2)
    except Exception as exc:
        # Deliberately non-fatal, but no longer silent.
        log.warning("Failed to save cookies for account %s: %s", account_id, exc)
async def _save_cookies_file_async(context, account_id):
    """Async variant: save a simplified cookies file for the account.

    Writes ``{"uid", "cookies", "saved_at"}`` as UTF-8 JSON to the account's
    cookies path. Saving is best-effort: failures are logged as warnings and
    never raised.

    Args:
        context: Object whose awaitable ``cookies()`` returns cookie dicts.
        account_id: Identifier used to locate the cookies file.
    """
    try:
        cookies = await context.cookies()
        cookie_map = {c["name"]: c["value"] for c in cookies}
        path = get_cookies_path(account_id)
        with open(path, "w", encoding="utf-8") as f:
            json.dump({
                "uid": cookie_map.get("uid", ""),
                "cookies": cookie_map,
                "saved_at": datetime.now().isoformat(),
            }, f, ensure_ascii=False, indent=2)
    except Exception as exc:
        # Deliberately non-fatal, but no longer silent.
        logging.getLogger(__name__).warning(
            "Failed to save cookies for account %s: %s", account_id, exc
        )
async def get_browser_context(playwright_instance, account_id, headless=True):
    """Create a browser context pre-loaded with the account's saved login state.

    Args:
        playwright_instance: A started async Playwright instance.
        account_id: Account whose saved storage state should be loaded,
            if one exists.
        headless: Whether to launch Chromium headless.

    Returns:
        ``(browser, context)``. The caller is responsible for closing them.

    Raises:
        Any error from launching the browser or creating the context. If
        context creation fails, the launched browser is closed first.
    """
    browser = await playwright_instance.chromium.launch(
        headless=headless,
        args=[
            '--disable-gpu', '--no-sandbox',
            '--disable-blink-features=AutomationControlled',
        ],
    )
    try:
        context_options = dict(playwright_instance.devices['iPhone 13'])
        auth_path = get_auth_path(account_id)
        if has_auth(account_id):
            context_options['storage_state'] = auth_path
        context = await browser.new_context(**context_options)
    except Exception:
        # Do not leak the browser process when the context cannot be
        # created (e.g. an unreadable storage-state file).
        await browser.close()
        raise
    return browser, context