Files
pa/pa_upgrade.py
2025-10-01 00:11:49 +08:00

886 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#多线程
import os
import time
import pandas as pd
import threading
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
# --- Configuration ---------------------------------------------------------
# Base URL of the portal that the browser opens before logging in.
URL = "https://prod-eu-cmp.simbalinkglobal.com"
# XPath of the ICCID input of the batch-query dialog (textarea, input, or any
# element carrying the id).
INPUT_XPATH = '//textarea[@id="iccidList"] | //input[@id="iccidList"] | //*[@id="iccidList"]'
# XPath alternatives for the button that submits the batch query.
CONFIRM_BTN_XPATH = '//button[contains(text(),"查询")] | //button[contains(text(),"确认")] | //*[@id="pop-confirm"]//button[2] | //button[@type="submit"]'
# Absolute XPath of the button that opens the batch-query dialog.
BATCH_QUERY_BTN_XPATH = '/html/body/div/div[2]/main/div/div/div/div/div[2]/div[2]/div[1]/div[2]/div/button'
# Candidate XPaths for the result rows, tried in order until one matches.
RESULT_ROWS_XPATH = [
    '//table[contains(@class,"ant-table")]//tbody/tr',
    '//table//tbody/tr',
    '//div[contains(@class,"ant-table")]//tbody/tr',
    '//table[@class="table"]//tbody/tr',
    '//*[contains(@class,"table")]//tr[position()>1]'
]
# Maximum number of query items submitted in one batch.
MAX_PER_BATCH = 50
# CSV file the scraped rows are written to.
OUTPUT_CSV = "results.csv"
# Text file holding one query item (ICCID) per line.
ICCID_FILE = "text.txt"
# Headless mode is enabled by default (intended for Linux environments).
HEADLESS = True
# NOTE(review): not referenced anywhere in the code visible in this file.
BATCH_RETRY_COUNT = 2

# --- Multithreading --------------------------------------------------------
# Maximum number of worker threads (deliberately kept small).
MAX_THREADS = 3
# Batch size handled per thread.
# NOTE(review): not referenced anywhere in the code visible in this file.
THREAD_BATCH_SIZE = 100

# --- Login -----------------------------------------------------------------
# SECURITY: credentials should not live in source control. They can be
# supplied through the PA_USERNAME / PA_PASSWORD environment variables; the
# literals below are only kept as backward-compatible defaults and should be
# rotated and removed.
USERNAME = os.environ.get("PA_USERNAME", "xiongshi_yunwei")
PASSWORD = os.environ.get("PA_PASSWORD", "Cm20p20Nmo!25")

# XPaths of the login page elements - update them when the page changes.
USERNAME_XPATH = "/html/body/div/div[2]/div/form/div[1]/div/div/div/div/span/input"
PASSWORD_XPATH = "//*[@id='password']"
LOGIN_BUTTON_XPATH = "/html/body/div/div[2]/div/form/button"
def read_query_items(path):
    """Read the query items (one per line) from a text file.

    The file is decoded with each candidate encoding in turn; the first
    encoding that decodes the file and yields at least one non-blank line
    wins. Lines are stripped and blank lines are dropped.

    Args:
        path: Path of the text file to read.

    Returns:
        A list of non-empty, stripped lines.

    Raises:
        Exception: If no candidate encoding produces a non-empty result
            (the file is missing, unreadable, undecodable or empty).
    """
    # 'utf-8-sig' has to be tried before 'utf-8': plain 'utf-8' also decodes a
    # BOM-prefixed file without error, but leaves U+FEFF glued to the first
    # item. 'utf-8-sig' reads BOM-less UTF-8 just as well.
    encodings = ['utf-8-sig', 'utf-8', 'gbk', 'cp1252']
    for encoding in encodings:
        try:
            with open(path, 'r', encoding=encoding) as f:
                lines = [line.strip() for line in f if line.strip()]
        except (OSError, ValueError):
            # OSError: missing/unreadable file; ValueError covers
            # UnicodeDecodeError for a wrong encoding guess.
            continue
        if lines:
            return lines
    raise Exception("无法读取文件")
# Lock that serializes writes to the output CSV across threads.
file_lock = threading.Lock()


def save_results_to_csv(results, filename, is_first_batch=False):
    """Write scraped rows to a CSV file while holding ``file_lock``.

    Each result becomes one CSV row: the batch number followed by the cell
    texts in order (internal column names ``batch``, ``col_0``, ``col_1``...).

    Args:
        results: List of dicts with keys ``"batch"`` (batch number) and
            ``"cells"`` (list of cell texts).
        filename: Path of the CSV file.
        is_first_batch: When true the file is (re)created and a
            human-readable header row is written before the data; otherwise
            the rows are appended to the existing file.

    Returns:
        The number of rows written, or 0 when ``results`` is empty or the
        write failed (the error is printed, not raised).
    """
    if not results:
        return 0

    # One dict per row; the cell position decides the column name.
    df_data = []
    for result in results:
        row_data = {"batch": result["batch"]}
        for i, cell in enumerate(result["cells"]):
            row_data[f"col_{i}"] = cell
        df_data.append(row_data)
    df = pd.DataFrame(df_data)

    with file_lock:
        try:
            if is_first_batch:
                # Human-readable titles for the positional columns.
                header_data = {
                    "batch": "批次号",
                    "col_0": "ICCID",
                    "col_1": "设备ID",
                    "col_2": "生命周期",
                    "col_3": "周期用量(MB)",
                    "col_4": "流量上限(MB)",
                    "col_5": "在用套餐",
                    "col_6": "租户",
                    "col_7": "服务状态",
                    "col_8": "激活时间",
                    "col_9": "MSISDN",
                    "col_10": "IMSI"
                }
                header_df = pd.DataFrame([header_data])
                # header=False is essential: with pandas' default the internal
                # names ("batch,col_0,...") would be emitted as an extra first
                # line in front of the real header row.
                header_df.to_csv(filename, index=False, header=False, encoding='utf-8-sig')
            # Data rows are always appended without a pandas-generated header.
            df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig')
        except Exception as e:
            print(f"保存文件时出错: {e}")
            return 0
    return len(df_data)
# XPath matching any element whose text looks like a login error message.
_LOGIN_ERROR_XPATH = "//*[contains(text(),'错误') or contains(text(),'失败') or contains(text(),'invalid') or contains(text(),'incorrect')]"


def _find_first_clickable(driver, selectors, timeout=2):
    """Return the first element that becomes clickable for any XPath in *selectors*, or None."""
    for selector in selectors:
        try:
            return WebDriverWait(driver, timeout).until(
                EC.element_to_be_clickable((By.XPATH, selector))
            )
        except Exception:
            # Timeout or invalid selector: fall through to the next candidate.
            continue
    return None


def _fill_input(driver, element, text, focus_delay):
    """Focus *element*, clear it (natively and through JavaScript) and type *text*."""
    element.click()
    time.sleep(focus_delay)
    element.clear()
    time.sleep(0.1)
    # clear() does not always empty framework-controlled inputs, so the value
    # is reset through JavaScript as well before typing.
    driver.execute_script("arguments[0].value = '';", element)
    element.send_keys(text)
    time.sleep(0.5)


def login(driver, username, password):
    """Log in through the login form of the page currently open in *driver*.

    The username field, password field and login button are each located by
    trying a list of XPath candidates (the configured XPath first, generic
    fallbacks afterwards). The credentials are typed in, the button is
    clicked, and the result is judged from the URL and from error texts on
    the page.

    Args:
        driver: Selenium WebDriver already showing the login page.
        username: Account name to type.
        password: Password to type.

    Returns:
        True when the login is considered successful, False otherwise.
        Failures are printed; no exception is propagated.
    """
    try:
        time.sleep(3)
        current_url = driver.current_url

        # Placeholder fallbacks use contains(): the previous "@placeholder*="
        # form is CSS syntax, not valid XPath, so those candidates could
        # never match.
        username_input = _find_first_clickable(driver, [
            USERNAME_XPATH,
            "//input[@id='username']",
            "//input[@placeholder='请输入账号']",
            "//input[@type='text']",
            "//input[contains(@placeholder,'用户名')]",
            "//input[contains(@placeholder,'账号')]",
            "//input[contains(@placeholder,'user')]",
            "//input[contains(@placeholder,'email')]",
            "//input[@name='username']",
            "//input[@name='user']"
        ])
        if username_input is None:
            print("未找到用户名输入框")
            return False

        password_input = _find_first_clickable(driver, [
            PASSWORD_XPATH,
            "//input[@id='password']",
            "//input[@placeholder='请输入密码']",
            "//input[@type='password']",
            "//input[contains(@placeholder,'密码')]",
            "//input[contains(@placeholder,'password')]",
            "//input[@name='password']",
            "//input[@name='pwd']"
        ])
        if password_input is None:
            print("未找到密码输入框")
            return False

        login_button = _find_first_clickable(driver, [
            LOGIN_BUTTON_XPATH,
            "//button[@type='submit']",
            "//button[contains(text(),'登录')]",
            "//button[contains(text(),'Login')]",
            "//button[contains(text(),'登入')]",
            "//input[@type='submit']",
            "//button[contains(@class,'login')]",
            "//button[contains(@class,'submit')]"
        ])
        if login_button is None:
            print("未找到登录按钮")
            return False

        # Type the username.
        try:
            _fill_input(driver, username_input, username, focus_delay=0.5)
            print(f"已输入用户名: {username}")
        except Exception as e:
            print(f"输入用户名失败: {e}")
            return False

        # Type the password (never echoed).
        try:
            _fill_input(driver, password_input, password, focus_delay=0.2)
        except Exception as e:
            print(f"输入密码失败: {e}")
            return False

        # Click the login button: JavaScript click first, native click as a
        # fallback.
        try:
            driver.execute_script("arguments[0].scrollIntoView(true);", login_button)
            time.sleep(0.5)
            driver.execute_script("arguments[0].click();", login_button)
            time.sleep(2)
        except Exception as e:
            print(f"点击登录按钮失败: {e}")
            try:
                login_button.click()
                time.sleep(2)
            except Exception as e2:
                print(f"普通点击也失败: {e2}")
                return False

        # Wait for a sign that the login went through.
        try:
            # NOTE(review): the second clause is true immediately whenever the
            # login page URL does not contain "login", in which case this wait
            # returns at once - confirm against the real login URL.
            WebDriverWait(driver, 15).until(
                lambda d: d.current_url != current_url or
                "login" not in d.current_url.lower() or
                len(d.find_elements(By.XPATH, "//*[contains(text(),'登录成功') or contains(text(),'欢迎')]")) > 0
            )
            error_elements = driver.find_elements(By.XPATH, _LOGIN_ERROR_XPATH)
            if error_elements:
                print(f"登录错误信息: {error_elements[0].text}")
                return False
            print("登录成功!")
            return True
        except Exception as e:
            print(f"登录验证失败: {e}")
            # The wait failed: treat the login as successful unless the page
            # shows an error text.
            try:
                error_elements = driver.find_elements(By.XPATH, _LOGIN_ERROR_XPATH)
                if error_elements:
                    print(f"登录错误信息: {error_elements[0].text}")
                    return False
            except Exception:
                return False
            print("未发现明显错误信息,可能登录成功")
            return True
    except Exception as e:
        print(f"登录过程中出错: {e}")
        return False
def clear_input_box(driver, input_element):
    """Empty an input element using several redundant strategies.

    The element is cleared natively, then by select-all + delete key
    strokes, then by resetting its value through JavaScript.

    Args:
        driver: Selenium WebDriver used to run the JavaScript reset.
        input_element: The input/textarea element to empty.

    Returns:
        True when every step ran without raising, False otherwise.
    """
    try:
        input_element.clear()
        # Select everything, then delete it.
        for keystroke in (Keys.CONTROL + "a", Keys.DELETE):
            input_element.send_keys(keystroke)
        driver.execute_script("arguments[0].value = '';", input_element)
        time.sleep(0.1)
    except Exception:
        return False
    else:
        return True
def _apply_stealth_scripts(driver):
    """Override a few ``navigator`` properties in the current document."""
    # NOTE(review): execute_script only patches the document that is loaded
    # right now; whether the overrides survive the later navigation to URL
    # is not established by this file - confirm.
    for script in (
        "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})",
        "Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})",
        "Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']})",
    ):
        driver.execute_script(script)


def init_driver(headless=False):
    """Create a configured Chrome WebDriver and open ``URL`` in it.

    Headless mode is used when *headless* is true and always on Linux. The
    driver binary is obtained through ChromeDriverManager first; if that
    fails, the chromedriver found on PATH is used instead.

    Args:
        headless: Request headless mode on non-Linux systems.

    Returns:
        A WebDriver instance with ``URL`` already loaded.

    Raises:
        Exception: If no Chrome driver could be started, or if loading
            ``URL`` fails (in that case the browser is closed before the
            error is re-raised, so no Chrome process is left behind).
    """
    import platform
    import random

    chrome_opts = Options()
    is_linux = platform.system().lower() == 'linux'

    # Headless is forced on Linux, optional elsewhere.
    if is_linux or headless:
        chrome_opts.add_argument("--headless=new")
        print("启用无头模式")

    # All features to disable are collected and passed in ONE switch below:
    # separate "--disable-features=" arguments were given before, which does
    # not reliably combine them.
    disabled_features = ["VizDisplayCompositor"]

    # Linux-specific tuning.
    if is_linux:
        disabled_features.insert(0, "TranslateUI")
        for arg in (
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--disable-software-rasterizer",
            "--disable-background-timer-throttling",
            "--disable-backgrounding-occluded-windows",
            "--disable-renderer-backgrounding",
            "--disable-ipc-flooding-protection",
            "--single-process",  # single-process mode on Linux
            "--memory-pressure-off",
            "--max_old_space_size=4096",
        ):
            chrome_opts.add_argument(arg)

    # Anti-bot-detection: random user agent.
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0"
    ]
    selected_ua = random.choice(user_agents)
    chrome_opts.add_argument(f"--user-agent={selected_ua}")

    # Anti-bot-detection: hide automation hints.
    chrome_opts.add_argument("--disable-blink-features=AutomationControlled")
    # Experimental options are stored per key, so both excluded switches must
    # be given in a single call; a second add_experimental_option call for
    # "excludeSwitches" would replace the first one.
    chrome_opts.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
    chrome_opts.add_experimental_option('useAutomationExtension', False)

    for arg in (
        # Window and display.
        "--window-size=1920,1080",
        "--start-maximized",
        "--disable-infobars",
        "--disable-notifications",
        "--disable-popup-blocking",
        # Performance.
        "--disable-images",
        # NOTE(review): if Chrome honours this switch the page scripts cannot
        # run - confirm it is really intended.
        "--disable-javascript",
        "--disable-plugins",
        "--disable-extensions",
        "--disable-default-apps",
        "--disable-sync",
        "--disable-translate",
        "--hide-scrollbars",
        "--mute-audio",
        "--no-first-run",
        # Network and security.
        "--disable-web-security",
        "--disable-features=" + ",".join(disabled_features),
        "--disable-client-side-phishing-detection",
        "--disable-component-extensions-with-background-pages",
        "--disable-background-networking",
        # Logging.
        "--log-level=3",
        "--silent",
    ):
        chrome_opts.add_argument(arg)

    # Random debugging port to reduce clashes between browser instances.
    debug_port = random.randint(9222, 9999)
    chrome_opts.add_argument(f"--remote-debugging-port={debug_port}")

    # Start the browser: managed driver first, PATH chromedriver as fallback.
    print("正在初始化Chrome驱动...")
    if is_linux:
        print("检测到Linux环境使用Linux优化配置")
    try:
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=chrome_opts)
        print("Chrome驱动初始化成功")
    except Exception as e:
        print(f"ChromeDriverManager失败: {e}")
        print("尝试使用系统PATH中的chromedriver...")
        try:
            driver = webdriver.Chrome(options=chrome_opts)
            print("使用系统chromedriver成功")
        except Exception as e2:
            print(f"系统chromedriver也失败: {e2}")
            if is_linux:
                print("done")
            else:
                print("检查Chrome浏览器和chromedriver")
            raise Exception("检查Chrome浏览器和chromedriver安装") from e2

    # Best effort only: a failing stealth script must not trigger the start
    # of a second browser (which would leak the first one).
    try:
        _apply_stealth_scripts(driver)
    except Exception as e:
        print(f"反检测脚本执行失败: {e}")

    # Window size (also needed in headless mode).
    try:
        if not is_linux and not headless:
            driver.maximize_window()
        else:
            driver.set_window_size(1920, 1080)
    except Exception as e:
        print(f"窗口设置失败: {e}")

    # Open the login page.
    try:
        print(f"正在访问: {URL}")
        driver.get(URL)
        print("页面加载成功")
        # Random pause to look less like a script.
        time.sleep(random.uniform(1, 3))
    except Exception as e:
        print(f"页面加载失败: {e}")
        # The caller never receives this driver, so close it here; otherwise
        # every failed attempt leaves a Chrome process running.
        try:
            driver.quit()
        except Exception:
            pass
        raise
    return driver
def scrape_results_from_table(driver):
    """Collect the non-header rows of the result table as lists of cell texts.

    A single in-page JavaScript pass over all tables is tried first and its
    result is returned directly when it is non-empty. Otherwise the rows are
    located through the XPaths in ``RESULT_ROWS_XPATH`` and read one by one
    (JavaScript per row, plain Selenium calls as a last resort). Rows in
    which any cell contains a header keyword are skipped.

    Args:
        driver: Selenium WebDriver showing the result page.

    Returns:
        A list of rows, each a list of non-empty, stripped cell texts;
        an empty list when no table rows are found.
    """
    header_keywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI']

    def looks_like_header(texts):
        # A row is a header when any cell contains any keyword.
        return any(keyword in text for text in texts for keyword in header_keywords)

    # Whole-table extraction, executed inside the page.
    table_js = """
var tables = document.querySelectorAll('table, .ant-table');
for (var i = 0; i < tables.length; i++) {
var table = tables[i];
var rows = table.querySelectorAll('tbody tr, tr');
if (rows.length > 0) {
var data = [];
for (var j = 0; j < rows.length; j++) {
var cells = rows[j].querySelectorAll('td, th, div');
var rowData = [];
for (var k = 0; k < cells.length; k++) {
var text = cells[k].textContent || cells[k].innerText;
if (text && text.trim()) {
rowData.push(text.trim());
}
}
if (rowData.length > 0) {
// 过滤掉表头行(包含"ICCID""设备ID"等关键词的行)
var isHeader = false;
var headerKeywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI'];
for (var h = 0; h < headerKeywords.length; h++) {
if (rowData.some(function(cell) { return cell.includes(headerKeywords[h]); })) {
isHeader = true;
break;
}
}
if (!isHeader) {
data.push(rowData);
}
}
}
if (data.length > 0) {
return data;
}
}
}
return [];
"""
    # Per-row extraction used by the Selenium fallback.
    row_js = """
var cells = arguments[0].querySelectorAll('td, th, div');
var texts = [];
for (var i = 0; i < cells.length; i++) {
var text = cells[i].textContent || cells[i].innerText;
if (text && text.trim()) {
texts.push(text.trim());
}
}
return texts;
"""

    # Fast path: everything in one JavaScript call.
    try:
        table_data = driver.execute_script(table_js)
        if table_data:
            return table_data
    except Exception as e:
        print(f"JavaScript方式失败: {e}")

    # Fallback: locate the rows with the first XPath that yields any.
    rows = None
    for xpath in RESULT_ROWS_XPATH:
        try:
            found = WebDriverWait(driver, 2).until(EC.presence_of_all_elements_located((By.XPATH, xpath)))
        except Exception:
            continue
        if found:
            rows = found
            print(f"Selenium方式找到 {len(rows)} 行数据")
            break
    if rows is None:
        print("未找到结果表格")
        return []

    collected = []
    for row in rows:
        try:
            # JavaScript per row first (fewer driver round trips).
            cell_texts = driver.execute_script(row_js, row)
            if cell_texts and not looks_like_header(cell_texts):
                collected.append(cell_texts)
        except Exception:
            # Last resort: td cells, then th cells, then nested divs.
            try:
                cells = row.find_elements(By.TAG_NAME, "td")
                if not cells:
                    cells = row.find_elements(By.TAG_NAME, "th")
                if not cells:
                    cells = row.find_elements(By.XPATH, ".//div")
                texts = [c.text.strip() for c in cells if c.text.strip()]
                if texts and not looks_like_header(texts):
                    collected.append(texts)
            except Exception:
                continue
    return collected
def submit_batch_and_collect(driver, batch_items):
    """Submit one batch of query items and return the scraped result rows.

    Steps: open the batch-query dialog (best effort), type the items into
    the input one per line, click the confirm button, dismiss a possible
    result popup, then scrape the result table. Random pauses are inserted
    between the steps.

    Args:
        driver: Logged-in Selenium WebDriver.
        batch_items: Query items of this batch; at most ``MAX_PER_BATCH``
            are submitted, any excess is dropped with a printed warning.

    Returns:
        The rows returned by ``scrape_results_from_table``, or an empty
        list when the batch is empty or the input box / confirm button
        cannot be used.
    """
    import random

    if not batch_items:
        # Nothing to query; do not submit an empty form.
        return []
    if len(batch_items) > MAX_PER_BATCH:
        # Truncation used to happen silently; make the data loss visible.
        print(f"批次包含 {len(batch_items)} 项,超过上限 {MAX_PER_BATCH},多余项将被忽略")
        batch_items = batch_items[:MAX_PER_BATCH]

    # Random pause before starting.
    time.sleep(random.uniform(0.5, 2.0))

    # Try to open the batch-query dialog; it may already be open.
    try:
        batch_btn = WebDriverWait(driver, 0.3).until(EC.element_to_be_clickable((By.XPATH, BATCH_QUERY_BTN_XPATH)))
        batch_btn.click()
        time.sleep(random.uniform(0.3, 0.8))
    except Exception:
        pass

    # Locate the input box.
    try:
        inp = WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, INPUT_XPATH)))
    except Exception:
        print("未找到输入框")
        return []

    if not clear_input_box(driver, inp):
        return []

    # Type the items in chunks of up to 50 lines with a short random pause
    # between chunks.
    chunk_size = 50
    chunks = [batch_items[i:i + chunk_size] for i in range(0, len(batch_items), chunk_size)]
    for i, chunk in enumerate(chunks):
        chunk_payload = "\n".join(chunk)
        # Every chunk after the first starts on a new line.
        inp.send_keys(chunk_payload if i == 0 else "\n" + chunk_payload)
        if i < len(chunks) - 1:
            time.sleep(random.uniform(0.1, 0.3))

    # Random pause before submitting.
    time.sleep(random.uniform(0.5, 1.5))

    # Click the query/confirm button.
    try:
        btn = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, CONFIRM_BTN_XPATH)))
        btn.click()
    except Exception:
        return []

    time.sleep(random.uniform(0.5, 1.0))

    # Dismiss the popup that may appear after the query (the "SIM does not
    # exist" notice): exact XPath first, generic close buttons afterwards.
    try:
        close_btn = WebDriverWait(driver, 0.5).until(
            EC.element_to_be_clickable((By.XPATH, '/html/body/div[2]/div/div[2]/div/div[2]/div/button'))
        )
        close_btn.click()
        print("已关闭'不存在'弹窗")
        time.sleep(0.5)
    except Exception:
        alternative_selectors = [
            '//div[contains(text(),"批量查询结果")]//button',
            '//div[contains(text(),"以下SIM卡不存在")]//button',
            '//div[contains(@class,"modal")]//button[contains(@class,"close")]',
            '//div[contains(@class,"ant-modal")]//button[contains(@class,"close")]'
        ]
        for selector in alternative_selectors:
            try:
                close_btn = WebDriverWait(driver, 0.2).until(EC.element_to_be_clickable((By.XPATH, selector)))
                close_btn.click()
                print("已关闭'不存在'弹窗(备用方式)")
                time.sleep(0.1)
                break
            except Exception:
                continue

    # Scrape the result table.
    results = scrape_results_from_table(driver)

    # Leave the input empty for the next batch (best effort).
    try:
        inp = driver.find_element(By.XPATH, INPUT_XPATH)
        driver.execute_script("arguments[0].value = '';", inp)
    except Exception:
        pass
    return results
def worker_thread(thread_id, assigned_batches, driver, shared_results, lock):
    """Process the batches assigned to one worker thread.

    Every batch is submitted through ``submit_batch_and_collect``; one
    ``(batch_index, rows, row_count)`` tuple is recorded per batch, with an
    empty list and 0 when the batch yields nothing or raises. After the
    last batch all tuples are appended to *shared_results* under *lock*.

    Args:
        thread_id: Number used in the log output.
        assigned_batches: Iterable of ``(batch_index, batch_items)`` pairs.
        driver: WebDriver used by this thread.
        shared_results: List shared by all workers that receives the tuples.
        lock: Lock guarding *shared_results*.
    """
    import random

    print(f"线程 {thread_id} 启动,分配了 {len(assigned_batches)} 个批次")
    outcomes = []
    for batch_index, batch_items in assigned_batches:
        print(f"线程 {thread_id} 开始处理批次 {batch_index}")
        try:
            rows = submit_batch_and_collect(driver, batch_items)
            if not rows:
                outcomes.append((batch_index, [], 0))
                print(f"线程 {thread_id} 完成批次 {batch_index},无结果")
            else:
                tagged_rows = []
                for row_cells in rows:
                    tagged_rows.append({"batch": batch_index, "cells": row_cells})
                outcomes.append((batch_index, tagged_rows, len(rows)))
                print(f"线程 {thread_id} 完成批次 {batch_index},获得 {len(rows)} 条结果")
        except Exception as e:
            print(f"线程 {thread_id} 处理批次 {batch_index} 失败: {e}")
            outcomes.append((batch_index, [], 0))
        # Short random pause between batches.
        time.sleep(random.uniform(0.1, 0.3))

    # Publish this thread's outcomes in one step under the lock.
    with lock:
        shared_results.extend(outcomes)
    print(f"线程 {thread_id} 完成所有分配任务,共处理 {len(outcomes)} 个批次")
def _partition_batches(indexed_batches, thread_count):
    """Split *indexed_batches* into *thread_count* contiguous, near-equal slices."""
    base, extra = divmod(len(indexed_batches), thread_count)
    assignments = []
    start = 0
    for slot in range(thread_count):
        # The first `extra` threads take one additional batch each.
        size = base + (1 if slot < extra else 0)
        assignments.append(indexed_batches[start:start + size])
        start += size
    return assignments


def _init_driver_with_retry(thread_no, max_retries=3, retry_delay=3):
    """Start a Chrome driver for thread *thread_no*, retrying on failure; None if all attempts fail."""
    for attempt in range(1, max_retries + 1):
        try:
            print(f"初始化线程 {thread_no} 的Chrome驱动 (第 {attempt}/{max_retries} 次)...")
            return init_driver(HEADLESS)
        except Exception as e:
            print(f"线程 {thread_no} 第 {attempt} 次初始化失败: {e}")
            if attempt == max_retries:
                print(f"线程 {thread_no} 所有初始化尝试都失败了")
                return None
            time.sleep(retry_delay)
    return None


def _login_driver(driver_info):
    """Log one driver in; returns ``(thread_id, success, error_message_or_None)``."""
    thread_id, driver = driver_info
    try:
        if login(driver, USERNAME, PASSWORD):
            print(f"线程 {thread_id} 登录成功")
            return thread_id, True, None
        print(f"线程 {thread_id} 登录失败")
        return thread_id, False, "登录失败"
    except Exception as e:
        print(f"线程 {thread_id} 登录异常: {e}")
        return thread_id, False, str(e)


def main():
    """Run the multithreaded batch query from input file to CSV report.

    Reads the query items from ``ICCID_FILE``, splits them into batches of
    ``MAX_PER_BATCH``, distributes the batches over the worker threads,
    starts and logs in one Chrome driver per thread, runs the workers,
    writes the results to ``OUTPUT_CSV`` in batch order and prints a timing
    report. All started drivers are closed on every exit path, including
    the early returns after a failed driver start or login.
    """
    start_time = time.time()
    init_start_time = time.time()

    query_items = read_query_items(ICCID_FILE)
    if not query_items:
        print(f"{ICCID_FILE} 中未找到查询项")
        return
    print(f"总共读取到 {len(query_items)} 个查询项")

    batches = [query_items[i:i + MAX_PER_BATCH] for i in range(0, len(query_items), MAX_PER_BATCH)]
    # Never start more browsers than there are batches to process.
    thread_count = max(1, min(MAX_THREADS, len(batches)))
    print(f"将分为 {len(batches)} 个批次处理")
    print(f"使用 {thread_count} 个线程并行处理")

    # Batches are numbered from 1 and assigned to the threads up front.
    batches_with_index = list(enumerate(batches, start=1))
    thread_assignments = _partition_batches(batches_with_index, thread_count)
    for thread_id, assigned_batches in enumerate(thread_assignments, start=1):
        print(f"线程 {thread_id} 分配了 {len(assigned_batches)} 个批次: {[b[0] for b in assigned_batches]}")

    shared_results = []
    results_lock = threading.Lock()
    total_saved_results = 0
    drivers = []
    try:
        # One driver per thread.
        for i in range(thread_count):
            driver = _init_driver_with_retry(i + 1)
            if driver is None:
                print(f"线程 {i+1} 无法初始化Chrome驱动程序退出")
                return
            drivers.append(driver)

        # Log all drivers in concurrently.
        print("开始并行登录所有线程...")
        login_start_time = time.time()
        with ThreadPoolExecutor(max_workers=thread_count) as executor:
            futures = [
                executor.submit(_login_driver, (i + 1, driver))
                for i, driver in enumerate(drivers)
            ]
            login_results = [future.result() for future in as_completed(futures)]

        failed_logins = [result for result in login_results if not result[1]]
        if failed_logins:
            print(f"{len(failed_logins)} 个线程登录失败:")
            for thread_id, _, error in failed_logins:
                print(f" - 线程 {thread_id}: {error}")
            print("程序退出")
            return

        login_duration = time.time() - login_start_time
        print(f"✅ 所有 {thread_count} 个线程登录成功")
        print(f"⏱️ 并行登录耗时: {login_duration:.2f}")
        print("开始批量查询...")
        init_duration = time.time() - init_start_time
        print(f"⏱️ 总初始化耗时: {init_duration:.2f}")

        # Run the workers, one per driver.
        process_start_time = time.time()
        threads = []
        for i in range(thread_count):
            thread = threading.Thread(
                target=worker_thread,
                args=(i + 1, thread_assignments[i], drivers[i], shared_results, results_lock)
            )
            thread.start()
            threads.append(thread)
        print("所有线程已启动,等待完成...")
        for thread in threads:
            thread.join()
        print("所有线程已完成,开始按批次顺序保存结果...")

        # Save in batch order; the first batch with data creates the file
        # and writes the header row.
        shared_results.sort(key=lambda x: x[0])
        is_first_batch = True
        for batch_index, batch_results, result_count in shared_results:
            if batch_results:
                saved_count = save_results_to_csv(batch_results, OUTPUT_CSV, is_first_batch)
                total_saved_results += saved_count
                print(f"✅ 已保存批次 {batch_index}: {saved_count} 条结果")
                is_first_batch = False
            else:
                print(f"批次 {batch_index} 无结果")
    finally:
        # Runs on the early returns above as well, so no Chrome process is
        # left behind.
        print("关闭所有Chrome驱动...")
        for i, driver in enumerate(drivers):
            try:
                driver.quit()
                print(f"线程 {i+1} 驱动已关闭")
            except Exception as e:
                print(f"关闭线程 {i+1} 驱动时出错: {e}")

    # Summary report.
    end_time = time.time()
    total_time = end_time - start_time
    process_time = end_time - process_start_time
    avg_time_per_batch = total_time / len(batches) if len(batches) > 0 else 0
    print(f"\n{'='*50}")
    print(f"📊 多线程批量查询完成报告")
    print(f"{'='*50}")
    print(f"总查询项: {len(query_items)}")
    print(f"总批次数: {len(batches)}")
    print(f"使用线程数: {thread_count}")
    print(f"完成批次: {len(batches)}")
    print(f"总保存结果数: {total_saved_results}")
    print(f"⏱️ 总初始化耗时: {init_duration:.2f}")
    print(f"⏱️ 并行登录耗时: {login_duration:.2f}")
    print(f"⏱️ 处理耗时: {process_time:.2f}")
    print(f"⏱️ 总耗时: {total_time:.2f}")
    print(f"⏱️ 平均每批次: {avg_time_per_batch:.2f}")
    # NOTE(review): the "single thread" estimate is derived from the measured
    # total time itself, so this ratio is 1.00x by construction. A real
    # speed-up figure needs per-batch durations reported by the workers.
    estimated_single_thread_time = avg_time_per_batch * len(batches)
    actual_speedup = estimated_single_thread_time / total_time if total_time > 0 else 1
    print(f"🚀 实际性能提升: {actual_speedup:.2f}x")
    print(f"{'='*50}")
    print(f"✅ 所有结果已按顺序保存到 {OUTPUT_CSV}")
# Run the batch query only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()