- 密码登录保存cookie时用await _save_cookies_file_async替代同步版 - 加eventlet依赖解决WebSocket 500错误
419 lines
14 KiB
Python
419 lines
14 KiB
Python
"""
|
||
登录态管理服务 —— 基于 v4 人机协作登录
|
||
支持两种登录方式:
|
||
1. 短信验证码登录(需要人拖滑块 + 输入验证码)
|
||
2. 账号密码登录(纯 Playwright 自动化,无需人干预)
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import time
|
||
from datetime import datetime
|
||
|
||
from playwright.async_api import async_playwright
|
||
from utils.stealth import stealth_async
|
||
|
||
# Directory that holds the per-account auth-state and cookie files:
# two levels above this module's directory, under data/auth.
AUTH_DIR = os.path.join(
    os.path.dirname(__file__), os.pardir, os.pardir, 'data', 'auth',
)

# ── Weidian SSO endpoints ──
SSO_LOGIN_URL = "https://sso.weidian.com/login/index.php"
SEND_SMS_URL = "https://thor.weidian.com/passport/get.vcode/2.0"
LOGIN_BY_VCODE_URL = "https://sso.weidian.com/user/loginbyvcode"
LOGIN_BY_PWD_URL = "https://sso.weidian.com/user/login"
# NOTE(review): presumably the app id of the slider captcha used on the
# login page; it is not referenced by the code visible here — confirm.
CAPTCHA_APPID = "2003473469"
|
||
|
||
|
||
def get_auth_path(account_id):
    """Return the storage-state file path for *account_id*.

    The auth directory is created first if it does not exist yet, so the
    returned path can be written to immediately.
    """
    os.makedirs(AUTH_DIR, exist_ok=True)
    state_filename = f'auth_state_{account_id}.json'
    return os.path.join(AUTH_DIR, state_filename)
|
||
|
||
|
||
def get_cookies_path(account_id):
    """Return the slim cookies file path for *account_id*.

    The auth directory is created first if it does not exist yet, so the
    returned path can be written to immediately.
    """
    os.makedirs(AUTH_DIR, exist_ok=True)
    cookies_filename = f'cookies_{account_id}.json'
    return os.path.join(AUTH_DIR, cookies_filename)
|
||
|
||
|
||
def has_auth(account_id):
    """Tell whether a saved auth-state file exists for *account_id*.

    A file of 10 bytes or fewer does not count as a saved state.
    """
    state_file = get_auth_path(account_id)
    if not os.path.exists(state_file):
        return False
    return os.path.getsize(state_file) > 10
|
||
|
||
|
||
def get_cookies_for_requests(account_id) -> dict:
    """Load the saved cookies for use with requests/aiohttp.

    Reads the account's slim cookies file and returns the mapping stored
    under its ``"cookies"`` key.

    Args:
        account_id: Identifier used to locate the cookies file.

    Returns:
        The saved cookie mapping, or an empty dict when the file is missing,
        is not valid UTF-8 JSON, or does not contain a cookie mapping.
    """
    path = get_cookies_path(account_id)
    try:
        # The file is written with encoding="utf-8" and ensure_ascii=False,
        # so it must be decoded as UTF-8 rather than the locale default.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # No cookies were saved for this account (EAFP instead of a racy
        # exists-then-open check).
        return {}
    except ValueError:
        # Malformed or truncated file (JSON / UTF-8 decode error): treat it
        # the same as "no saved cookies" instead of crashing the caller.
        return {}

    if not isinstance(data, dict):
        return {}
    cookies = data.get("cookies")
    return cookies if isinstance(cookies, dict) else {}
|
||
|
||
|
||
async def login_with_password(account_id, phone, password):
    """
    Log in with phone number and password, fully automated via Playwright.

    Flow:
      1. Open the SSO login page, click "登录", then switch to the
         "账号密码登录" tab (both clicks are best-effort).
      2. Type the phone number and the password, then click submit.
      3. Watch responses whose URL contains ``user/login`` and read the
         login status and the cookie list from the JSON body.
      4. On success, copy those cookies into the browser context and save
         the storage state plus the slim cookies file.

    Args:
        account_id: Identifier used to name the saved auth/cookie files.
        phone: Phone number typed into the login form.
        password: Password typed into the login form.

    Returns:
        ``(success, msg)``. ``success`` is True only when the login response
        reported status code 0 and carried at least one cookie. Any
        exception raised while driving the browser is reported as
        ``(False, "登录过程出错: ...")``.
    """
    login_result = {'success': False, 'msg': '登录超时', 'cookies': []}

    # Listener for the login API response.
    async def on_response(response):
        if 'user/login' in response.url and response.status == 200:
            try:
                data = await response.json()
                status = data.get('status', {})
                if status.get('status_code') == 0:
                    # Extract the cookies before flagging success so that a
                    # malformed body cannot leave a half-updated result.
                    cookies = (data.get('result') or {}).get('cookie') or []
                    login_result['cookies'] = cookies
                    login_result['msg'] = '登录成功'
                    login_result['success'] = True
                else:
                    login_result['msg'] = f"登录失败: {status.get('status_reason', '未知错误')}"
            except Exception as e:
                login_result['msg'] = f"解析登录响应失败: {e}"

    p = None
    browser = None
    try:
        # Start-up happens inside the try block so that a failed launch or
        # context creation still reaches the teardown in ``finally``.
        p = await async_playwright().start()
        browser = await p.chromium.launch(
            headless=True, args=[
                '--disable-gpu', '--no-sandbox',
                '--disable-blink-features=AutomationControlled',
            ]
        )
        device = p.devices['iPhone 13']
        context = await browser.new_context(**device)
        page = await context.new_page()
        await stealth_async(page)

        # Anti-detection: hide navigator.webdriver.
        await page.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        """)

        page.on("response", on_response)

        await page.goto(SSO_LOGIN_URL, wait_until='networkidle', timeout=15000)
        await asyncio.sleep(1)

        # Click "登录" to reach the form. Failures are ignored on purpose:
        # the following steps fail loudly if the form is not there.
        try:
            await page.locator('#login_init_by_login').click(timeout=5000)
            await asyncio.sleep(1.5)
        except Exception:
            pass

        # Switch to the "账号密码登录" tab (best-effort, same reasoning).
        try:
            await page.locator('h4.login_content_h4 span', has_text="账号密码登录").click(timeout=3000)
            await asyncio.sleep(0.5)
        except Exception:
            pass

        # Phone number: focus, clear, then type with a per-key delay.
        phone_input = page.locator('input[placeholder*="手机号"]').first
        await phone_input.click()
        await phone_input.fill("")
        await page.keyboard.type(phone, delay=50)

        # Password: same procedure.
        pwd_input = page.locator('input[placeholder*="登录密码"], input[type="password"]').first
        await pwd_input.click()
        await pwd_input.fill("")
        await page.keyboard.type(password, delay=50)
        await asyncio.sleep(0.5)

        # Submit the form.
        await page.locator('#login_pwd_submit').click(timeout=5000)

        # Give the login API response time to arrive and be parsed.
        await asyncio.sleep(5)

        if login_result['success'] and login_result['cookies']:
            # Copy the cookies returned by the API into the context, in a
            # single call instead of one round trip per cookie.
            await context.add_cookies([
                {
                    "name": c.get("name", ""),
                    "value": c.get("value", ""),
                    "domain": c.get("domain", ".weidian.com"),
                    "path": c.get("path", "/"),
                    "httpOnly": c.get("httpOnly", False),
                    "secure": c.get("secure", False),
                    "sameSite": "Lax",
                }
                for c in login_result['cookies']
            ])

            # Persist the storage state and the slim cookies file.
            auth_path = get_auth_path(account_id)
            await context.storage_state(path=auth_path)
            await _save_cookies_file_async(context, account_id)

            return True, "登录成功"

        if login_result['success']:
            # The API reported success but sent no cookies, so nothing can be
            # saved. Say so instead of returning False with "登录成功".
            return False, "登录失败: 登录响应中未包含 cookie"

        return False, login_result['msg']

    except Exception as e:
        return False, f"登录过程出错: {e}"
    finally:
        # Teardown is best-effort and each step is isolated, so a failing
        # browser.close() can neither skip p.stop() nor replace the result.
        if browser is not None:
            try:
                await browser.close()
            except Exception:
                pass
        if p is not None:
            try:
                await p.stop()
            except Exception:
                pass
|
||
|
||
|
||
async def login_with_sms(phone, account_id=0):
    """
    Log in with an SMS verification code — human-in-the-loop mode.

    Flow:
      1. Playwright opens the login page and types the phone number.
      2. The "获取验证码" button is clicked; the user is then asked to solve
         the slider captcha in the browser window.
      3. Response listeners record the captcha ticket and the outcome of the
         SMS request.
      4. The user types the 6-digit code into the terminal.
      5. The code is filled into the form and submitted.
      6. The storage state and the slim cookies file are saved.

    A visible (non-headless) browser is launched and the code is read with
    ``input()``, so a person must be present at both the browser window and
    the terminal.

    Args:
        phone: Phone number typed into the login form.
        account_id: Identifier used to name the saved files. A falsy value
            is replaced by ``int(time.time()) % 100000``.

    Returns:
        (success: bool, msg: str)
    """
    if not account_id:
        account_id = int(time.time()) % 100000

    captcha_state = {
        "ticket": None, "randstr": None,
        "sms_sent": False, "sms_error": None,
        "login_success": False, "login_error": None,
    }

    # Response interceptor. Bodies that cannot be parsed as JSON are ignored
    # on purpose: only well-formed API answers update the state.
    async def on_response(response):
        url = response.url
        if "cap_union_new_verify" in url:
            try:
                data = await response.json()
                if data.get("errorCode") == "0":
                    captcha_state["ticket"] = data.get("ticket", "")
                    captcha_state["randstr"] = data.get("randstr", "")
                    print(" ✅ 滑块验证通过!")
            except Exception:
                pass
        elif "get.vcode" in url:
            try:
                data = await response.json()
                status = data.get("status", {})
                code = str(status.get("code", ""))
                if code == "0":
                    captcha_state["sms_sent"] = True
                    print(" ✅ 短信验证码已发送!")
                elif code:
                    captcha_state["sms_error"] = status.get("msg", f"code={code}")
            except Exception:
                pass
        elif "user/loginbyvcode" in url:
            try:
                data = await response.json()
                status = data.get("status", {})
                sc = str(status.get("status_code", status.get("code", "")))
                if sc == "0":
                    captcha_state["login_success"] = True
                    print(" ✅ 登录成功!")
                else:
                    captcha_state["login_error"] = status.get("status_reason", f"code={sc}")
            except Exception:
                pass

    p = None
    browser = None
    try:
        # Start-up happens inside the try block so that a failed launch or
        # context creation still reaches the teardown in ``finally``.
        p = await async_playwright().start()
        browser = await p.chromium.launch(
            headless=False,  # a visible window is required
            args=[
                '--no-sandbox',
                '--disable-blink-features=AutomationControlled',
            ]
        )

        context = await browser.new_context(
            user_agent=(
                "Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) "
                "AppleWebKit/605.1.15 (KHTML, like Gecko) "
                "Version/18.0 Mobile/15E148 Safari/604.1"
            ),
            viewport={"width": 390, "height": 844},
            device_scale_factor=3,
            is_mobile=True,
            has_touch=True,
        )

        await context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
        """)

        page = await context.new_page()
        page.on("response", on_response)

        print(f"\n{'='*50}")
        print(" 微店短信验证码登录")
        print(f" 手机号: {phone}")
        print(f"{'='*50}\n")

        # Open the login page.
        print("📡 打开登录页...")
        await page.goto(SSO_LOGIN_URL, wait_until="load", timeout=20000)
        await asyncio.sleep(3)

        # Click "登录" if the button is present and visible (best-effort).
        try:
            btn = page.locator("#login_init_by_login")
            if await btn.count() > 0 and await btn.is_visible():
                await btn.click()
                await asyncio.sleep(2)
        except Exception:
            pass

        # Type the phone number.
        print("📱 填写手机号...")
        tele = page.locator('input[placeholder*="手机号"]').first
        await tele.click()
        await tele.fill("")
        await page.keyboard.type(phone, delay=80)
        await asyncio.sleep(0.5)

        # Click "获取验证码". The legacy ``text=`` engine cannot be an
        # alternative inside a comma-separated CSS list, so the text match
        # uses the CSS-compatible ``:text()`` pseudo-class.
        print("🔘 点击 '获取验证码'...")
        code_btn = page.locator('#login_quickCode_right, :text("获取验证码")').first
        await code_btn.click()

        # Wait for the user to solve the slider.
        print()
        print(" ┌──────────────────────────────────────────┐")
        print(" │ 👆 请在浏览器窗口中完成滑块验证 │")
        print(" │ 拖动滑块到缺口位置即可 │")
        print(" └──────────────────────────────────────────┘")
        print()

        for i in range(180):
            await asyncio.sleep(1)
            if captcha_state["ticket"] or captcha_state["sms_sent"]:
                break
            if captcha_state["sms_error"]:
                return False, f"短信发送失败: {captcha_state['sms_error']}"
            if i == 59:
                print(" ⏳ 已等待 60 秒...")
            if i == 119:
                print(" ⏳ 已等待 120 秒...")
        else:
            return False, "滑块验证超时(180秒)"

        # Wait for the SMS request to be answered.
        print("⏳ 等待短信发送...")
        for _ in range(10):
            await asyncio.sleep(1)
            if captcha_state["sms_sent"]:
                break
            if captcha_state["sms_error"]:
                # The send was rejected after the slider passed: asking the
                # user for a code that never arrives would be pointless.
                return False, f"短信发送失败: {captcha_state['sms_error']}"

        # Read the code from the terminal without blocking the event loop.
        print()
        try:
            loop = asyncio.get_running_loop()
            vcode = await loop.run_in_executor(
                None, lambda: input(" 📨 请输入 6 位短信验证码: ").strip()
            )
        except (EOFError, KeyboardInterrupt):
            return False, "用户取消"

        if len(vcode) != 6 or not vcode.isdigit():
            return False, "无效验证码(需要6位数字)"

        # Fill in the code and submit.
        print("📝 提交登录...")
        vcode_input = page.locator("#login_quick_input, input[placeholder*='验证码']").first
        await vcode_input.fill(vcode)
        await asyncio.sleep(0.3)

        submit = page.locator("#login_quick_submit, button:has-text('登录')").first
        await submit.click()

        # Wait for the login to be confirmed, either by the API response or
        # by the login cookies showing up in the context.
        print("⏳ 等待登录...")
        for _ in range(30):
            await asyncio.sleep(1)
            if captcha_state["login_success"]:
                break
            if captcha_state["login_error"]:
                return False, captcha_state["login_error"]
            cookies = await context.cookies()
            cookie_map = {c["name"]: c["value"] for c in cookies}
            if cookie_map.get("is_login") == "true" and cookie_map.get("uid"):
                captcha_state["login_success"] = True
                break

        if not captcha_state["login_success"]:
            return False, "登录超时"

        # Persist the storage state and the slim cookies file.
        auth_path = get_auth_path(account_id)
        await context.storage_state(path=auth_path)
        await _save_cookies_file_async(context, account_id)

        print(f"\n✅ 登录成功!Auth 已保存: {auth_path}")
        return True, "登录成功"

    except Exception as e:
        return False, f"登录异常: {e}"
    finally:
        # Teardown is best-effort and each step is isolated, so a failing
        # browser.close() can no longer skip p.stop().
        if browser is not None:
            try:
                await browser.close()
            except Exception:
                pass
        if p is not None:
            try:
                await p.stop()
            except Exception:
                pass
|
||
|
||
|
||
def _dump_cookie_snapshot(account_id, cookies):
    """Write *cookies* (a list of cookie dicts) to the account's cookies file."""
    cookie_map = {c["name"]: c["value"] for c in cookies}
    path = get_cookies_path(account_id)
    with open(path, "w", encoding="utf-8") as f:
        json.dump({
            "uid": cookie_map.get("uid", ""),
            "cookies": cookie_map,
            "saved_at": datetime.now().isoformat(),
        }, f, ensure_ascii=False, indent=2)


def _warn_cookie_save_failed(account_id, exc):
    """Log a warning that the cookies file could not be saved."""
    # Local import: this block does not assume the module imports logging.
    import logging

    logging.getLogger(__name__).warning(
        "failed to save cookies file for account %s: %s", account_id, exc
    )


def _save_cookies_file(account_id, context):
    """Sync version: save the slim cookies file.

    Best-effort: a failure is logged as a warning and never raised.

    Args:
        account_id: Identifier used to name the cookies file.
        context: Object whose ``cookies()`` method returns the cookie list
            synchronously. An object without ``cookies`` is treated as
            having no cookies. For a context whose ``cookies()`` must be
            awaited, use ``_save_cookies_file_async`` instead.
    """
    import inspect

    try:
        cookies = context.cookies() if hasattr(context, 'cookies') else []
        if inspect.isawaitable(cookies):
            # An async context was passed: its cookies() result cannot be
            # read here. Close the coroutine so it is not left un-awaited,
            # and report the misuse instead of failing silently.
            if inspect.iscoroutine(cookies):
                cookies.close()
            raise TypeError(
                "context.cookies() returned an awaitable; "
                "use _save_cookies_file_async for async contexts"
            )
        _dump_cookie_snapshot(account_id, cookies)
    except Exception as exc:
        _warn_cookie_save_failed(account_id, exc)


async def _save_cookies_file_async(context, account_id):
    """Async version: save the slim cookies file.

    Best-effort: a failure is logged as a warning and never raised.

    Args:
        context: Object whose ``cookies()`` coroutine returns the cookie list.
        account_id: Identifier used to name the cookies file.
    """
    try:
        cookies = await context.cookies()
        _dump_cookie_snapshot(account_id, cookies)
    except Exception as exc:
        _warn_cookie_save_failed(account_id, exc)
|
||
|
||
|
||
async def get_browser_context(playwright_instance, account_id, headless=True):
    """Create a browser context that carries the saved login state.

    Launches Chromium and opens a context emulating the 'iPhone 13' device.
    When a saved auth state exists for the account it is loaded into the
    context; otherwise the context starts without one.

    Args:
        playwright_instance: A started Playwright instance.
        account_id: Identifier used to locate the saved auth state.
        headless: Whether to launch the browser headless.

    Returns:
        ``(browser, context)``. The caller is responsible for closing them.

    Raises:
        Whatever launching the browser or creating the context raises. If
        context creation fails, the browser is closed before re-raising.
    """
    browser = await playwright_instance.chromium.launch(
        headless=headless, args=[
            '--disable-gpu', '--no-sandbox',
            '--disable-blink-features=AutomationControlled',
        ]
    )
    try:
        context_options = dict(playwright_instance.devices['iPhone 13'])
        if has_auth(account_id):
            context_options['storage_state'] = get_auth_path(account_id)
        context = await browser.new_context(**context_options)
    except BaseException:
        # The caller never receives the browser on this path, so it must be
        # closed here or the process leaks. The error is always re-raised.
        await browser.close()
        raise

    return browser, context
|