Files
weidian/server/services/auth_service.py

419 lines
14 KiB
Python
Raw Permalink Normal View History

"""
登录态管理服务 基于 v4 人机协作登录
支持两种登录方式
1. 短信验证码登录需要人拖滑块 + 输入验证码
2. 账号密码登录 Playwright 自动化无需人干预
"""
import asyncio
import json
import os
import time
from datetime import datetime
from playwright.async_api import async_playwright
from utils.stealth import stealth_async
AUTH_DIR = os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'auth')
# ── 微店 SSO 端点 ──
SSO_LOGIN_URL = "https://sso.weidian.com/login/index.php"
SEND_SMS_URL = "https://thor.weidian.com/passport/get.vcode/2.0"
LOGIN_BY_VCODE_URL = "https://sso.weidian.com/user/loginbyvcode"
LOGIN_BY_PWD_URL = "https://sso.weidian.com/user/login"
CAPTCHA_APPID = "2003473469"
def get_auth_path(account_id):
os.makedirs(AUTH_DIR, exist_ok=True)
return os.path.join(AUTH_DIR, f'auth_state_{account_id}.json')
def get_cookies_path(account_id):
os.makedirs(AUTH_DIR, exist_ok=True)
return os.path.join(AUTH_DIR, f'cookies_{account_id}.json')
def has_auth(account_id):
path = get_auth_path(account_id)
return os.path.exists(path) and os.path.getsize(path) > 10
def get_cookies_for_requests(account_id) -> dict:
"""获取给 requests/aiohttp 使用的 cookies"""
path = get_cookies_path(account_id)
if os.path.exists(path):
with open(path, "r") as f:
data = json.load(f)
return data.get("cookies", {})
return {}
async def login_with_password(account_id, phone, password):
"""
账号密码登录 Playwright 自动化无需人参与
流程
1. 打开登录页 "登录" 切到"账号密码" tab
2. 填手机号密码 点击登录
3. 监听 /user/login 响应提取 cookie
4. 保存 storage_state
"""
login_result = {'success': False, 'msg': '登录超时', 'cookies': []}
p = await async_playwright().start()
browser = await p.chromium.launch(
headless=True, args=[
'--disable-gpu', '--no-sandbox',
'--disable-blink-features=AutomationControlled',
]
)
device = p.devices['iPhone 13']
context = await browser.new_context(**device)
page = await context.new_page()
await stealth_async(page)
# 反检测
await page.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
""")
# 监听登录接口响应
async def on_response(response):
if 'user/login' in response.url and response.status == 200:
try:
data = await response.json()
status = data.get('status', {})
if status.get('status_code') == 0:
login_result['success'] = True
login_result['msg'] = '登录成功'
login_result['cookies'] = data.get('result', {}).get('cookie', [])
else:
login_result['msg'] = f"登录失败: {status.get('status_reason', '未知错误')}"
except Exception as e:
login_result['msg'] = f"解析登录响应失败: {e}"
page.on("response", on_response)
try:
await page.goto(SSO_LOGIN_URL, wait_until='networkidle', timeout=15000)
await asyncio.sleep(1)
# 点击"登录"进入表单
try:
await page.locator('#login_init_by_login').click(timeout=5000)
await asyncio.sleep(1.5)
except Exception:
pass
# 点击"账号密码登录" tab
try:
await page.locator('h4.login_content_h4 span', has_text="账号密码登录").click(timeout=3000)
await asyncio.sleep(0.5)
except Exception:
pass
# 填写手机号
phone_input = page.locator('input[placeholder*="手机号"]').first
await phone_input.click()
await phone_input.fill("")
await page.keyboard.type(phone, delay=50)
# 填写密码
pwd_input = page.locator('input[placeholder*="登录密码"], input[type="password"]').first
await pwd_input.click()
await pwd_input.fill("")
await page.keyboard.type(password, delay=50)
await asyncio.sleep(0.5)
# 点击登录
await page.locator('#login_pwd_submit').click(timeout=5000)
# 等待 API 响应
await asyncio.sleep(5)
if login_result['success'] and login_result['cookies']:
# 从 API 响应中提取 cookie写入 context
for c in login_result['cookies']:
await context.add_cookies([{
"name": c.get("name", ""),
"value": c.get("value", ""),
"domain": c.get("domain", ".weidian.com"),
"path": c.get("path", "/"),
"httpOnly": c.get("httpOnly", False),
"secure": c.get("secure", False),
"sameSite": "Lax",
}])
# 保存 storage_state + 精简 cookies
auth_path = get_auth_path(account_id)
await context.storage_state(path=auth_path)
await _save_cookies_file_async(context, account_id)
return True, "登录成功"
return False, login_result['msg']
except Exception as e:
return False, f"登录过程出错: {e}"
finally:
await browser.close()
await p.stop()
async def login_with_sms(phone, account_id=0):
"""
短信验证码登录 人机协作模式
流程
1. Playwright 打开登录页自动填手机号
2. 点击"获取验证码"触发腾讯滑块
3. 👆 用户在浏览器窗口拖动滑块
4. 脚本拦截 ticket 并自动发短信
5. 👆 用户输入 6 位验证码
6. 脚本自动提交登录
7. 保存 auth 状态
注意此函数会弹出浏览器窗口需要人在终端和浏览器之间交互
Returns: (success: bool, msg: str)
"""
if not account_id:
account_id = int(time.time()) % 100000
captcha_state = {
"ticket": None, "randstr": None,
"sms_sent": False, "sms_error": None,
"login_success": False, "login_error": None,
}
p = await async_playwright().start()
browser = await p.chromium.launch(
headless=False, # 必须有窗口
args=[
'--no-sandbox',
'--disable-blink-features=AutomationControlled',
]
)
context = await browser.new_context(
user_agent=(
"Mozilla/5.0 (iPhone; CPU iPhone OS 18_0 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/18.0 Mobile/15E148 Safari/604.1"
),
viewport={"width": 390, "height": 844},
device_scale_factor=3,
is_mobile=True,
has_touch=True,
)
await context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
""")
page = await context.new_page()
# 拦截器
async def on_response(response):
url = response.url
if "cap_union_new_verify" in url:
try:
data = await response.json()
if data.get("errorCode") == "0":
captcha_state["ticket"] = data.get("ticket", "")
captcha_state["randstr"] = data.get("randstr", "")
print(" ✅ 滑块验证通过!")
except Exception:
pass
elif "get.vcode" in url:
try:
data = await response.json()
code = str(data.get("status", {}).get("code", ""))
if code == "0":
captcha_state["sms_sent"] = True
print(" ✅ 短信验证码已发送!")
elif code not in ("", "0"):
captcha_state["sms_error"] = data.get("status", {}).get("msg", f"code={code}")
except Exception:
pass
elif "user/loginbyvcode" in url:
try:
data = await response.json()
sc = str(data.get("status", {}).get("status_code",
data.get("status", {}).get("code", "")))
if sc == "0":
captcha_state["login_success"] = True
print(" ✅ 登录成功!")
else:
captcha_state["login_error"] = data.get("status", {}).get("status_reason", f"code={sc}")
except Exception:
pass
page.on("response", on_response)
try:
print(f"\n{'='*50}")
print(f" 微店短信验证码登录")
print(f" 手机号: {phone}")
print(f"{'='*50}\n")
# 打开登录页
print("📡 打开登录页...")
await page.goto(SSO_LOGIN_URL, wait_until="load", timeout=20000)
await asyncio.sleep(3)
# 点击"登录"
try:
btn = page.locator("#login_init_by_login")
if await btn.count() > 0 and await btn.is_visible():
await btn.click()
await asyncio.sleep(2)
except Exception:
pass
# 填手机号
print(f"📱 填写手机号...")
tele = page.locator('input[placeholder*="手机号"]').first
await tele.click()
await tele.fill("")
await page.keyboard.type(phone, delay=80)
await asyncio.sleep(0.5)
# 点击获取验证码
print("🔘 点击 '获取验证码'...")
code_btn = page.locator("#login_quickCode_right, text=获取验证码").first
await code_btn.click()
# 等待滑块
print()
print(" ┌──────────────────────────────────────────┐")
print(" │ 👆 请在浏览器窗口中完成滑块验证 │")
print(" │ 拖动滑块到缺口位置即可 │")
print(" └──────────────────────────────────────────┘")
print()
for i in range(180):
await asyncio.sleep(1)
if captcha_state["ticket"] or captcha_state["sms_sent"]:
break
if captcha_state["sms_error"]:
return False, f"短信发送失败: {captcha_state['sms_error']}"
if i == 59:
print(" ⏳ 已等待 60 秒...")
if i == 119:
print(" ⏳ 已等待 120 秒...")
else:
return False, "滑块验证超时180秒"
# 等待短信
print("⏳ 等待短信发送...")
for _ in range(10):
await asyncio.sleep(1)
if captcha_state["sms_sent"]:
break
# 输入验证码
print()
try:
import sys
loop = asyncio.get_event_loop()
vcode = await loop.run_in_executor(None, lambda: input(" 📨 请输入 6 位短信验证码: ").strip())
except (EOFError, KeyboardInterrupt):
return False, "用户取消"
if len(vcode) != 6 or not vcode.isdigit():
return False, f"无效验证码需要6位数字"
# 填写并提交
print("📝 提交登录...")
vcode_input = page.locator("#login_quick_input, input[placeholder*='验证码']").first
await vcode_input.fill(vcode)
await asyncio.sleep(0.3)
submit = page.locator("#login_quick_submit, button:has-text('登录')").first
await submit.click()
# 等待登录
print("⏳ 等待登录...")
for i in range(30):
await asyncio.sleep(1)
if captcha_state["login_success"]:
break
if captcha_state["login_error"]:
return False, captcha_state["login_error"]
# 检查 cookie
cookies = await context.cookies()
cookie_map = {c["name"]: c["value"] for c in cookies}
if cookie_map.get("is_login") == "true" and cookie_map.get("uid"):
captcha_state["login_success"] = True
break
if not captcha_state["login_success"]:
return False, "登录超时"
# 保存
auth_path = get_auth_path(account_id)
await context.storage_state(path=auth_path)
await _save_cookies_file_async(context, account_id)
print(f"\n✅ 登录成功Auth 已保存: {auth_path}")
return True, "登录成功"
except Exception as e:
return False, f"登录异常: {e}"
finally:
try:
await browser.close()
await p.stop()
except Exception:
pass
def _save_cookies_file(account_id, context):
"""同步版:保存精简 cookies"""
try:
cookies = context.cookies() if hasattr(context, 'cookies') else []
cookie_map = {c["name"]: c["value"] for c in cookies}
path = get_cookies_path(account_id)
with open(path, "w", encoding="utf-8") as f:
json.dump({
"uid": cookie_map.get("uid", ""),
"cookies": cookie_map,
"saved_at": datetime.now().isoformat(),
}, f, ensure_ascii=False, indent=2)
except Exception:
pass
async def _save_cookies_file_async(context, account_id):
"""异步版:保存精简 cookies"""
try:
cookies = await context.cookies()
cookie_map = {c["name"]: c["value"] for c in cookies}
path = get_cookies_path(account_id)
with open(path, "w", encoding="utf-8") as f:
json.dump({
"uid": cookie_map.get("uid", ""),
"cookies": cookie_map,
"saved_at": datetime.now().isoformat(),
}, f, ensure_ascii=False, indent=2)
except Exception:
pass
async def get_browser_context(playwright_instance, account_id, headless=True):
"""创建带有已保存登录状态的浏览器上下文"""
browser = await playwright_instance.chromium.launch(
headless=headless, args=[
'--disable-gpu', '--no-sandbox',
'--disable-blink-features=AutomationControlled',
]
)
device = playwright_instance.devices['iPhone 13']
auth_path = get_auth_path(account_id)
if has_auth(account_id):
context = await browser.new_context(**device, storage_state=auth_path)
else:
context = await browser.new_context(**device)
return browser, context