# Recovered from a whitespace-collapsed `git format-patch` email:
#   commit 2c77fd783dee56e3eb2497436cdd9c284687b955
#   author zhaojie <1710884619@qq.com>, Wed, 1 Oct 2025 00:11:49 +0800
#   subject "Add pa_upgrade.py" (new file, 886 lines)
"""Multi-threaded batch ICCID lookup against the CMP web console.

The script reads query items (one per line) from ``ICCID_FILE``, splits them
into batches of ``MAX_PER_BATCH``, drives several headless Chrome instances in
parallel through Selenium, scrapes the result table after every batch query
and appends the rows to ``OUTPUT_CSV`` in batch order.
"""
import os
import platform
import queue  # noqa: F401  (kept: imported by the original module)
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# --------------------------------------------------------------------------
# Configuration
# --------------------------------------------------------------------------
URL = "https://prod-eu-cmp.simbalinkglobal.com"
INPUT_XPATH = '//textarea[@id="iccidList"] | //input[@id="iccidList"] | //*[@id="iccidList"]'
CONFIRM_BTN_XPATH = '//button[contains(text(),"查询")] | //button[contains(text(),"确认")] | //*[@id="pop-confirm"]//button[2] | //button[@type="submit"]'
BATCH_QUERY_BTN_XPATH = '/html/body/div/div[2]/main/div/div/div/div/div[2]/div[2]/div[1]/div[2]/div/button'
RESULT_ROWS_XPATH = [
    '//table[contains(@class,"ant-table")]//tbody/tr',
    '//table//tbody/tr',
    '//div[contains(@class,"ant-table")]//tbody/tr',
    '//table[@class="table"]//tbody/tr',
    '//*[contains(@class,"table")]//tr[position()>1]'
]

MAX_PER_BATCH = 50
OUTPUT_CSV = "results.csv"
ICCID_FILE = "text.txt"
HEADLESS = True  # headless mode is the default (always forced on Linux)
BATCH_RETRY_COUNT = 2  # NOTE(review): defined but not used anywhere in this file

# Threading configuration
MAX_THREADS = 3  # upper bound on parallel browser sessions
THREAD_BATCH_SIZE = 100  # NOTE(review): defined but not used anywhere in this file

# Number of items typed into the input box per send_keys call.
INPUT_CHUNK_SIZE = 50

# Login configuration.
# SECURITY: credentials should not live in source control.  They can now be
# supplied through the PA_USERNAME / PA_PASSWORD environment variables; the
# literals below are only kept as a backward-compatible fallback and should be
# removed (and the password rotated) once the environment is configured.
USERNAME = os.environ.get("PA_USERNAME", "xiongshi_yunwei")
PASSWORD = os.environ.get("PA_PASSWORD", "Cm20p20Nmo!25")

# Login page element XPaths
USERNAME_XPATH = "/html/body/div/div[2]/div/form/div[1]/div/div/div/div/span/input"
PASSWORD_XPATH = "//*[@id='password']"
LOGIN_BUTTON_XPATH = "/html/body/div/div[2]/div/form/button"

# Candidate locators, tried in order.  The placeholder "contains" variants were
# originally written with the CSS operator `*=`, which is not valid XPath and
# therefore could never match; they now use contains().
USERNAME_SELECTORS = [
    USERNAME_XPATH,
    "//input[@id='username']",
    "//input[@placeholder='请输入账号']",
    "//input[@type='text']",
    "//input[contains(@placeholder,'用户名')]",
    "//input[contains(@placeholder,'账号')]",
    "//input[contains(@placeholder,'user')]",
    "//input[contains(@placeholder,'email')]",
    "//input[@name='username']",
    "//input[@name='user']"
]
PASSWORD_SELECTORS = [
    PASSWORD_XPATH,
    "//input[@id='password']",
    "//input[@placeholder='请输入密码']",
    "//input[@type='password']",
    "//input[contains(@placeholder,'密码')]",
    "//input[contains(@placeholder,'password')]",
    "//input[@name='password']",
    "//input[@name='pwd']"
]
LOGIN_BUTTON_SELECTORS = [
    LOGIN_BUTTON_XPATH,
    "//button[@type='submit']",
    "//button[contains(text(),'登录')]",
    "//button[contains(text(),'Login')]",
    "//button[contains(text(),'登入')]",
    "//input[@type='submit']",
    "//button[contains(@class,'login')]",
    "//button[contains(@class,'submit')]"
]
LOGIN_ERROR_XPATH = "//*[contains(text(),'错误') or contains(text(),'失败') or contains(text(),'invalid') or contains(text(),'incorrect')]"

# Any scraped row containing one of these substrings is treated as a header row.
HEADER_KEYWORDS = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI']

# Human-readable header row written once at the top of the CSV.
CSV_HEADER = {
    "batch": "批次号",
    "col_0": "ICCID",
    "col_1": "设备ID",
    "col_2": "生命周期",
    "col_3": "周期用量(MB)",
    "col_4": "流量上限(MB)",
    "col_5": "在用套餐",
    "col_6": "租户",
    "col_7": "服务状态",
    "col_8": "激活时间",
    "col_9": "MSISDN",
    "col_10": "IMSI"
}

# Scripts installed on every new document to hide common automation markers.
_STEALTH_SCRIPTS = (
    "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
    "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
    "Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']})",
)

# Returns the non-header rows of the first table that has any, as lists of
# trimmed cell texts.
# NOTE(review): the selector collects td/th AND nested div elements, so a cell
# whose text lives inside a div may be reported twice — confirm against the
# real page before changing it.
_TABLE_SCRAPE_JS = """
    var tables = document.querySelectorAll('table, .ant-table');
    for (var i = 0; i < tables.length; i++) {
        var table = tables[i];
        var rows = table.querySelectorAll('tbody tr, tr');
        if (rows.length > 0) {
            var data = [];
            for (var j = 0; j < rows.length; j++) {
                var cells = rows[j].querySelectorAll('td, th, div');
                var rowData = [];
                for (var k = 0; k < cells.length; k++) {
                    var text = cells[k].textContent || cells[k].innerText;
                    if (text && text.trim()) {
                        rowData.push(text.trim());
                    }
                }
                if (rowData.length > 0) {
                    // 过滤掉表头行(包含"ICCID"、"设备ID"等关键词的行)
                    var isHeader = false;
                    var headerKeywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI'];
                    for (var h = 0; h < headerKeywords.length; h++) {
                        if (rowData.some(function(cell) { return cell.includes(headerKeywords[h]); })) {
                            isHeader = true;
                            break;
                        }
                    }
                    if (!isHeader) {
                        data.push(rowData);
                    }
                }
            }
            if (data.length > 0) {
                return data;
            }
        }
    }
    return [];
"""

# Returns the trimmed, non-empty texts of the cells of one row element.
_ROW_CELLS_JS = """
    var cells = arguments[0].querySelectorAll('td, th, div');
    var texts = [];
    for (var i = 0; i < cells.length; i++) {
        var text = cells[i].textContent || cells[i].innerText;
        if (text && text.trim()) {
            texts.push(text.trim());
        }
    }
    return texts;
"""


def read_query_items(path):
    """Read the non-blank, stripped lines of the query-item file.

    Encodings are tried in order.  ``utf-8-sig`` comes first because it reads
    plain UTF-8 as well as UTF-8 with a BOM; decoding a BOM file as plain
    ``utf-8`` would leave ``\\ufeff`` glued to the first item.

    Args:
        path: Path of the text file, one query item per line.

    Returns:
        A non-empty list of stripped lines.

    Raises:
        Exception: If the file cannot be opened, cannot be decoded with any
            of the candidate encodings, or contains no non-blank line.
    """
    last_error = None
    for encoding in ('utf-8-sig', 'gbk', 'cp1252'):
        try:
            with open(path, 'r', encoding=encoding) as f:
                lines = [line.strip() for line in f if line.strip()]
        except OSError as exc:
            # A missing/unreadable file will not get better with another encoding.
            raise Exception("无法读取文件") from exc
        except UnicodeError as exc:
            last_error = exc
            continue
        if lines:
            return lines
    raise Exception("无法读取文件") from last_error


# Serialises writes to the output CSV.
file_lock = threading.Lock()


def save_results_to_csv(results, filename, is_first_batch=False):
    """Write scraped rows to the CSV file while holding ``file_lock``.

    Args:
        results: List of ``{"batch": <batch number>, "cells": [<text>, ...]}``.
        filename: Target CSV path.
        is_first_batch: When true the file is (re)created and the label row
            from ``CSV_HEADER`` is written before the data; otherwise rows
            are appended.

    Returns:
        Number of rows written, or 0 when ``results`` is empty or the write
        failed (the error is printed, not raised).
    """
    if not results:
        return 0

    rows = []
    for result in results:
        row = {"batch": result["batch"]}
        row.update({f"col_{i}": cell for i, cell in enumerate(result["cells"])})
        rows.append(row)
    df = pd.DataFrame(rows)

    with file_lock:
        try:
            if is_first_batch:
                # header=False: the label row *is* the header.  The original
                # left pandas' own header on, which produced an extra
                # "batch,col_0,..." line above the labels.
                pd.DataFrame([CSV_HEADER]).to_csv(
                    filename, index=False, header=False, encoding='utf-8-sig'
                )
            df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig')
        except Exception as e:
            print(f"保存文件时出错: {e}")
            return 0

    return len(rows)


def _find_first_clickable(driver, selectors, timeout=2):
    """Return the first element matching any XPath in ``selectors`` that becomes clickable, else None."""
    for selector in selectors:
        try:
            return WebDriverWait(driver, timeout).until(
                EC.element_to_be_clickable((By.XPATH, selector))
            )
        except Exception:
            continue
    return None


def _fill_input(driver, element, text, focus_delay):
    """Focus ``element``, clear it (natively and through JS) and type ``text``."""
    element.click()
    time.sleep(focus_delay)
    element.clear()
    time.sleep(0.1)
    # clear() does not always reset framework-controlled inputs; force the value too.
    driver.execute_script("arguments[0].value = '';", element)
    element.send_keys(text)
    time.sleep(0.5)


def _first_login_error(driver):
    """Return the text of the first on-page element that looks like an error message, or None."""
    error_elements = driver.find_elements(By.XPATH, LOGIN_ERROR_XPATH)
    return error_elements[0].text if error_elements else None


def login(driver, username, password):
    """Log in on the page currently loaded in ``driver``.

    Args:
        driver: Selenium WebDriver already showing the login page.
        username: Account name typed into the username field.
        password: Password typed into the password field.

    Returns:
        True when the login appears to have succeeded, False otherwise.
        All failures are printed; no exception is propagated.
    """
    try:
        time.sleep(3)
        current_url = driver.current_url

        username_input = _find_first_clickable(driver, USERNAME_SELECTORS)
        if username_input is None:
            print("未找到用户名输入框")
            return False

        password_input = _find_first_clickable(driver, PASSWORD_SELECTORS)
        if password_input is None:
            print("未找到密码输入框")
            return False

        login_button = _find_first_clickable(driver, LOGIN_BUTTON_SELECTORS)
        if login_button is None:
            print("未找到登录按钮")
            return False

        try:
            _fill_input(driver, username_input, username, focus_delay=0.5)
            print(f"已输入用户名: {username}")
        except Exception as e:
            print(f"输入用户名失败: {e}")
            return False

        try:
            _fill_input(driver, password_input, password, focus_delay=0.2)
        except Exception as e:
            print(f"输入密码失败: {e}")
            return False

        try:
            # A JS click is not affected by overlays intercepting the pointer.
            driver.execute_script("arguments[0].scrollIntoView(true);", login_button)
            time.sleep(0.5)
            driver.execute_script("arguments[0].click();", login_button)
            time.sleep(2)
        except Exception as e:
            print(f"点击登录按钮失败: {e}")
            try:
                login_button.click()
                time.sleep(2)
            except Exception as e2:
                print(f"普通点击也失败: {e2}")
                return False

        try:
            # Done when the URL changed, left the login route, or a success
            # banner is present.
            WebDriverWait(driver, 15).until(
                lambda d: d.current_url != current_url or
                "login" not in d.current_url.lower() or
                len(d.find_elements(By.XPATH, "//*[contains(text(),'登录成功') or contains(text(),'欢迎')]")) > 0
            )

            error_text = _first_login_error(driver)
            if error_text is not None:
                print(f"登录错误信息: {error_text}")
                return False

            print("登录成功!")
            return True

        except Exception as e:
            print(f"登录验证失败: {e}")
            try:
                error_text = _first_login_error(driver)
                if error_text is not None:
                    print(f"登录错误信息: {error_text}")
                else:
                    # No success signal, but no error either: assume success.
                    print("未发现明显错误信息,可能登录成功")
                    return True
            except Exception:
                pass
            return False

    except Exception as e:
        print(f"登录过程中出错: {e}")
        return False


def clear_input_box(driver, input_element):
    """Empty ``input_element`` using clear(), select-all/delete and a JS reset.

    Returns:
        True on success, False if any step raised.
    """
    try:
        input_element.clear()
        input_element.send_keys(Keys.CONTROL + "a")
        input_element.send_keys(Keys.DELETE)
        driver.execute_script("arguments[0].value = '';", input_element)
        time.sleep(0.1)
        return True
    except Exception:
        return False


def _build_chrome_options(headless, is_linux):
    """Assemble the Chrome command-line switches and experimental options."""
    chrome_opts = Options()

    if is_linux or headless:
        chrome_opts.add_argument("--headless=new")
        print("启用无头模式")

    # Chrome only honours the LAST --disable-features switch, so all disabled
    # features are collected and passed once.
    disabled_features = ["VizDisplayCompositor"]

    if is_linux:
        disabled_features.insert(0, "TranslateUI")
        for flag in (
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--disable-software-rasterizer",
            "--disable-background-timer-throttling",
            "--disable-backgrounding-occluded-windows",
            "--disable-renderer-backgrounding",
            "--disable-ipc-flooding-protection",
            "--single-process",
            "--memory-pressure-off",
            "--max_old_space_size=4096",
        ):
            chrome_opts.add_argument(flag)

    # Random user agent per session.
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0"
    ]
    chrome_opts.add_argument(f"--user-agent={random.choice(user_agents)}")

    # Anti-detection.  Experimental options are stored by name, so both
    # excluded switches must be given in ONE call; the original's second call
    # silently replaced "enable-automation" with "enable-logging".
    chrome_opts.add_argument("--disable-blink-features=AutomationControlled")
    chrome_opts.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
    chrome_opts.add_experimental_option('useAutomationExtension', False)

    # "--disable-javascript" from the original is intentionally not passed:
    # the whole workflow (login, batch dialog, result table) runs on JavaScript.
    for flag in (
        # window / display
        "--window-size=1920,1080",
        "--start-maximized",
        "--disable-infobars",
        "--disable-notifications",
        "--disable-popup-blocking",
        # performance
        "--disable-images",
        "--disable-plugins",
        "--disable-extensions",
        "--disable-default-apps",
        "--disable-sync",
        "--disable-translate",
        "--hide-scrollbars",
        "--mute-audio",
        "--no-first-run",
        # network / security
        "--disable-web-security",
        "--disable-features=" + ",".join(disabled_features),
        "--disable-client-side-phishing-detection",
        "--disable-component-extensions-with-background-pages",
        "--disable-background-networking",
        # logging
        "--log-level=3",
        "--silent",
    ):
        chrome_opts.add_argument(flag)

    # Random debugging port so parallel sessions are unlikely to collide.
    debug_port = random.randint(9222, 9999)
    chrome_opts.add_argument(f"--remote-debugging-port={debug_port}")

    return chrome_opts


def _create_driver(chrome_opts, is_linux):
    """Start Chrome via webdriver_manager, falling back to the chromedriver on PATH."""
    try:
        print("正在初始化Chrome驱动...")
        if is_linux:
            print("检测到Linux环境,使用Linux优化配置")
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=chrome_opts)
        print("Chrome驱动初始化成功")
        return driver
    except Exception as e:
        print(f"ChromeDriverManager失败: {e}")
        print("尝试使用系统PATH中的chromedriver...")

    try:
        driver = webdriver.Chrome(options=chrome_opts)
        print("使用系统chromedriver成功")
        return driver
    except Exception as e2:
        print(f"系统chromedriver也失败: {e2}")
        print("检查Chrome浏览器和chromedriver")
        raise Exception("检查Chrome浏览器和chromedriver安装") from e2


def _apply_stealth_scripts(driver):
    """Register the anti-detection scripts so they run on every new document.

    ``execute_script`` only patches the document that is currently loaded, so
    running it before ``driver.get`` (as the original did) had no effect on
    the target page.  Failures are logged and ignored (best effort).
    """
    for script in _STEALTH_SCRIPTS:
        try:
            driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": script})
        except Exception as e:
            print(f"反检测脚本注入失败: {e}")


def _quit_quietly(driver):
    """Quit ``driver`` and swallow any error raised while doing so."""
    try:
        driver.quit()
    except Exception:
        pass


def init_driver(headless=False):
    """Create a configured Chrome driver and open ``URL`` in it.

    Args:
        headless: Run Chrome headless.  Headless is always used on Linux.

    Returns:
        A WebDriver with ``URL`` loaded.

    Raises:
        Exception: If no Chrome driver could be started, or the page failed
            to load (the browser is shut down before re-raising).
    """
    is_linux = platform.system().lower() == 'linux'

    chrome_opts = _build_chrome_options(headless, is_linux)
    driver = _create_driver(chrome_opts, is_linux)
    _apply_stealth_scripts(driver)

    # Headless sessions have no window to maximise; give them a fixed size.
    try:
        if not is_linux and not headless:
            driver.maximize_window()
        else:
            driver.set_window_size(1920, 1080)
    except Exception as e:
        print(f"窗口设置失败: {e}")

    try:
        print(f"正在访问: {URL}")
        driver.get(URL)
        print("页面加载成功")
        # Random pause before interacting with the page.
        time.sleep(random.uniform(1, 3))
    except Exception as e:
        print(f"页面加载失败: {e}")
        # Do not leave an orphaned browser behind when the caller retries.
        _quit_quietly(driver)
        raise

    return driver


def _is_header_row(texts):
    """Return True if any text in ``texts`` contains one of ``HEADER_KEYWORDS``."""
    return any(keyword in text for text in texts for keyword in HEADER_KEYWORDS)


def scrape_results_from_table(driver):
    """Collect the data rows of the result table.

    A single in-page JavaScript pass is tried first; if it fails or yields
    nothing, rows are located through ``RESULT_ROWS_XPATH`` and read one by
    one.  Rows containing a header keyword are dropped.

    Returns:
        A list of rows, each a list of non-empty cell texts; ``[]`` when no
        table rows could be found.
    """
    try:
        table_data = driver.execute_script(_TABLE_SCRAPE_JS)
        if table_data and len(table_data) > 0:
            return table_data
    except Exception as e:
        print(f"JavaScript方式失败: {e}")

    # Fallback: locate the rows with Selenium, short wait per candidate XPath.
    for xpath in RESULT_ROWS_XPATH:
        try:
            rows = WebDriverWait(driver, 2).until(
                EC.presence_of_all_elements_located((By.XPATH, xpath))
            )
        except Exception:
            continue
        if rows:
            print(f"Selenium方式找到 {len(rows)} 行数据")
            break
    else:
        print("未找到结果表格")
        return []

    results = []
    for row in rows:
        try:
            # One JS round trip per row is faster than one call per cell.
            cell_texts = driver.execute_script(_ROW_CELLS_JS, row)
            if cell_texts and not _is_header_row(cell_texts):
                results.append(cell_texts)
        except Exception:
            # JS failed for this row: read the cells through the WebDriver API.
            try:
                cells = row.find_elements(By.TAG_NAME, "td")
                if not cells:
                    cells = row.find_elements(By.TAG_NAME, "th")
                if not cells:
                    cells = row.find_elements(By.XPATH, ".//div")

                texts = [c.text.strip() for c in cells if c.text.strip()]
                if texts and not _is_header_row(texts):
                    results.append(texts)
            except Exception:
                continue

    return results


def _dismiss_not_found_popup(driver):
    """Close the "SIM does not exist" dialog if it is shown; return True when a button was clicked."""
    candidates = [
        # (xpath, wait seconds, log message, pause after click)
        ('/html/body/div[2]/div/div[2]/div/div[2]/div/button', 0.5, "已关闭'不存在'弹窗", 0.5),
    ]
    candidates += [
        (selector, 0.2, "已关闭'不存在'弹窗(备用方式)", 0.1)
        for selector in (
            '//div[contains(text(),"批量查询结果")]//button',
            '//div[contains(text(),"以下SIM卡不存在")]//button',
            '//div[contains(@class,"modal")]//button[contains(@class,"close")]',
            '//div[contains(@class,"ant-modal")]//button[contains(@class,"close")]',
        )
    ]

    for xpath, timeout, message, pause in candidates:
        try:
            close_btn = WebDriverWait(driver, timeout).until(
                EC.element_to_be_clickable((By.XPATH, xpath))
            )
            close_btn.click()
        except Exception:
            continue
        print(message)
        time.sleep(pause)
        return True
    return False


def submit_batch_and_collect(driver, batch_items):
    """Submit one batch query and return the scraped result rows.

    Args:
        driver: Logged-in WebDriver.
        batch_items: Query items for this batch; anything beyond
            ``MAX_PER_BATCH`` is dropped (a warning is printed).

    Returns:
        List of rows (lists of cell texts); ``[]`` if the input box or the
        query button could not be used, or nothing was found.
    """
    if len(batch_items) > MAX_PER_BATCH:
        print(f"批次超过 {MAX_PER_BATCH} 项,多余的 {len(batch_items) - MAX_PER_BATCH} 项将被忽略")
        batch_items = batch_items[:MAX_PER_BATCH]

    # Random pause between batches.
    time.sleep(random.uniform(0.5, 2.0))

    # Open the batch-query dialog; it may already be open, so failure is fine.
    try:
        batch_btn = WebDriverWait(driver, 0.3).until(
            EC.element_to_be_clickable((By.XPATH, BATCH_QUERY_BTN_XPATH))
        )
        batch_btn.click()
        time.sleep(random.uniform(0.3, 0.8))
    except Exception:
        pass

    try:
        inp = WebDriverWait(driver, 2).until(
            EC.presence_of_element_located((By.XPATH, INPUT_XPATH))
        )
    except Exception:
        print("未找到输入框")
        return []

    if not clear_input_box(driver, inp):
        return []

    # Type the items in chunks of INPUT_CHUNK_SIZE, newline separated, with a
    # short random pause between chunks.
    chunks = [
        batch_items[i:i + INPUT_CHUNK_SIZE]
        for i in range(0, len(batch_items), INPUT_CHUNK_SIZE)
    ]
    for i, chunk in enumerate(chunks):
        chunk_payload = "\n".join(chunk)
        inp.send_keys(chunk_payload if i == 0 else "\n" + chunk_payload)
        if i < len(chunks) - 1:
            time.sleep(random.uniform(0.1, 0.3))

    time.sleep(random.uniform(0.5, 1.5))

    try:
        btn = WebDriverWait(driver, 1).until(
            EC.element_to_be_clickable((By.XPATH, CONFIRM_BTN_XPATH))
        )
        btn.click()
    except Exception:
        return []

    time.sleep(random.uniform(0.5, 1.0))

    _dismiss_not_found_popup(driver)

    results = scrape_results_from_table(driver)

    # Leave an empty input box for the next batch (best effort).
    try:
        inp = driver.find_element(By.XPATH, INPUT_XPATH)
        driver.execute_script("arguments[0].value = '';", inp)
    except Exception:
        pass

    return results


def worker_thread(thread_id, assigned_batches, driver, shared_results, lock):
    """Process the batches assigned to one thread with its own driver.

    Args:
        thread_id: 1-based id used in log messages.
        assigned_batches: List of ``(batch_index, batch_items)``.
        driver: WebDriver used exclusively by this thread.
        shared_results: List extended (under ``lock``) with one
            ``(batch_index, rows, row_count)`` tuple per batch, where ``rows``
            is a list of ``{"batch": ..., "cells": ...}`` dicts.
        lock: Lock guarding ``shared_results``.
    """
    print(f"线程 {thread_id} 启动,分配了 {len(assigned_batches)} 个批次")

    thread_results = []

    for batch_index, batch_items in assigned_batches:
        print(f"线程 {thread_id} 开始处理批次 {batch_index}")

        try:
            results = submit_batch_and_collect(driver, batch_items)
            if results:
                batch_results = [{"batch": batch_index, "cells": row_cells} for row_cells in results]
                thread_results.append((batch_index, batch_results, len(results)))
                print(f"线程 {thread_id} 完成批次 {batch_index},获得 {len(results)} 条结果")
            else:
                thread_results.append((batch_index, [], 0))
                print(f"线程 {thread_id} 完成批次 {batch_index},无结果")

        except Exception as e:
            print(f"线程 {thread_id} 处理批次 {batch_index} 失败: {e}")
            thread_results.append((batch_index, [], 0))

        time.sleep(random.uniform(0.1, 0.3))

    with lock:
        shared_results.extend(thread_results)

    print(f"线程 {thread_id} 完成所有分配任务,共处理 {len(thread_results)} 个批次")


def _assign_batches(batches_with_index, thread_count):
    """Split the batches into ``thread_count`` contiguous, near-equal slices."""
    per_thread, remainder = divmod(len(batches_with_index), thread_count)
    assignments = []
    start_idx = 0
    for thread_id in range(thread_count):
        # The first `remainder` threads take one extra batch.
        count = per_thread + (1 if thread_id < remainder else 0)
        assigned = batches_with_index[start_idx:start_idx + count]
        assignments.append(assigned)
        start_idx += count
        print(f"线程 {thread_id + 1} 分配了 {len(assigned)} 个批次: {[b[0] for b in assigned]}")
    return assignments


def _init_drivers(count, drivers, max_retries=3):
    """Append ``count`` initialised drivers to ``drivers``.

    The list is filled in place so the caller can still shut down the drivers
    created before a failure.  Returns False as soon as one driver cannot be
    created after ``max_retries`` attempts.
    """
    for i in range(count):
        for retry in range(max_retries):
            try:
                print(f"初始化线程 {i+1} 的Chrome驱动 (第 {retry + 1}/{max_retries} 次)...")
                drivers.append(init_driver(HEADLESS))
                break
            except Exception as e:
                print(f"线程 {i+1} 第 {retry + 1} 次初始化失败: {e}")
                if retry == max_retries - 1:
                    print(f"线程 {i+1} 所有初始化尝试都失败了")
                    return False
                time.sleep(3)
    return True


def _login_driver(driver_info):
    """Log in one ``(thread_id, driver)`` pair; return ``(thread_id, success, error)``."""
    thread_id, driver = driver_info
    try:
        if login(driver, USERNAME, PASSWORD):
            print(f"线程 {thread_id} 登录成功")
            return thread_id, True, None
        print(f"线程 {thread_id} 登录失败")
        return thread_id, False, "登录失败"
    except Exception as e:
        print(f"线程 {thread_id} 登录异常: {e}")
        return thread_id, False, str(e)


def _login_all(drivers):
    """Log in every driver in parallel; return the ``(thread_id, success, error)`` of each failure."""
    with ThreadPoolExecutor(max_workers=len(drivers)) as executor:
        futures = [
            executor.submit(_login_driver, (i, driver))
            for i, driver in enumerate(drivers, start=1)
        ]
        login_results = [future.result() for future in as_completed(futures)]
    return [result for result in login_results if not result[1]]


def _close_drivers(drivers):
    """Quit every driver, logging but not raising on failure."""
    if not drivers:
        return
    print("关闭所有Chrome驱动...")
    for i, driver in enumerate(drivers):
        try:
            driver.quit()
            print(f"线程 {i+1} 驱动已关闭")
        except Exception as e:
            print(f"关闭线程 {i+1} 驱动时出错: {e}")


def main():
    """Run the whole job: read items, query them in parallel, save, report.

    Browser sessions are always shut down, including when driver
    initialisation or login fails part-way.
    """
    start_time = time.time()

    query_items = read_query_items(ICCID_FILE)
    if not query_items:
        print(f"在 {ICCID_FILE} 中未找到查询项")
        return

    print(f"总共读取到 {len(query_items)} 个查询项")
    batches = [query_items[i:i+MAX_PER_BATCH] for i in range(0, len(query_items), MAX_PER_BATCH)]
    print(f"将分为 {len(batches)} 个批次处理")

    # Never start more browsers than there are batches to process.
    thread_count = min(MAX_THREADS, len(batches))
    print(f"使用 {thread_count} 个线程并行处理")

    batches_with_index = list(enumerate(batches, start=1))
    thread_assignments = _assign_batches(batches_with_index, thread_count)

    shared_results = []
    results_lock = threading.Lock()
    thread_durations = []  # busy time of each worker, used for the speed-up estimate
    drivers = []

    try:
        if not _init_drivers(thread_count, drivers):
            return

        print("开始并行登录所有线程...")
        login_start_time = time.time()
        failed_logins = _login_all(drivers)
        if failed_logins:
            print(f"❌ {len(failed_logins)} 个线程登录失败:")
            for thread_id, _, error in failed_logins:
                print(f"  - 线程 {thread_id}: {error}")
            print("程序退出")
            return

        login_duration = time.time() - login_start_time
        print(f"✅ 所有 {thread_count} 个线程登录成功")
        print(f"⏱️ 并行登录耗时: {login_duration:.2f}秒")
        print("开始批量查询...")

        init_duration = time.time() - start_time
        print(f"⏱️ 总初始化耗时: {init_duration:.2f}秒")

        def timed_worker(thread_id, assigned, driver):
            began = time.time()
            try:
                worker_thread(thread_id, assigned, driver, shared_results, results_lock)
            finally:
                with results_lock:
                    thread_durations.append(time.time() - began)

        process_start_time = time.time()
        threads = [
            threading.Thread(target=timed_worker, args=(i + 1, thread_assignments[i], drivers[i]))
            for i in range(thread_count)
        ]
        for thread in threads:
            thread.start()

        print("所有线程已启动,等待完成...")

        for thread in threads:
            thread.join()
        process_time = time.time() - process_start_time
    finally:
        _close_drivers(drivers)

    print("所有线程已完成,开始按批次顺序保存结果...")

    shared_results.sort(key=lambda item: item[0])

    total_saved_results = 0
    is_first_batch = True

    for batch_index, batch_results, _ in shared_results:
        if not batch_results:
            print(f"批次 {batch_index} 无结果")
            continue
        saved_count = save_results_to_csv(batch_results, OUTPUT_CSV, is_first_batch)
        if saved_count:
            total_saved_results += saved_count
            print(f"✅ 已保存批次 {batch_index} 的 {saved_count} 条结果")
            # Only stop writing the header once a write actually succeeded.
            is_first_batch = False

    total_time = time.time() - start_time
    avg_time_per_batch = total_time / len(batches)

    print(f"\n{'='*50}")
    print("📊 多线程批量查询完成报告")
    print(f"{'='*50}")
    print(f"总查询项: {len(query_items)}")
    print(f"总批次数: {len(batches)}")
    print(f"使用线程数: {thread_count}")
    print(f"完成批次: {len(shared_results)}")
    print(f"总保存结果数: {total_saved_results}")
    print(f"⏱️ 总初始化耗时: {init_duration:.2f}秒")
    print(f"⏱️ 并行登录耗时: {login_duration:.2f}秒")
    print(f"⏱️ 处理耗时: {process_time:.2f}秒")
    print(f"⏱️ 总耗时: {total_time:.2f}秒")
    print(f"⏱️ 平均每批次: {avg_time_per_batch:.2f}秒")

    # Speed-up estimate: time the workers spent in total (what one thread
    # would roughly have needed) over the wall-clock processing time.  The
    # original divided total time by itself and therefore always printed 1.00x.
    estimated_single_thread_time = sum(thread_durations)
    actual_speedup = estimated_single_thread_time / process_time if process_time > 0 else 1
    print(f"🚀 实际性能提升: {actual_speedup:.2f}x")

    print(f"{'='*50}")
    print(f"✅ 所有结果已按顺序保存到 {OUTPUT_CSV}")


if __name__ == "__main__":
    main()