"""Batch VIN query crawler for the Chery TSP management site (Selenium-based)."""
import csv
import os
import random
import time

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, WebDriverException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
class VINSpider:
    """Selenium-driven client that logs in to the TSP site and queries VINs.

    Typical flow: ``init_driver`` -> ``driver.get(login page)`` -> ``login``
    -> ``wait_for_query_page`` -> ``batch_query`` -> ``close``.
    """

    # Page that hosts the single-VIN query form.
    QUERY_PAGE_URL = (
        "https://tspmanager.chery.ru/#/f-statistic-analysis/"
        "s-universal-analysis/t-vehicle-online-duration"
    )

    # Locators shared by several methods (kept in one place so they cannot drift).
    LOGIN_FORM_XPATH = '//form[@class="el-form login-form"]'
    VIN_INPUT_XPATH = '//*[@id="pane-tabSingle"]/form/div/div[1]/div/div/div/input'
    QUERY_BUTTON_XPATH = '//*[@id="pane-tabSingle"]/form/div/div[2]/button'
    RESULT_XPATH = '//*[@id="pane-tabSingle"]/section'

    # Column order of every exported CSV file.
    CSV_FIELDS = ('VIN', '单车在线时长', '查询时间', '状态')

    # Substrings that make query_vin log "no data" instead of "success".
    # NOTE(review): "0" also matches any result that merely contains the digit
    # zero; this only affects the log line, so it is left unchanged -- confirm
    # the intended matching against real result text.
    NO_DATA_KEYWORDS = ("无", "没有", "0", "null", "None")

    # Substrings that mark a query_vin return value as a failure.
    FAILURE_KEYWORDS = ('失败', '错误', '异常')

    def __init__(self):
        """Prepare Chrome options; the browser itself is started by init_driver."""
        self.options = webdriver.ChromeOptions()
        # Flags/options that hide the usual automation markers from the page.
        self.options.add_argument('--disable-blink-features=AutomationControlled')
        self.options.add_argument('--start-maximized')
        self.options.add_experimental_option('excludeSwitches', ['enable-automation'])
        self.options.add_experimental_option('useAutomationExtension', False)

        self.driver = None
        self.wait = None

    def init_driver(self, chromedriver_path):
        """Start Chrome through the chromedriver binary at *chromedriver_path*.

        Sets ``self.driver`` and ``self.wait`` (an explicit wait with a
        30-second timeout).
        """
        service = Service(chromedriver_path)
        self.driver = webdriver.Chrome(service=service, options=self.options)
        # Runs in every new document before page scripts: make
        # navigator.webdriver read as undefined.
        self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
            'source': '''
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                })
            '''
        })
        self.wait = WebDriverWait(self.driver, 30)
        print("✅ 浏览器启动成功")

    def save_screenshot(self, name="screenshot"):
        """Save a page screenshot to ``<name>.png``; never raises.

        This helper is called from ``except`` handlers, so a failure here
        (browser not started, or a WebDriver error) is reported and swallowed
        rather than allowed to mask the original error.
        """
        if self.driver is None:
            print(f"⚠️ 浏览器未启动,无法保存截图: {name}.png")
            return
        try:
            self.driver.save_screenshot(f"{name}.png")
        except WebDriverException as exc:
            print(f"⚠️ 截图保存失败: {str(exc)[:50]}")
            return
        print(f"📸 截图已保存: {name}.png")

    @staticmethod
    def _fill_input(container, xpath, value):
        """Find the input at *xpath* inside *container*, clear it, type *value*."""
        field = container.find_element(By.XPATH, xpath)
        field.clear()
        field.send_keys(value)

    def _handle_captcha(self, login_form):
        """If the form has a captcha input, ask the operator for the code and type it."""
        try:
            captcha_input = login_form.find_element(
                By.XPATH, './/input[@placeholder="请输入验证码"]'
            )
        except NoSuchElementException:
            print(" ⚠️ 未找到验证码输入框")
            return
        print(" 🔒 找到验证码输入框")

        # Save a page screenshot so the captcha can be read by a human.
        # Only attempted when the captcha image is present on the form.
        try:
            login_form.find_element(By.XPATH, './/img[@class="login-code-img"]')
            self.driver.save_screenshot("captcha_page.png")
            print(" 📸 已保存验证码页面截图")
        except WebDriverException as exc:
            # Best effort: the operator can still read the captcha in the browser.
            print(f" ⚠️ 验证码截图保存失败: {str(exc)[:50]}")

        print("\n" + "="*50)
        print("⚠️ 请查看截图 'captcha_page.png' 获取验证码")
        print(" 或在浏览器中查看验证码图片")
        print("="*50)

        captcha_code = input("请输入验证码: ").strip()
        captcha_input.send_keys(captcha_code)
        print(" ✅ 已输入验证码")

    def _visible_login_error(self):
        """Return the text of a displayed form error message, or None if there is none."""
        try:
            error_msg = self.driver.find_element(
                By.XPATH, '//p[@class="el-form-item__error"]'
            )
            if error_msg.is_displayed():
                return error_msg.text
        except WebDriverException:
            # No error element (or it vanished while being inspected).
            pass
        return None

    def login(self, username, password):
        """Fill in and submit the login form on the currently loaded page.

        Returns True when no form error is displayed three seconds after the
        click, False when an error message is shown or any step raises (a
        ``login_error`` screenshot is attempted in both failure cases).
        """
        print("\n🔐 开始登录...")

        try:
            # Wait for the login form to be present.
            login_form = self.wait.until(
                EC.presence_of_element_located((By.XPATH, self.LOGIN_FORM_XPATH))
            )
            print("✅ 登录表单已找到")

            self._fill_input(
                login_form, './/input[@placeholder="账号" or @name="username"]', username
            )
            print(f" ✅ 已输入账号: {username}")

            self._fill_input(
                login_form, './/input[@placeholder="密码" or @name="password"]', password
            )
            print(" ✅ 已输入密码")

            self._handle_captcha(login_form)

            login_btn = login_form.find_element(
                By.XPATH, './/button[@class="el-button submit-login el-button--primary"]'
            )
            login_btn.click()
            print(" ✅ 已点击登录按钮")

            # Give the page time to respond before looking for an error message.
            time.sleep(3)

            error_text = self._visible_login_error()
            if error_text is not None:
                print(f" ❌ 登录错误: {error_text}")
                self.save_screenshot("login_error")
                return False

            print("✅ 登录成功!")
            return True

        except Exception as e:
            print(f"❌ 登录过程出错: {str(e)}")
            self.save_screenshot("login_error")
            return False

    def wait_for_query_page(self):
        """Navigate to the query page and wait for its VIN input.

        If the input does not appear, a screenshot is taken and the operator
        is asked to open the page manually. Always returns True.
        """
        print("\n⏳ 正在进入查询页面...")

        self.driver.get(self.QUERY_PAGE_URL)
        time.sleep(3)

        try:
            self.wait.until(
                EC.presence_of_element_located((By.XPATH, self.VIN_INPUT_XPATH))
            )
        except WebDriverException:
            print("⚠️ 未检测到查询页面元素")
            self.save_screenshot("query_page_check")
            print("请手动进入查询页面,然后按回车继续")
            input("按回车继续...")
            return True

        print("✅ 已进入查询页面")
        return True

    def query_vin(self, vin):
        """Query a single VIN and return the text of the result section.

        Returns "结果为空" when the result section has no text, and a string
        starting with "查询失败: " when any step raises (an ``error_<vin>``
        screenshot is attempted in that case).
        """
        try:
            # Clear the input and type the VIN.
            vin_input = self.wait.until(
                EC.element_to_be_clickable((By.XPATH, self.VIN_INPUT_XPATH))
            )
            vin_input.clear()
            vin_input.send_keys(vin)
            print(f" 📝 VIN: {vin}")

            self.driver.find_element(By.XPATH, self.QUERY_BUTTON_XPATH).click()
            print(f" 🔍 查询中...")

            # Fixed pause for the result to render.
            time.sleep(3)

            result_element = self.driver.find_element(By.XPATH, self.RESULT_XPATH)
            result_text = result_element.text.strip()

            if not result_text:
                print(f" ⚠️ 【{vin}】结果为空")
                return "结果为空"

            if any(keyword in result_text for keyword in self.NO_DATA_KEYWORDS):
                print(f" ⚠️ 【{vin}】未查询到数据: {result_text[:50]}")
                return result_text

            print(f" ✅ 【{vin}】查询成功")
            return result_text

        except Exception as e:
            print(f" ❌ 【{vin}】查询失败: {str(e)[:50]}")
            self.save_screenshot(f"error_{vin}")
            return f"查询失败: {str(e)[:50]}"

    @classmethod
    def _is_failed_result(cls, result):
        """Return True when *result* contains one of the failure keywords."""
        return any(keyword in result for keyword in cls.FAILURE_KEYWORDS)

    def batch_query(self, vin_list, output_file, delay_range=(2, 4)):
        """Query every VIN in *vin_list* and write the results to *output_file*.

        Args:
            vin_list: iterable of VIN strings; each is stripped and upper-cased.
                Entries that are not exactly 17 characters are skipped (and
                reported on stdout).
            output_file: path of the final CSV file.
            delay_range: ``(low, high)`` bounds in seconds of the random pause
                after each query.

        Side effects: writes ``temp_results_<i>.csv`` whenever the 1-based
        position ``i`` of a queried VIN is a multiple of 10, and writes
        ``error_vin.txt`` when at least one query failed.
        """
        results = []
        error_vins = []
        success_count = 0
        skipped_count = 0
        low, high = delay_range

        total = len(vin_list)
        print(f"\n🚀 开始批量查询,共 {total} 个VIN\n")

        for i, raw_vin in enumerate(vin_list, 1):
            vin = raw_vin.strip().upper()
            if len(vin) != 17:
                # Report the skip instead of dropping the entry silently.
                skipped_count += 1
                print(f"\n[{i}/{total}] ⚠️ 跳过无效VIN: {vin!r}")
                continue

            print(f"\n[{i}/{total}] 正在查询: {vin}")

            result = self.query_vin(vin)

            is_success = not self._is_failed_result(result)
            if is_success:
                success_count += 1
            else:
                error_vins.append(vin)

            results.append({
                'VIN': vin,
                '单车在线时长': result,
                '查询时间': time.strftime('%Y-%m-%d %H:%M:%S'),
                '状态': '成功' if is_success else '失败',
            })

            # Random pause between queries.
            delay = random.uniform(low, high)
            print(f" ⏳ 等待 {delay:.1f} 秒...")
            time.sleep(delay)

            # Checkpoint: every 10th position, dump everything collected so far.
            if i % 10 == 0:
                self.export_results(results, f"temp_results_{i}.csv")
                print(f" 💾 临时保存")

        self.export_results(results, output_file)

        if error_vins:
            with open('error_vin.txt', 'w', encoding='utf-8') as f:
                f.write('\n'.join(error_vins))

        print(f"\n{'='*60}")
        print(f"✅ 查询完成!总计: {total} | 成功: {success_count} | 失败: {len(error_vins)}")
        if skipped_count:
            print(f"⚠️ 跳过无效VIN: {skipped_count}")
        print(f"📁 结果文件: {output_file}")
        if error_vins:
            print(f"📁 失败VIN: error_vin.txt")
        print(f"{'='*60}")

    def export_results(self, results, output_file):
        """Write *results* (a list of row dicts) to *output_file* as CSV.

        The file is encoded as UTF-8 with a BOM (``utf-8-sig``) and starts
        with a header row in ``CSV_FIELDS`` order.
        """
        with open(output_file, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=list(self.CSV_FIELDS))
            writer.writeheader()
            writer.writerows(results)

    def close(self):
        """Wait for Enter, then quit the browser (no-op if it was never started)."""
        if not self.driver:
            return
        try:
            input("\n按回车关闭浏览器...")
        except EOFError:
            # No interactive stdin: close right away instead of leaking the browser.
            pass
        finally:
            self.driver.quit()
            self.driver = None
def read_vin_file(file_path):
    """Read VINs from a text file, one per line.

    Each line is stripped and upper-cased; blank lines are dropped. Returns an
    empty list when *file_path* does not exist.
    """
    if not os.path.exists(file_path):
        return []
    # utf-8-sig strips a leading BOM (written by some Windows editors), which
    # would otherwise be glued to the first VIN and make it 18 characters long.
    # Files without a BOM are read exactly as with plain utf-8.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        return [line.strip().upper() for line in f if line.strip()]
def main():
    """Run the whole job: check prerequisites, log in, query all VINs, clean up.

    Returns early (after printing a message) when chromedriver or the VIN
    file is missing, when the VIN file yields no entries, or when login fails.
    The browser is closed on every path once it has been started.
    """
    print("\n" + "="*60)
    print(" VIN 批量查询爬虫 - Chery TSP 系统")
    print("="*60 + "\n")

    # ========== Configuration ==========
    CHROMEDRIVER_PATH = r'.\chromedriver.exe'
    USERNAME = 'your_username'  # 👈 replace with your account
    PASSWORD = 'your_password'  # 👈 replace with your password
    VIN_FILE = 'vin_list.txt'
    OUTPUT_FILE = 'query_results.csv'
    # ===================================

    if not os.path.exists(CHROMEDRIVER_PATH):
        print("❌ 找不到 chromedriver.exe")
        print(" 请将 chromedriver.exe 放到脚本同目录")
        print(" 下载地址: https://chromedriver.chromium.org/downloads")
        return

    # Report a missing file as missing, not as "empty".
    if not os.path.exists(VIN_FILE):
        print(f"❌ 找不到VIN文件: {VIN_FILE}")
        return

    vin_list = read_vin_file(VIN_FILE)
    if not vin_list:
        print("❌ VIN文件为空!")
        return

    print(f"🔧 驱动: {CHROMEDRIVER_PATH}")
    print(f"📋 VIN数量: {len(vin_list)}")
    print(f"👤 账号: {USERNAME}\n")

    spider = VINSpider()
    try:
        spider.init_driver(CHROMEDRIVER_PATH)

        # Open the login page.
        login_url = "https://tspmanager.chery.ru/"
        spider.driver.get(login_url)
        time.sleep(2)

        if not spider.login(USERNAME, PASSWORD):
            print("❌ 登录失败,请检查账号密码和验证码")
            return

        spider.wait_for_query_page()
        spider.batch_query(vin_list, OUTPUT_FILE)
    finally:
        # Runs on success, on early return and on any exception, so the
        # browser process is never left behind.
        spider.close()
# Run the crawler only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()