from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.keys import Keys import time import csv import os import random class VINSpider: def __init__(self): self.options = webdriver.ChromeOptions() self.options.add_argument('--disable-blink-features=AutomationControlled') self.options.add_argument('--start-maximized') self.options.add_experimental_option('excludeSwitches', ['enable-automation']) self.options.add_experimental_option('useAutomationExtension', False) self.driver = None self.wait = None def init_driver(self, chromedriver_path): """初始化浏览器驱动""" service = Service(chromedriver_path) self.driver = webdriver.Chrome(service=service, options=self.options) self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', { 'source': ''' Object.defineProperty(navigator, 'webdriver', { get: () => undefined }) ''' }) self.wait = WebDriverWait(self.driver, 30) print("✅ 浏览器启动成功") def save_screenshot(self, name="screenshot"): """保存截图""" self.driver.save_screenshot(f"{name}.png") print(f"📸 截图已保存: {name}.png") def login(self, username, password): """登录系统""" print("\n🔐 开始登录...") try: # 等待登录表单加载 login_form = self.wait.until( EC.presence_of_element_located((By.XPATH, '//form[@class="el-form login-form"]')) ) print("✅ 登录表单已找到") # ======== 用户名输入框 ======== # input[@placeholder="账号"] 或 input[@name="username"] username_input = login_form.find_element( By.XPATH, './/input[@placeholder="账号" or @name="username"]' ) username_input.clear() username_input.send_keys(username) print(f" ✅ 已输入账号: {username}") # ======== 密码输入框 ======== password_input = login_form.find_element( By.XPATH, './/input[@placeholder="密码" or @name="password"]' ) password_input.clear() password_input.send_keys(password) print(" ✅ 已输入密码") # ======== 验证码输入框 ======== try: captcha_input = login_form.find_element( By.XPATH, './/input[@placeholder="请输入验证码"]' ) print(" 🔒 找到验证码输入框") has_captcha = True except: has_captcha = False print(" ⚠️ 未找到验证码输入框") # ======== 验证码处理 ======== if has_captcha: # 保存验证码图片(方便人工识别) try: captcha_img = login_form.find_element( By.XPATH, './/img[@class="login-code-img"]' ) self.driver.save_screenshot("captcha_page.png") print(" 📸 已保存验证码页面截图") except: pass print("\n" + "="*50) print("⚠️ 请查看截图 'captcha_page.png' 获取验证码") print(" 或在浏览器中查看验证码图片") print("="*50) captcha_code = input("请输入验证码: ").strip() captcha_input.send_keys(captcha_code) print(" ✅ 已输入验证码") # ======== 点击登录按钮 ======== # button[@class="el-button submit-login el-button--primary"] login_btn = login_form.find_element( By.XPATH, './/button[@class="el-button submit-login el-button--primary"]' ) login_btn.click() print(" ✅ 已点击登录按钮") # 等待登录结果 time.sleep(3) # 检查是否登录成功(是否有错误提示) try: error_msg = self.driver.find_element( By.XPATH, '//p[@class="el-form-item__error"]' ) if error_msg.is_displayed(): print(f" ❌ 登录错误: {error_msg.text}") self.save_screenshot("login_error") return False except: pass print("✅ 登录成功!") return True except Exception as e: print(f"❌ 登录过程出错: {str(e)}") self.save_screenshot("login_error") return False def wait_for_query_page(self): """等待进入查询页面""" print("\n⏳ 正在进入查询页面...") target_url = "https://tspmanager.chery.ru/#/f-statistic-analysis/s-universal-analysis/t-vehicle-online-duration" self.driver.get(target_url) time.sleep(3) # 检测查询页面元素 try: vin_input = self.wait.until( EC.presence_of_element_located( (By.XPATH, '//*[@id="pane-tabSingle"]/form/div/div[1]/div/div/div/input') ) ) print("✅ 已进入查询页面") return True except: print("⚠️ 未检测到查询页面元素") self.save_screenshot("query_page_check") print("请手动进入查询页面,然后按回车继续") input("按回车继续...") return True def query_vin(self, vin): """查询单个VIN""" try: # 清空并输入VIN vin_input = self.wait.until( EC.element_to_be_clickable( (By.XPATH, '//*[@id="pane-tabSingle"]/form/div/div[1]/div/div/div/input') ) ) vin_input.clear() vin_input.send_keys(vin) print(f" 📝 VIN: {vin}") # 点击查询按钮 query_btn = self.driver.find_element( By.XPATH, '//*[@id="pane-tabSingle"]/form/div/div[2]/button' ) query_btn.click() print(f" 🔍 查询中...") # 等待结果 time.sleep(3) # 获取结果 result_element = self.driver.find_element( By.XPATH, '//*[@id="pane-tabSingle"]/section' ) result_text = result_element.text.strip() if not result_text: print(f" ⚠️ 【{vin}】结果为空") return "结果为空" if any(keyword in result_text for keyword in ["无", "没有", "0", "null", "None"]): print(f" ⚠️ 【{vin}】未查询到数据: {result_text[:50]}") return result_text print(f" ✅ 【{vin}】查询成功") return result_text except Exception as e: print(f" ❌ 【{vin}】查询失败: {str(e)[:50]}") self.save_screenshot(f"error_{vin}") return f"查询失败: {str(e)[:50]}" def batch_query(self, vin_list, output_file): """批量查询VIN""" results = [] error_vins = [] success_count = 0 total = len(vin_list) print(f"\n🚀 开始批量查询,共 {total} 个VIN\n") for i, vin in enumerate(vin_list, 1): vin = vin.strip().upper() if not vin or len(vin) != 17: continue print(f"\n[{i}/{total}] 正在查询: {vin}") result = self.query_vin(vin) is_success = not any(keyword in result for keyword in ['失败', '错误', '异常']) if is_success: success_count += 1 else: error_vins.append(vin) results.append({ 'VIN': vin, '单车在线时长': result, '查询时间': time.strftime('%Y-%m-%d %H:%M:%S'), '状态': '成功' if is_success else '失败' }) # 随机延迟 delay = random.uniform(2, 4) print(f" ⏳ 等待 {delay:.1f} 秒...") time.sleep(delay) # 每10个保存临时文件 if i % 10 == 0: self.export_results(results, f"temp_results_{i}.csv") print(f" 💾 临时保存") # 导出结果 self.export_results(results, output_file) # 保存失败列表 if error_vins: with open('error_vin.txt', 'w', encoding='utf-8') as f: f.write('\n'.join(error_vins)) # 统计 print(f"\n{'='*60}") print(f"✅ 查询完成!总计: {total} | 成功: {success_count} | 失败: {len(error_vins)}") print(f"📁 结果文件: {output_file}") if error_vins: print(f"📁 失败VIN: error_vin.txt") print(f"{'='*60}") def export_results(self, results, output_file): """导出CSV""" with open(output_file, 'w', newline='', encoding='utf-8-sig') as f: writer = csv.DictWriter(f, fieldnames=['VIN', '单车在线时长', '查询时间', '状态']) writer.writeheader() writer.writerows(results) def close(self): if self.driver: input("\n按回车关闭浏览器...") self.driver.quit() def read_vin_file(file_path): if not os.path.exists(file_path): return [] with open(file_path, 'r', encoding='utf-8') as f: return [line.strip().upper() for line in f if line.strip()] def main(): print("\n" + "="*60) print(" VIN 批量查询爬虫 - Chery TSP 系统") print("="*60 + "\n") # ========== 配置区域 ========== CHROMEDRIVER_PATH = r'.\chromedriver.exe' USERNAME = 'your_username' # 👈 替换为你的账号 PASSWORD = 'your_password' # 👈 替换为你的密码 VIN_FILE = 'vin_list.txt' OUTPUT_FILE = 'query_results.csv' # ============================== # 检查chromedriver if not os.path.exists(CHROMEDRIVER_PATH): print("❌ 找不到 chromedriver.exe") print(" 请将 chromedriver.exe 放到脚本同目录") print(" 下载地址: https://chromedriver.chromium.org/downloads") return # 读取VIN vin_list = read_vin_file(VIN_FILE) if not vin_list: print("❌ VIN文件为空!") return print(f"🔧 驱动: {CHROMEDRIVER_PATH}") print(f"📋 VIN数量: {len(vin_list)}") print(f"👤 账号: {USERNAME}\n") # 初始化 spider = VINSpider() spider.init_driver(CHROMEDRIVER_PATH) # 打开登录页 login_url = "https://tspmanager.chery.ru/" spider.driver.get(login_url) time.sleep(2) # 登录 if not spider.login(USERNAME, PASSWORD): print("❌ 登录失败,请检查账号密码和验证码") spider.close() return # 等待查询页面 spider.wait_for_query_page() # 批量查询 spider.batch_query(vin_list, OUTPUT_FILE) spider.close() if __name__ == '__main__': main()