# Multi-threaded batch ICCID query scraper (Selenium + Chrome).
import os
import platform
import queue
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
URL = "https://prod-eu-cmp.simbalinkglobal.com"
INPUT_XPATH = '//textarea[@id="iccidList"] | //input[@id="iccidList"] | //*[@id="iccidList"]'
CONFIRM_BTN_XPATH = '//button[contains(text(),"查询")] | //button[contains(text(),"确认")] | //*[@id="pop-confirm"]//button[2] | //button[@type="submit"]'
BATCH_QUERY_BTN_XPATH = '/html/body/div/div[2]/main/div/div/div/div/div[2]/div[2]/div[1]/div[2]/div/button'
RESULT_ROWS_XPATH = [
    '//table[contains(@class,"ant-table")]//tbody/tr',
    '//table//tbody/tr',
    '//div[contains(@class,"ant-table")]//tbody/tr',
    '//table[@class="table"]//tbody/tr',
    '//*[contains(@class,"table")]//tr[position()>1]',
]
MAX_PER_BATCH = 50
OUTPUT_CSV = "results.csv"
ICCID_FILE = "text.txt"
HEADLESS = True  # headless is also forced on Linux inside init_driver
BATCH_RETRY_COUNT = 2

# Multi-threading configuration
MAX_THREADS = 3  # upper bound on worker threads / browser instances
THREAD_BATCH_SIZE = 100  # batch size per thread (not referenced in this file)

# Login configuration.
# SECURITY: credentials should not live in source control. The environment
# variables take precedence; the literals remain only as a backward-compatible
# fallback and should be removed once the environment is provisioned.
USERNAME = os.environ.get("CMP_USERNAME", "xiongshi_yunwei")
PASSWORD = os.environ.get("CMP_PASSWORD", "Cm20p20Nmo!25")

# Login page element XPaths - update to match the real page
USERNAME_XPATH = "/html/body/div/div[2]/div/form/div[1]/div/div/div/div/span/input"
PASSWORD_XPATH = "//*[@id='password']"
LOGIN_BUTTON_XPATH = "/html/body/div/div[2]/div/form/button"

# Candidate selectors, tried in order until one is clickable.
# XPath 1.0 has no `*=` operator (that is CSS); substring matches on an
# attribute must be written with contains().
USERNAME_SELECTORS = [
    USERNAME_XPATH,
    "//input[@id='username']",
    "//input[@placeholder='请输入账号']",
    "//input[@type='text']",
    "//input[contains(@placeholder,'用户名')]",
    "//input[contains(@placeholder,'账号')]",
    "//input[contains(@placeholder,'user')]",
    "//input[contains(@placeholder,'email')]",
    "//input[@name='username']",
    "//input[@name='user']",
]
PASSWORD_SELECTORS = [
    PASSWORD_XPATH,
    "//input[@id='password']",
    "//input[@placeholder='请输入密码']",
    "//input[@type='password']",
    "//input[contains(@placeholder,'密码')]",
    "//input[contains(@placeholder,'password')]",
    "//input[@name='password']",
    "//input[@name='pwd']",
]
LOGIN_BUTTON_SELECTORS = [
    LOGIN_BUTTON_XPATH,
    "//button[@type='submit']",
    "//button[contains(text(),'登录')]",
    "//button[contains(text(),'Login')]",
    "//button[contains(text(),'登入')]",
    "//input[@type='submit']",
    "//button[contains(@class,'login')]",
    "//button[contains(@class,'submit')]",
]
LOGIN_ERROR_XPATH = (
    "//*[contains(text(),'错误') or contains(text(),'失败') "
    "or contains(text(),'invalid') or contains(text(),'incorrect')]"
)

# A scraped row containing any of these words is treated as a header row.
# The same list is embedded in the JavaScript of scrape_results_from_table.
HEADER_KEYWORDS = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐',
                   '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI']

# Human-readable labels written as the first row of the output CSV.
CSV_HEADER_LABELS = {
    "batch": "批次号",
    "col_0": "ICCID",
    "col_1": "设备ID",
    "col_2": "生命周期",
    "col_3": "周期用量(MB)",
    "col_4": "流量上限(MB)",
    "col_5": "在用套餐",
    "col_6": "租户",
    "col_7": "服务状态",
    "col_8": "激活时间",
    "col_9": "MSISDN",
    "col_10": "IMSI",
}

# Chrome switches applied only on Linux.
_LINUX_CHROME_ARGS = (
    "--no-sandbox",
    "--disable-dev-shm-usage",
    "--disable-gpu",
    "--disable-software-rasterizer",
    "--disable-background-timer-throttling",
    "--disable-backgrounding-occluded-windows",
    "--disable-renderer-backgrounding",
    "--disable-ipc-flooding-protection",
    "--single-process",
    "--memory-pressure-off",
    "--max_old_space_size=4096",
)

# Chrome switches applied on every platform.
_COMMON_CHROME_ARGS = (
    # Anti-detection
    "--disable-blink-features=AutomationControlled",
    # Window and display
    "--window-size=1920,1080",
    "--start-maximized",
    "--disable-infobars",
    "--disable-notifications",
    "--disable-popup-blocking",
    # Performance
    "--disable-images",
    # NOTE(review): the target page is driven through JavaScript; confirm this
    # switch is really wanted (current Chrome builds appear to ignore it).
    "--disable-javascript",
    "--disable-plugins",
    "--disable-extensions",
    "--disable-default-apps",
    "--disable-sync",
    "--disable-translate",
    "--hide-scrollbars",
    "--mute-audio",
    "--no-first-run",
    # Network and security
    "--disable-web-security",
    "--disable-client-side-phishing-detection",
    "--disable-component-extensions-with-background-pages",
    "--disable-background-networking",
    # Logging
    "--log-level=3",
    "--silent",
)

_USER_AGENTS = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0",
)

# Scripts run once per driver to mask common automation fingerprints.
_STEALTH_SCRIPTS = (
    "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
    "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
    "Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']})",
)


def read_query_items(path):
    """Read the query items (one per line) from a text file.

    Encodings are tried in order. ``utf-8-sig`` comes first because it decodes
    both BOM-prefixed and plain UTF-8, so a BOM never ends up glued to the
    first item.

    Args:
        path: Path of the input file.

    Returns:
        The non-blank lines, stripped. An empty list when the file is readable
        but contains no items.

    Raises:
        Exception: When the file cannot be opened or decoded with any of the
            candidate encodings.
    """
    last_error = None
    for encoding in ('utf-8-sig', 'gbk', 'cp1252'):
        try:
            with open(path, 'r', encoding=encoding) as f:
                return [line.strip() for line in f if line.strip()]
        except UnicodeDecodeError as err:
            last_error = err  # wrong encoding: try the next candidate
        except OSError as err:
            raise Exception("无法读取文件") from err
    raise Exception("无法读取文件") from last_error


# Lock serialising writes to the output CSV
file_lock = threading.Lock()


def save_results_to_csv(results, filename, is_first_batch=False):
    """Write one batch of results to the CSV file under ``file_lock``.

    Args:
        results: List of dicts with keys ``"batch"`` (batch number) and
            ``"cells"`` (list of cell texts for one row).
        filename: Output CSV path.
        is_first_batch: When True the file is (re)created and the label row
            is written before the data; otherwise rows are appended.

    Returns:
        Number of rows written, or 0 when ``results`` is empty or the write
        failed.
    """
    if not results:
        return 0

    rows = []
    for result in results:
        row = {"batch": result["batch"]}
        row.update({f"col_{i}": cell for i, cell in enumerate(result["cells"])})
        rows.append(row)
    df = pd.DataFrame(rows)

    with file_lock:
        try:
            if is_first_batch:
                # header=False: the label row itself is the header. Leaving
                # pandas' default on would emit a "batch,col_0,..." line first.
                pd.DataFrame([CSV_HEADER_LABELS]).to_csv(
                    filename, index=False, header=False, encoding='utf-8-sig')
            df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig')
        except Exception as e:
            print(f"保存文件时出错: {e}")
            return 0
    return len(rows)


def _find_first_clickable(driver, selectors, timeout=2):
    """Return the first element matching any XPath in ``selectors`` that becomes clickable, else None."""
    for selector in selectors:
        try:
            return WebDriverWait(driver, timeout).until(
                EC.element_to_be_clickable((By.XPATH, selector))
            )
        except Exception:
            continue
    return None


def _type_into(driver, element, text, focus_pause):
    """Focus ``element``, clear it (natively and via JavaScript) and type ``text``."""
    element.click()  # click first so the field has focus
    time.sleep(focus_pause)
    element.clear()
    time.sleep(0.1)
    # Also reset through JavaScript in case clear() left a value behind.
    driver.execute_script("arguments[0].value = '';", element)
    element.send_keys(text)
    time.sleep(0.5)


def login(driver, username, password):
    """Log in with username and password on the currently loaded page.

    Args:
        driver: Selenium WebDriver already pointing at the login page.
        username: Account name to type.
        password: Password to type.

    Returns:
        True when login appears to have succeeded (including the case where
        the wait timed out but no error text is visible), False otherwise.
        Exceptions are caught and reported; none propagate.
    """
    try:
        time.sleep(3)
        current_url = driver.current_url

        username_input = _find_first_clickable(driver, USERNAME_SELECTORS)
        if username_input is None:
            print("未找到用户名输入框")
            return False

        password_input = _find_first_clickable(driver, PASSWORD_SELECTORS)
        if password_input is None:
            print("未找到密码输入框")
            return False

        login_button = _find_first_clickable(driver, LOGIN_BUTTON_SELECTORS)
        if login_button is None:
            print("未找到登录按钮")
            return False

        try:
            _type_into(driver, username_input, username, focus_pause=0.5)
            print(f"已输入用户名: {username}")
        except Exception as e:
            print(f"输入用户名失败: {e}")
            return False

        try:
            _type_into(driver, password_input, password, focus_pause=0.2)
        except Exception as e:
            print(f"输入密码失败: {e}")
            return False

        try:
            # Scroll into view and click through JavaScript first.
            driver.execute_script("arguments[0].scrollIntoView(true);", login_button)
            time.sleep(0.5)
            driver.execute_script("arguments[0].click();", login_button)
            time.sleep(2)
        except Exception as e:
            print(f"点击登录按钮失败: {e}")
            # Fall back to a native click.
            try:
                login_button.click()
                time.sleep(2)
            except Exception as e2:
                print(f"普通点击也失败: {e2}")
                return False

        try:
            # Wait for a URL change, a non-login URL, or a success message.
            WebDriverWait(driver, 15).until(
                lambda d: d.current_url != current_url
                or "login" not in d.current_url.lower()
                or len(d.find_elements(
                    By.XPATH,
                    "//*[contains(text(),'登录成功') or contains(text(),'欢迎')]")) > 0
            )
            error_elements = driver.find_elements(By.XPATH, LOGIN_ERROR_XPATH)
            if error_elements:
                print(f"登录错误信息: {error_elements[0].text}")
                return False
            print("登录成功!")
            return True
        except Exception as e:
            print(f"登录验证失败: {e}")
            try:
                error_elements = driver.find_elements(By.XPATH, LOGIN_ERROR_XPATH)
                if error_elements:
                    print(f"登录错误信息: {error_elements[0].text}")
                else:
                    # No visible error: treated as a probable success.
                    print("未发现明显错误信息,可能登录成功")
                    return True
            except Exception:
                pass  # best effort: the page may be gone; fall through to False
            return False
    except Exception as e:
        print(f"登录过程中出错: {e}")
        return False


def clear_input_box(driver, input_element):
    """Clear an input element using several methods in sequence.

    Returns:
        True on success, False if any step raised.
    """
    try:
        input_element.clear()
        input_element.send_keys(Keys.CONTROL + "a")
        input_element.send_keys(Keys.DELETE)
        driver.execute_script("arguments[0].value = '';", input_element)
        time.sleep(0.1)
        return True
    except Exception:
        return False


def _apply_stealth_scripts(driver):
    """Run the navigator-masking scripts on ``driver``."""
    for script in _STEALTH_SCRIPTS:
        driver.execute_script(script)


def _quit_quietly(driver):
    """Quit ``driver`` if it exists, ignoring any error while doing so."""
    if driver is None:
        return
    try:
        driver.quit()
    except Exception:
        pass  # best effort cleanup on an already failing path


def init_driver(headless=False):
    """Create a Chrome driver, open ``URL`` and return the driver.

    Headless mode is used when ``headless`` is True or when running on Linux.
    The driver binary is resolved through ChromeDriverManager first, then
    through the system PATH.

    Args:
        headless: Request headless mode on non-Linux platforms.

    Returns:
        A WebDriver with the start page loaded.

    Raises:
        Exception: When no driver could be started, or when loading ``URL``
            failed (the driver is quit before the error propagates).
    """
    chrome_opts = Options()
    is_linux = platform.system().lower() == 'linux'

    # Chrome honours only the last occurrence of a switch, so every feature to
    # disable is collected and passed in a single --disable-features argument.
    disabled_features = []

    if is_linux or headless:
        chrome_opts.add_argument("--headless=new")
        print("启用无头模式")

    if is_linux:
        for arg in _LINUX_CHROME_ARGS:
            chrome_opts.add_argument(arg)
        disabled_features.append("TranslateUI")

    # Random user agent
    chrome_opts.add_argument(f"--user-agent={random.choice(_USER_AGENTS)}")

    for arg in _COMMON_CHROME_ARGS:
        chrome_opts.add_argument(arg)
    disabled_features.append("VizDisplayCompositor")
    chrome_opts.add_argument("--disable-features=" + ",".join(disabled_features))

    # Experimental options are stored by key: setting "excludeSwitches" twice
    # would drop the first list, so both switches go into one call.
    chrome_opts.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
    chrome_opts.add_experimental_option('useAutomationExtension', False)

    # Random debugging port to reduce collisions between instances
    debug_port = random.randint(9222, 9999)
    chrome_opts.add_argument(f"--remote-debugging-port={debug_port}")

    driver = None
    try:
        print("正在初始化Chrome驱动...")
        if is_linux:
            print("检测到Linux环境,使用Linux优化配置")
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=chrome_opts)
        print("Chrome驱动初始化成功")
        _apply_stealth_scripts(driver)
    except Exception as e:
        print(f"ChromeDriverManager失败: {e}")
        # Do not leak a browser that started but failed afterwards.
        _quit_quietly(driver)
        driver = None
        print("尝试使用系统PATH中的chromedriver...")
        try:
            driver = webdriver.Chrome(options=chrome_opts)
            print("使用系统chromedriver成功")
            _apply_stealth_scripts(driver)
        except Exception as e2:
            print(f"系统chromedriver也失败: {e2}")
            _quit_quietly(driver)
            if is_linux:
                print("done")
            else:
                print("检查Chrome浏览器和chromedriver")
            raise Exception("检查Chrome浏览器和chromedriver安装") from e2

    # Window size is set explicitly; headless mode needs it as well.
    try:
        if not is_linux and not headless:
            driver.maximize_window()
        else:
            driver.set_window_size(1920, 1080)
    except Exception as e:
        print(f"窗口设置失败: {e}")

    try:
        print(f"正在访问: {URL}")
        driver.get(URL)
        print("页面加载成功")
        time.sleep(random.uniform(1, 3))  # random pause after page load
    except Exception as e:
        print(f"页面加载失败: {e}")
        _quit_quietly(driver)  # the caller never receives this driver
        raise

    return driver


def _is_header_row(texts):
    """Return True when any cell text contains one of ``HEADER_KEYWORDS``."""
    return any(keyword in text for text in texts for keyword in HEADER_KEYWORDS)


def scrape_results_from_table(driver):
    """Scrape the result table of the current page.

    A single JavaScript pass over the DOM is tried first; when it yields
    nothing, rows are located through ``RESULT_ROWS_XPATH`` and read one by
    one.

    Returns:
        List of rows, each a list of non-empty cell texts, with header rows
        removed. Empty list when no table is found.
    """
    try:
        table_data = driver.execute_script("""
            var tables = document.querySelectorAll('table, .ant-table');
            for (var i = 0; i < tables.length; i++) {
                var table = tables[i];
                var rows = table.querySelectorAll('tbody tr, tr');
                if (rows.length > 0) {
                    var data = [];
                    for (var j = 0; j < rows.length; j++) {
                        var cells = rows[j].querySelectorAll('td, th, div');
                        var rowData = [];
                        for (var k = 0; k < cells.length; k++) {
                            var text = cells[k].textContent || cells[k].innerText;
                            if (text && text.trim()) {
                                rowData.push(text.trim());
                            }
                        }
                        if (rowData.length > 0) {
                            // 过滤掉表头行(包含"ICCID"、"设备ID"等关键词的行)
                            var isHeader = false;
                            var headerKeywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI'];
                            for (var h = 0; h < headerKeywords.length; h++) {
                                if (rowData.some(function(cell) { return cell.includes(headerKeywords[h]); })) {
                                    isHeader = true;
                                    break;
                                }
                            }
                            if (!isHeader) {
                                data.push(rowData);
                            }
                        }
                    }
                    if (data.length > 0) {
                        return data;
                    }
                }
            }
            return [];
        """)
        if table_data and len(table_data) > 0:
            return table_data
    except Exception as e:
        print(f"JavaScript方式失败: {e}")

    # Fallback: locate the rows with Selenium, using short waits.
    for xpath in RESULT_ROWS_XPATH:
        try:
            rows = WebDriverWait(driver, 2).until(
                EC.presence_of_all_elements_located((By.XPATH, xpath)))
            if rows:
                print(f"Selenium方式找到 {len(rows)} 行数据")
                break
        except Exception:
            continue
    else:
        print("未找到结果表格")
        return []

    results = []
    for row in rows:
        try:
            # Read all cell texts of the row in one JavaScript call.
            cell_texts = driver.execute_script("""
                var cells = arguments[0].querySelectorAll('td, th, div');
                var texts = [];
                for (var i = 0; i < cells.length; i++) {
                    var text = cells[i].textContent || cells[i].innerText;
                    if (text && text.trim()) {
                        texts.push(text.trim());
                    }
                }
                return texts;
            """, row)
            if cell_texts and not _is_header_row(cell_texts):
                results.append(cell_texts)
        except Exception:
            # Fall back to element lookups: td, then th, then nested divs.
            try:
                cells = row.find_elements(By.TAG_NAME, "td")
                if not cells:
                    cells = row.find_elements(By.TAG_NAME, "th")
                if not cells:
                    cells = row.find_elements(By.XPATH, ".//div")
                texts = [c.text.strip() for c in cells if c.text.strip()]
                if texts and not _is_header_row(texts):
                    results.append(texts)
            except Exception:
                continue

    return results


def submit_batch_and_collect(driver, batch_items):
    """Submit one batch of items to the query form and collect the result rows.

    Args:
        driver: Logged-in WebDriver.
        batch_items: Items to query; truncated to ``MAX_PER_BATCH``.

    Returns:
        Rows as returned by ``scrape_results_from_table``, or an empty list
        when the input box or the confirm button could not be used.
    """
    if len(batch_items) > MAX_PER_BATCH:
        batch_items = batch_items[:MAX_PER_BATCH]

    # Random pause between batches
    time.sleep(random.uniform(0.5, 2.0))

    # Try to open the batch-query dialog. Failure is tolerated because the
    # input box lookup below decides whether the batch can proceed.
    try:
        batch_btn = WebDriverWait(driver, 0.3).until(
            EC.element_to_be_clickable((By.XPATH, BATCH_QUERY_BTN_XPATH)))
        batch_btn.click()
        time.sleep(random.uniform(0.3, 0.8))
    except Exception:
        pass

    try:
        inp = WebDriverWait(driver, 2).until(
            EC.presence_of_element_located((By.XPATH, INPUT_XPATH)))
    except Exception:
        print("未找到输入框")
        return []

    if not clear_input_box(driver, inp):
        return []

    # Type the items in chunks, one item per line, pausing between chunks.
    chunk_size = 50
    chunks = [batch_items[i:i + chunk_size] for i in range(0, len(batch_items), chunk_size)]
    for i, chunk in enumerate(chunks):
        chunk_payload = "\n".join(chunk)
        inp.send_keys(chunk_payload if i == 0 else "\n" + chunk_payload)
        if i < len(chunks) - 1:
            time.sleep(random.uniform(0.1, 0.3))

    time.sleep(random.uniform(0.5, 1.5))

    try:
        btn = WebDriverWait(driver, 1).until(
            EC.element_to_be_clickable((By.XPATH, CONFIRM_BTN_XPATH)))
        btn.click()
    except Exception as e:
        print(f"未找到查询按钮: {e}")
        return []

    time.sleep(random.uniform(0.5, 1.0))

    # Close the "not found" dialog if it appeared.
    try:
        close_btn_xpath = '/html/body/div[2]/div/div[2]/div/div[2]/div/button'
        try:
            close_btn = WebDriverWait(driver, 0.5).until(
                EC.element_to_be_clickable((By.XPATH, close_btn_xpath)))
            close_btn.click()
            print("已关闭'不存在'弹窗")
            time.sleep(0.5)
        except Exception:
            # Exact XPath failed: try other likely close buttons.
            alternative_selectors = [
                '//div[contains(text(),"批量查询结果")]//button',
                '//div[contains(text(),"以下SIM卡不存在")]//button',
                '//div[contains(@class,"modal")]//button[contains(@class,"close")]',
                '//div[contains(@class,"ant-modal")]//button[contains(@class,"close")]',
            ]
            for selector in alternative_selectors:
                try:
                    close_btn = WebDriverWait(driver, 0.2).until(
                        EC.element_to_be_clickable((By.XPATH, selector)))
                    close_btn.click()
                    print("已关闭'不存在'弹窗(备用方式)")
                    time.sleep(0.1)
                    break
                except Exception:
                    continue
    except Exception:
        pass  # the dialog is optional

    results = scrape_results_from_table(driver)

    # Reset the input box for the next batch (best effort).
    try:
        inp = driver.find_element(By.XPATH, INPUT_XPATH)
        driver.execute_script("arguments[0].value = '';", inp)
    except Exception:
        pass

    return results


def worker_thread(thread_id, assigned_batches, driver, shared_results, lock):
    """Process the batches pre-assigned to one thread.

    Args:
        thread_id: 1-based id used in log messages.
        assigned_batches: List of ``(batch_index, batch_items)`` tuples.
        driver: WebDriver owned by this thread.
        shared_results: List extended (under ``lock``) with
            ``(batch_index, batch_results, result_count)`` tuples.
        lock: Lock guarding ``shared_results``.
    """
    print(f"线程 {thread_id} 启动,分配了 {len(assigned_batches)} 个批次")
    thread_results = []

    for batch_index, batch_items in assigned_batches:
        print(f"线程 {thread_id} 开始处理批次 {batch_index}")
        try:
            results = submit_batch_and_collect(driver, batch_items)
            if results:
                batch_results = [{"batch": batch_index, "cells": row_cells} for row_cells in results]
                thread_results.append((batch_index, batch_results, len(results)))
                print(f"线程 {thread_id} 完成批次 {batch_index},获得 {len(results)} 条结果")
            else:
                thread_results.append((batch_index, [], 0))
                print(f"线程 {thread_id} 完成批次 {batch_index},无结果")
        except Exception as e:
            print(f"线程 {thread_id} 处理批次 {batch_index} 失败: {e}")
            thread_results.append((batch_index, [], 0))

        # Short random pause between batches
        time.sleep(random.uniform(0.1, 0.3))

    with lock:
        shared_results.extend(thread_results)

    print(f"线程 {thread_id} 完成所有分配任务,共处理 {len(thread_results)} 个批次")


def _assign_batches(batches_with_index, thread_count):
    """Split ``batches_with_index`` into ``thread_count`` contiguous, near-equal slices."""
    base, extra = divmod(len(batches_with_index), thread_count)
    assignments = []
    start = 0
    for thread_id in range(thread_count):
        # The first `extra` threads take one additional batch.
        size = base + (1 if thread_id < extra else 0)
        assignments.append(batches_with_index[start:start + size])
        start += size
    return assignments


def _init_drivers(count, drivers, max_retries=3):
    """Start ``count`` drivers, appending each to ``drivers``; return False if any cannot start."""
    for i in range(count):
        driver = None
        for retry in range(max_retries):
            try:
                print(f"初始化线程 {i+1} 的Chrome驱动 (第 {retry + 1}/{max_retries} 次)...")
                driver = init_driver(HEADLESS)
                break
            except Exception as e:
                print(f"线程 {i+1} 第 {retry + 1} 次初始化失败: {e}")
                if retry < max_retries - 1:
                    time.sleep(3)
        if driver is None:
            print(f"线程 {i+1} 所有初始化尝试都失败了")
            return False
        drivers.append(driver)
    return True


def _login_one(driver_info):
    """Log in a single ``(thread_id, driver)`` pair; return ``(thread_id, success, error)``."""
    thread_id, driver = driver_info
    try:
        if login(driver, USERNAME, PASSWORD):
            print(f"线程 {thread_id} 登录成功")
            return thread_id, True, None
        print(f"线程 {thread_id} 登录失败")
        return thread_id, False, "登录失败"
    except Exception as e:
        print(f"线程 {thread_id} 登录异常: {e}")
        return thread_id, False, str(e)


def _login_all(drivers):
    """Log in every driver in parallel; return True only when all succeeded."""
    login_results = []
    with ThreadPoolExecutor(max_workers=len(drivers)) as executor:
        driver_infos = [(i + 1, driver) for i, driver in enumerate(drivers)]
        futures = [executor.submit(_login_one, info) for info in driver_infos]
        for future in as_completed(futures):
            login_results.append(future.result())

    failed_logins = [result for result in login_results if not result[1]]
    if failed_logins:
        print(f"❌ {len(failed_logins)} 个线程登录失败:")
        for thread_id, _, error in failed_logins:
            print(f" - 线程 {thread_id}: {error}")
        print("程序退出")
        return False
    return True


def _close_drivers(drivers):
    """Quit every driver, reporting but not raising on failure."""
    print("关闭所有Chrome驱动...")
    for i, driver in enumerate(drivers):
        try:
            driver.quit()
            print(f"线程 {i+1} 驱动已关闭")
        except Exception as e:
            print(f"关闭线程 {i+1} 驱动时出错: {e}")


def main():
    """Run the multi-threaded batch query and write the results to ``OUTPUT_CSV``.

    Reads the items from ``ICCID_FILE``, splits them into batches of
    ``MAX_PER_BATCH``, starts one logged-in browser per worker thread, runs
    the workers, saves the results in batch order and prints a report.
    All started browsers are closed on every exit path.
    """
    start_time = time.time()
    init_start_time = time.time()

    query_items = read_query_items(ICCID_FILE)
    if not query_items:
        print(f"在 {ICCID_FILE} 中未找到查询项")
        return

    print(f"总共读取到 {len(query_items)} 个查询项")
    batches = [query_items[i:i + MAX_PER_BATCH] for i in range(0, len(query_items), MAX_PER_BATCH)]
    print(f"将分为 {len(batches)} 个批次处理")

    # Never start more browsers than there are batches to process.
    thread_count = min(MAX_THREADS, len(batches))
    print(f"使用 {thread_count} 个线程并行处理")

    batches_with_index = [(i + 1, batch) for i, batch in enumerate(batches)]
    thread_assignments = _assign_batches(batches_with_index, thread_count)
    for thread_id, assigned in enumerate(thread_assignments):
        print(f"线程 {thread_id + 1} 分配了 {len(assigned)} 个批次: {[b[0] for b in assigned]}")

    shared_results = []
    results_lock = threading.Lock()
    drivers = []

    try:
        if not _init_drivers(thread_count, drivers):
            return

        print("开始并行登录所有线程...")
        login_start_time = time.time()
        if not _login_all(drivers):
            return
        login_duration = time.time() - login_start_time
        print(f"✅ 所有 {thread_count} 个线程登录成功")
        print(f"⏱️ 并行登录耗时: {login_duration:.2f}秒")
        print("开始批量查询...")

        init_duration = time.time() - init_start_time
        print(f"⏱️ 总初始化耗时: {init_duration:.2f}秒")

        # Per-thread busy time; each thread writes only its own slot.
        thread_elapsed = [0.0] * thread_count

        def _timed_worker(slot, *args):
            started = time.time()
            try:
                worker_thread(*args)
            finally:
                thread_elapsed[slot] = time.time() - started

        process_start_time = time.time()
        threads = []
        for i in range(thread_count):
            thread = threading.Thread(
                target=_timed_worker,
                args=(i, i + 1, thread_assignments[i], drivers[i], shared_results, results_lock),
            )
            thread.start()
            threads.append(thread)

        print("所有线程已启动,等待完成...")
        for thread in threads:
            thread.join()
        query_wall_time = time.time() - process_start_time

        print("所有线程已完成,开始按批次顺序保存结果...")
        shared_results.sort(key=lambda x: x[0])  # order by batch number

        total_saved_results = 0
        is_first_batch = True
        for batch_index, batch_results, _result_count in shared_results:
            if batch_results:
                saved_count = save_results_to_csv(batch_results, OUTPUT_CSV, is_first_batch)
                total_saved_results += saved_count
                print(f"✅ 已保存批次 {batch_index} 的 {saved_count} 条结果")
                is_first_batch = False
            else:
                print(f"批次 {batch_index} 无结果")
    finally:
        # Runs on success and on every early return above.
        _close_drivers(drivers)

    end_time = time.time()
    total_time = end_time - start_time
    process_time = end_time - process_start_time
    avg_time_per_batch = total_time / len(batches) if len(batches) > 0 else 0

    print(f"\n{'='*50}")
    print("📊 多线程批量查询完成报告")
    print(f"{'='*50}")
    print(f"总查询项: {len(query_items)}")
    print(f"总批次数: {len(batches)}")
    print(f"使用线程数: {thread_count}")
    print(f"完成批次: {len(batches)}")
    print(f"总保存结果数: {total_saved_results}")
    print(f"⏱️ 总初始化耗时: {init_duration:.2f}秒")
    print(f"⏱️ 并行登录耗时: {login_duration:.2f}秒")
    print(f"⏱️ 处理耗时: {process_time:.2f}秒")
    print(f"⏱️ 总耗时: {total_time:.2f}秒")
    print(f"⏱️ 平均每批次: {avg_time_per_batch:.2f}秒")

    # Speedup = summed per-thread busy time (an estimate of a sequential run)
    # divided by the wall time the threads actually took.
    estimated_single_thread_time = sum(thread_elapsed)
    actual_speedup = estimated_single_thread_time / query_wall_time if query_wall_time > 0 else 1
    print(f"🚀 实际性能提升: {actual_speedup:.2f}x")
    print(f"{'='*50}")
    print(f"✅ 所有结果已按顺序保存到 {OUTPUT_CSV}")


if __name__ == "__main__":
    main()