886 lines
34 KiB
Python
886 lines
34 KiB
Python
#多线程
|
||
import os
|
||
import time
|
||
import pandas as pd
|
||
import threading
|
||
import queue
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from selenium import webdriver
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.common.keys import Keys
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from webdriver_manager.chrome import ChromeDriverManager
|
||
from selenium.webdriver.chrome.service import Service
|
||
|
||
# 配置
|
||
URL = "https://prod-eu-cmp.simbalinkglobal.com"
|
||
INPUT_XPATH = '//textarea[@id="iccidList"] | //input[@id="iccidList"] | //*[@id="iccidList"]'
|
||
CONFIRM_BTN_XPATH = '//button[contains(text(),"查询")] | //button[contains(text(),"确认")] | //*[@id="pop-confirm"]//button[2] | //button[@type="submit"]'
|
||
BATCH_QUERY_BTN_XPATH = '/html/body/div/div[2]/main/div/div/div/div/div[2]/div[2]/div[1]/div[2]/div/button'
|
||
RESULT_ROWS_XPATH = [
|
||
'//table[contains(@class,"ant-table")]//tbody/tr',
|
||
'//table//tbody/tr',
|
||
'//div[contains(@class,"ant-table")]//tbody/tr',
|
||
'//table[@class="table"]//tbody/tr',
|
||
'//*[contains(@class,"table")]//tr[position()>1]'
|
||
]
|
||
|
||
MAX_PER_BATCH = 50
|
||
OUTPUT_CSV = "results.csv"
|
||
ICCID_FILE = "text.txt"
|
||
HEADLESS = True # Linux环境默认启用无头模式
|
||
BATCH_RETRY_COUNT = 2
|
||
|
||
# 多线程配置
|
||
MAX_THREADS = 3 # 最大线程数(优化:减少线程数)
|
||
THREAD_BATCH_SIZE = 100 # 每个线程处理的批次大小
|
||
|
||
# 登录配置
|
||
USERNAME = "xiongshi_yunwei" # 请替换为实际用户名
|
||
PASSWORD = "Cm20p20Nmo!25" # 请替换为实际密码
|
||
|
||
# 登录页面元素XPath - 根据实际页面元素更新
|
||
USERNAME_XPATH = "/html/body/div/div[2]/div/form/div[1]/div/div/div/div/span/input"
|
||
PASSWORD_XPATH = "//*[@id='password']"
|
||
LOGIN_BUTTON_XPATH = "/html/body/div/div[2]/div/form/button"
|
||
|
||
def read_query_items(path):
|
||
"""读取查询项目文件"""
|
||
encodings = ['utf-8', 'gbk', 'utf-8-sig', 'cp1252']
|
||
for encoding in encodings:
|
||
try:
|
||
with open(path, 'r', encoding=encoding) as f:
|
||
lines = [l.strip() for l in f.readlines() if l.strip()]
|
||
if lines:
|
||
return lines
|
||
except Exception:
|
||
continue
|
||
raise Exception("无法读取文件")
|
||
|
||
# 线程安全的文件写入锁
|
||
file_lock = threading.Lock()
|
||
|
||
def save_results_to_csv(results, filename, is_first_batch=False):
|
||
"""线程安全地保存结果到CSV文件"""
|
||
if not results:
|
||
return 0
|
||
|
||
# 构建DataFrame
|
||
df_data = []
|
||
for result in results:
|
||
row_data = {"batch": result["batch"]}
|
||
cells = result["cells"]
|
||
#
|
||
for i, cell in enumerate(cells):
|
||
row_data[f"col_{i}"] = cell
|
||
df_data.append(row_data)
|
||
|
||
# 创建DataFrame
|
||
df = pd.DataFrame(df_data)
|
||
|
||
# 使用锁确保线程安全
|
||
with file_lock:
|
||
try:
|
||
if is_first_batch:
|
||
# 第一次保存时,手动添加表头
|
||
header_data = {
|
||
"batch": "批次号",
|
||
"col_0": "ICCID",
|
||
"col_1": "设备ID",
|
||
"col_2": "生命周期",
|
||
"col_3": "周期用量(MB)",
|
||
"col_4": "流量上限(MB)",
|
||
"col_5": "在用套餐",
|
||
"col_6": "租户",
|
||
"col_7": "服务状态",
|
||
"col_8": "激活时间",
|
||
"col_9": "MSISDN",
|
||
"col_10": "IMSI"
|
||
}
|
||
# 创建表头DataFrame
|
||
header_df = pd.DataFrame([header_data])
|
||
# 先写入表头,再写入数据
|
||
header_df.to_csv(filename, index=False, encoding='utf-8-sig')
|
||
df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig')
|
||
else:
|
||
# 后续批次只追加数据
|
||
df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig')
|
||
except Exception as e:
|
||
print(f"保存文件时出错: {e}")
|
||
return 0
|
||
|
||
return len(df_data)
|
||
|
||
def login(driver, username, password):
|
||
"""使用用户名密码登录"""
|
||
try:
|
||
time.sleep(3)
|
||
current_url = driver.current_url
|
||
|
||
# 查找用户名输入框 - 使用更灵活的方式
|
||
username_input = None
|
||
username_selectors = [
|
||
USERNAME_XPATH, # 您提供的精确XPath
|
||
"//input[@id='username']", # 根据您提供的id
|
||
"//input[@placeholder='请输入账号']", # 根据您提供的placeholder
|
||
"//input[@type='text']",
|
||
"//input[@placeholder*='用户名']",
|
||
"//input[@placeholder*='账号']",
|
||
"//input[@placeholder*='user']",
|
||
"//input[@placeholder*='email']",
|
||
"//input[@name='username']",
|
||
"//input[@name='user']"
|
||
]
|
||
|
||
for selector in username_selectors:
|
||
try:
|
||
username_input = WebDriverWait(driver, 2).until(
|
||
EC.element_to_be_clickable((By.XPATH, selector))
|
||
)
|
||
break
|
||
except Exception:
|
||
continue
|
||
|
||
if not username_input:
|
||
print("未找到用户名输入框")
|
||
return False
|
||
|
||
# 查找密码输入框 - 使用更灵活的方式
|
||
password_input = None
|
||
password_selectors = [
|
||
PASSWORD_XPATH, # 您提供的精确XPath
|
||
"//input[@id='password']", # 根据您提供的id
|
||
"//input[@placeholder='请输入密码']", # 根据您提供的placeholder
|
||
"//input[@type='password']",
|
||
"//input[@placeholder*='密码']",
|
||
"//input[@placeholder*='password']",
|
||
"//input[@name='password']",
|
||
"//input[@name='pwd']"
|
||
]
|
||
|
||
for selector in password_selectors:
|
||
try:
|
||
password_input = WebDriverWait(driver, 2).until(
|
||
EC.element_to_be_clickable((By.XPATH, selector))
|
||
)
|
||
break
|
||
except Exception:
|
||
continue
|
||
|
||
if not password_input:
|
||
print("未找到密码输入框")
|
||
return False
|
||
|
||
# 查找登录按钮 - 使用更灵活的方式
|
||
login_button = None
|
||
login_selectors = [
|
||
LOGIN_BUTTON_XPATH, # 您提供的精确XPath
|
||
"//button[@type='submit']",
|
||
"//button[contains(text(),'登录')]",
|
||
"//button[contains(text(),'Login')]",
|
||
"//button[contains(text(),'登入')]",
|
||
"//input[@type='submit']",
|
||
"//button[contains(@class,'login')]",
|
||
"//button[contains(@class,'submit')]"
|
||
]
|
||
|
||
for selector in login_selectors:
|
||
try:
|
||
login_button = WebDriverWait(driver, 2).until(
|
||
EC.element_to_be_clickable((By.XPATH, selector))
|
||
)
|
||
break
|
||
except Exception:
|
||
continue
|
||
|
||
if not login_button:
|
||
print("未找到登录按钮")
|
||
return False
|
||
|
||
# 清空并输入用户名
|
||
try:
|
||
# 先点击输入框确保焦点
|
||
username_input.click()
|
||
time.sleep(0.5)
|
||
|
||
# 清空输入框
|
||
username_input.clear()
|
||
time.sleep(0.1)
|
||
|
||
# 使用JavaScript设置值(更可靠)
|
||
driver.execute_script("arguments[0].value = '';", username_input)
|
||
username_input.send_keys(username)
|
||
time.sleep(0.5)
|
||
|
||
print(f"已输入用户名: {username}")
|
||
except Exception as e:
|
||
print(f"输入用户名失败: {e}")
|
||
return False
|
||
|
||
# 清空并输入密码
|
||
try:
|
||
# 先点击输入框确保焦点
|
||
password_input.click()
|
||
time.sleep(0.2)
|
||
|
||
# 清空输入框
|
||
password_input.clear()
|
||
time.sleep(0.1)
|
||
|
||
# 使用JavaScript设置值(更可靠)
|
||
driver.execute_script("arguments[0].value = '';", password_input)
|
||
password_input.send_keys(password)
|
||
time.sleep(0.5)
|
||
except Exception as e:
|
||
print(f"输入密码失败: {e}")
|
||
return False
|
||
|
||
# 点击登录按钮
|
||
try:
|
||
# 确保按钮可见和可点击
|
||
driver.execute_script("arguments[0].scrollIntoView(true);", login_button)
|
||
time.sleep(0.5)
|
||
|
||
# 尝试JavaScript点击(更可靠)
|
||
driver.execute_script("arguments[0].click();", login_button)
|
||
time.sleep(2)
|
||
except Exception as e:
|
||
print(f"点击登录按钮失败: {e}")
|
||
# 回退到普通点击
|
||
try:
|
||
login_button.click()
|
||
time.sleep(2)
|
||
except Exception as e2:
|
||
print(f"普通点击也失败: {e2}")
|
||
return False
|
||
|
||
# 等待登录完成,检查是否跳转到主页面
|
||
try:
|
||
# 等待页面跳转或出现成功标识
|
||
WebDriverWait(driver, 15).until(
|
||
lambda driver: driver.current_url != current_url or
|
||
"login" not in driver.current_url.lower() or
|
||
len(driver.find_elements(By.XPATH, "//*[contains(text(),'登录成功') or contains(text(),'欢迎')]")) > 0
|
||
)
|
||
|
||
new_url = driver.current_url
|
||
|
||
# 检查是否有错误信息
|
||
error_elements = driver.find_elements(By.XPATH, "//*[contains(text(),'错误') or contains(text(),'失败') or contains(text(),'invalid') or contains(text(),'incorrect')]")
|
||
if error_elements:
|
||
print(f"登录错误信息: {error_elements[0].text}")
|
||
return False
|
||
|
||
print("登录成功!")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"登录验证失败: {e}")
|
||
# 检查是否有错误信息
|
||
try:
|
||
error_elements = driver.find_elements(By.XPATH, "//*[contains(text(),'错误') or contains(text(),'失败') or contains(text(),'invalid') or contains(text(),'incorrect')]")
|
||
if error_elements:
|
||
print(f"登录错误信息: {error_elements[0].text}")
|
||
else:
|
||
print("未发现明显错误信息,可能登录成功")
|
||
return True
|
||
except:
|
||
pass
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"登录过程中出错: {e}")
|
||
return False
|
||
|
||
def clear_input_box(driver, input_element):
|
||
"""清空输入框"""
|
||
try:
|
||
input_element.clear()
|
||
input_element.send_keys(Keys.CONTROL + "a")
|
||
input_element.send_keys(Keys.DELETE)
|
||
driver.execute_script("arguments[0].value = '';", input_element)
|
||
time.sleep(0.1)
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
def init_driver(headless=False):
|
||
"""初始化Chrome驱动 - 支持Linux无头模式"""
|
||
import platform
|
||
import random
|
||
|
||
chrome_opts = Options()
|
||
|
||
# 检测操作系统
|
||
system = platform.system().lower()
|
||
is_linux = system == 'linux'
|
||
|
||
# 强制无头模式(Linux环境或指定headless)
|
||
if is_linux or headless:
|
||
chrome_opts.add_argument("--headless=new")
|
||
print("启用无头模式")
|
||
|
||
# Linux环境专用优化
|
||
if is_linux:
|
||
chrome_opts.add_argument("--no-sandbox")
|
||
chrome_opts.add_argument("--disable-dev-shm-usage")
|
||
chrome_opts.add_argument("--disable-gpu")
|
||
chrome_opts.add_argument("--disable-software-rasterizer")
|
||
chrome_opts.add_argument("--disable-background-timer-throttling")
|
||
chrome_opts.add_argument("--disable-backgrounding-occluded-windows")
|
||
chrome_opts.add_argument("--disable-renderer-backgrounding")
|
||
chrome_opts.add_argument("--disable-features=TranslateUI")
|
||
chrome_opts.add_argument("--disable-ipc-flooding-protection")
|
||
chrome_opts.add_argument("--single-process") # Linux单进程模式
|
||
chrome_opts.add_argument("--memory-pressure-off")
|
||
chrome_opts.add_argument("--max_old_space_size=4096")
|
||
|
||
# 反爬虫机制规避
|
||
# 随机用户代理
|
||
user_agents = [
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
|
||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0"
|
||
]
|
||
selected_ua = random.choice(user_agents)
|
||
chrome_opts.add_argument(f"--user-agent={selected_ua}")
|
||
|
||
# 反检测措施
|
||
chrome_opts.add_argument("--disable-blink-features=AutomationControlled")
|
||
chrome_opts.add_experimental_option("excludeSwitches", ["enable-automation"])
|
||
chrome_opts.add_experimental_option('useAutomationExtension', False)
|
||
|
||
# 窗口和显示设置
|
||
chrome_opts.add_argument("--window-size=1920,1080")
|
||
chrome_opts.add_argument("--start-maximized")
|
||
chrome_opts.add_argument("--disable-infobars")
|
||
chrome_opts.add_argument("--disable-notifications")
|
||
chrome_opts.add_argument("--disable-popup-blocking")
|
||
|
||
# 性能优化
|
||
chrome_opts.add_argument("--disable-images") # 禁用图片加载
|
||
chrome_opts.add_argument("--disable-javascript") # 禁用JavaScript(如果需要的话)
|
||
chrome_opts.add_argument("--disable-plugins")
|
||
chrome_opts.add_argument("--disable-extensions")
|
||
chrome_opts.add_argument("--disable-default-apps")
|
||
chrome_opts.add_argument("--disable-sync")
|
||
chrome_opts.add_argument("--disable-translate")
|
||
chrome_opts.add_argument("--hide-scrollbars")
|
||
chrome_opts.add_argument("--mute-audio")
|
||
chrome_opts.add_argument("--no-first-run")
|
||
|
||
# 网络和安全设置
|
||
chrome_opts.add_argument("--disable-web-security")
|
||
chrome_opts.add_argument("--disable-features=VizDisplayCompositor")
|
||
chrome_opts.add_argument("--disable-client-side-phishing-detection")
|
||
chrome_opts.add_argument("--disable-component-extensions-with-background-pages")
|
||
chrome_opts.add_argument("--disable-background-networking")
|
||
|
||
# 日志控制
|
||
chrome_opts.add_argument("--log-level=3")
|
||
chrome_opts.add_argument("--silent")
|
||
chrome_opts.add_experimental_option('excludeSwitches', ['enable-logging'])
|
||
|
||
# 随机端口避免冲突
|
||
debug_port = random.randint(9222, 9999)
|
||
chrome_opts.add_argument(f"--remote-debugging-port={debug_port}")
|
||
|
||
try:
|
||
# 尝试使用ChromeDriverManager自动下载驱动
|
||
print("正在初始化Chrome驱动...")
|
||
if is_linux:
|
||
print("检测到Linux环境,使用Linux优化配置")
|
||
|
||
service = Service(ChromeDriverManager().install())
|
||
driver = webdriver.Chrome(service=service, options=chrome_opts)
|
||
print("Chrome驱动初始化成功")
|
||
|
||
# 执行反检测脚本
|
||
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
|
||
driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})")
|
||
driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']})")
|
||
|
||
except Exception as e:
|
||
print(f"ChromeDriverManager失败: {e}")
|
||
print("尝试使用系统PATH中的chromedriver...")
|
||
try:
|
||
# 回退到系统PATH中的chromedriver
|
||
driver = webdriver.Chrome(options=chrome_opts)
|
||
print("使用系统chromedriver成功")
|
||
|
||
# 执行反检测脚本
|
||
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
|
||
driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})")
|
||
driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']})")
|
||
|
||
except Exception as e2:
|
||
print(f"系统chromedriver也失败: {e2}")
|
||
if is_linux:
|
||
print("done")
|
||
else:
|
||
print("检查Chrome浏览器和chromedriver")
|
||
raise Exception("检查Chrome浏览器和chromedriver安装")
|
||
|
||
# 设置窗口大小(无头模式也需要)
|
||
try:
|
||
if not is_linux and not headless:
|
||
driver.maximize_window()
|
||
else:
|
||
driver.set_window_size(1920, 1080)
|
||
|
||
except Exception as e:
|
||
print(f"窗口设置失败: {e}")
|
||
|
||
# 访问登录页面
|
||
try:
|
||
print(f"正在访问: {URL}")
|
||
driver.get(URL)
|
||
print("页面加载成功")
|
||
|
||
# 随机等待,模拟人类行为
|
||
wait_time = random.uniform(1, 3)
|
||
time.sleep(wait_time)
|
||
|
||
except Exception as e:
|
||
print(f"页面加载失败: {e}")
|
||
raise
|
||
|
||
return driver
|
||
|
||
def scrape_results_from_table(driver):
|
||
"""抓取表格结果"""
|
||
results = []
|
||
|
||
# 使用JavaScript直接获取表格数据
|
||
try:
|
||
# 尝试JavaScript方式获取表格数据
|
||
table_data = driver.execute_script("""
|
||
var tables = document.querySelectorAll('table, .ant-table');
|
||
for (var i = 0; i < tables.length; i++) {
|
||
var table = tables[i];
|
||
var rows = table.querySelectorAll('tbody tr, tr');
|
||
if (rows.length > 0) {
|
||
var data = [];
|
||
for (var j = 0; j < rows.length; j++) {
|
||
var cells = rows[j].querySelectorAll('td, th, div');
|
||
var rowData = [];
|
||
for (var k = 0; k < cells.length; k++) {
|
||
var text = cells[k].textContent || cells[k].innerText;
|
||
if (text && text.trim()) {
|
||
rowData.push(text.trim());
|
||
}
|
||
}
|
||
if (rowData.length > 0) {
|
||
// 过滤掉表头行(包含"ICCID"、"设备ID"等关键词的行)
|
||
var isHeader = false;
|
||
var headerKeywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI'];
|
||
for (var h = 0; h < headerKeywords.length; h++) {
|
||
if (rowData.some(function(cell) { return cell.includes(headerKeywords[h]); })) {
|
||
isHeader = true;
|
||
break;
|
||
}
|
||
}
|
||
if (!isHeader) {
|
||
data.push(rowData);
|
||
}
|
||
}
|
||
}
|
||
if (data.length > 0) {
|
||
return data;
|
||
}
|
||
}
|
||
}
|
||
return [];
|
||
""")
|
||
|
||
if table_data and len(table_data) > 0:
|
||
return table_data
|
||
except Exception as e:
|
||
print(f"JavaScript方式失败: {e}")
|
||
|
||
# 如果JavaScript失败,回退到Selenium方式(但优化等待时间)
|
||
for xpath in RESULT_ROWS_XPATH:
|
||
try:
|
||
rows = WebDriverWait(driver, 2).until(EC.presence_of_all_elements_located((By.XPATH, xpath))) # 减少等待时间
|
||
if rows:
|
||
print(f"Selenium方式找到 {len(rows)} 行数据")
|
||
break
|
||
except Exception:
|
||
continue
|
||
else:
|
||
print("未找到结果表格")
|
||
return []
|
||
|
||
# 优化单元格获取方式
|
||
for r in rows:
|
||
try:
|
||
# 优先使用JavaScript获取文本,更快
|
||
cell_texts = driver.execute_script("""
|
||
var cells = arguments[0].querySelectorAll('td, th, div');
|
||
var texts = [];
|
||
for (var i = 0; i < cells.length; i++) {
|
||
var text = cells[i].textContent || cells[i].innerText;
|
||
if (text && text.trim()) {
|
||
texts.push(text.trim());
|
||
}
|
||
}
|
||
return texts;
|
||
""", r)
|
||
|
||
if cell_texts and len(cell_texts) > 0:
|
||
# 过滤掉表头行
|
||
header_keywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI']
|
||
is_header = any(keyword in cell for cell in cell_texts for keyword in header_keywords)
|
||
if not is_header:
|
||
results.append(cell_texts)
|
||
except Exception:
|
||
# 回退到传统方式
|
||
try:
|
||
cells = r.find_elements(By.TAG_NAME, "td")
|
||
if not cells:
|
||
cells = r.find_elements(By.TAG_NAME, "th")
|
||
if not cells:
|
||
cells = r.find_elements(By.XPATH, ".//div")
|
||
|
||
texts = [c.text.strip() for c in cells if c.text.strip()]
|
||
if texts:
|
||
# 过滤掉表头行
|
||
header_keywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI']
|
||
is_header = any(keyword in text for text in texts for keyword in header_keywords)
|
||
if not is_header:
|
||
results.append(texts)
|
||
except Exception:
|
||
continue
|
||
|
||
return results
|
||
|
||
def submit_batch_and_collect(driver, batch_items):
|
||
"""提交批次查询并收集结果 - 增强反爬虫机制"""
|
||
import random
|
||
|
||
if len(batch_items) > MAX_PER_BATCH:
|
||
batch_items = batch_items[:MAX_PER_BATCH]
|
||
|
||
# 随机等待,模拟人类行为
|
||
wait_time = random.uniform(0.5, 2.0)
|
||
time.sleep(wait_time)
|
||
|
||
# 尝试打开批量查询弹窗
|
||
try:
|
||
batch_btn = WebDriverWait(driver, 0.3).until(EC.element_to_be_clickable((By.XPATH, BATCH_QUERY_BTN_XPATH)))
|
||
batch_btn.click()
|
||
time.sleep(random.uniform(0.3, 0.8))
|
||
except Exception:
|
||
pass
|
||
|
||
# 查找输入框
|
||
try:
|
||
inp = WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, INPUT_XPATH)))
|
||
except Exception:
|
||
print("未找到输入框")
|
||
return []
|
||
|
||
# 清空并输入数据
|
||
if not clear_input_box(driver, inp):
|
||
return []
|
||
|
||
# 模拟人类输入行为 - 分批输入
|
||
payload = "\n".join(batch_items)
|
||
|
||
# 分批输入,模拟人类打字
|
||
chunk_size = 50 # 每批输入10个ICCID
|
||
chunks = [batch_items[i:i+chunk_size] for i in range(0, len(batch_items), chunk_size)]
|
||
|
||
for i, chunk in enumerate(chunks):
|
||
chunk_payload = "\n".join(chunk)
|
||
if i == 0:
|
||
inp.send_keys(chunk_payload)
|
||
else:
|
||
inp.send_keys("\n" + chunk_payload)
|
||
|
||
# 随机等待,模拟人类输入间隔
|
||
if i < len(chunks) - 1:
|
||
wait_time = random.uniform(0.1, 0.3)
|
||
time.sleep(wait_time)
|
||
|
||
# 随机等待,模拟人类思考时间
|
||
time.sleep(random.uniform(0.5, 1.5))
|
||
|
||
# 点击查询按钮
|
||
try:
|
||
btn = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, CONFIRM_BTN_XPATH)))
|
||
btn.click()
|
||
except Exception:
|
||
return []
|
||
|
||
# 随机等待后处理
|
||
time.sleep(random.uniform(0.5, 1.0))
|
||
|
||
# 检查并关闭可能出现的"不存在"弹窗
|
||
try:
|
||
# 使用精确的XPath关闭弹窗
|
||
close_btn_xpath = '/html/body/div[2]/div/div[2]/div/div[2]/div/button'
|
||
try:
|
||
close_btn = WebDriverWait(driver, 0.5).until(EC.element_to_be_clickable((By.XPATH, close_btn_xpath)))
|
||
close_btn.click()
|
||
print("已关闭'不存在'弹窗")
|
||
time.sleep(0.5)
|
||
except Exception:
|
||
# 如果精确XPath失败,尝试其他可能的关闭按钮
|
||
alternative_selectors = [
|
||
'//div[contains(text(),"批量查询结果")]//button',
|
||
'//div[contains(text(),"以下SIM卡不存在")]//button',
|
||
'//div[contains(@class,"modal")]//button[contains(@class,"close")]',
|
||
'//div[contains(@class,"ant-modal")]//button[contains(@class,"close")]'
|
||
]
|
||
|
||
for selector in alternative_selectors:
|
||
try:
|
||
close_btn = WebDriverWait(driver, 0.2).until(EC.element_to_be_clickable((By.XPATH, selector)))
|
||
close_btn.click()
|
||
print("已关闭'不存在'弹窗(备用方式)")
|
||
time.sleep(0.1)
|
||
break
|
||
except Exception:
|
||
continue
|
||
|
||
except Exception:
|
||
pass
|
||
|
||
# 抓取结果
|
||
results = scrape_results_from_table(driver)
|
||
|
||
# 清理输入框
|
||
try:
|
||
inp = driver.find_element(By.XPATH, INPUT_XPATH)
|
||
driver.execute_script("arguments[0].value = '';", inp)
|
||
except Exception:
|
||
pass
|
||
|
||
return results
|
||
|
||
def worker_thread(thread_id, assigned_batches, driver, shared_results, lock):
|
||
"""工作线程函数 - 处理预先分配的批次任务"""
|
||
print(f"线程 {thread_id} 启动,分配了 {len(assigned_batches)} 个批次")
|
||
|
||
thread_results = []
|
||
|
||
for batch_index, batch_items in assigned_batches:
|
||
print(f"线程 {thread_id} 开始处理批次 {batch_index}")
|
||
|
||
try:
|
||
results = submit_batch_and_collect(driver, batch_items)
|
||
if results:
|
||
batch_results = [{"batch": batch_index, "cells": row_cells} for row_cells in results]
|
||
thread_results.append((batch_index, batch_results, len(results)))
|
||
print(f"线程 {thread_id} 完成批次 {batch_index},获得 {len(results)} 条结果")
|
||
else:
|
||
thread_results.append((batch_index, [], 0))
|
||
print(f"线程 {thread_id} 完成批次 {batch_index},无结果")
|
||
|
||
except Exception as e:
|
||
print(f"线程 {thread_id} 处理批次 {batch_index} 失败: {e}")
|
||
thread_results.append((batch_index, [], 0))
|
||
|
||
# 减少等待时间,提高效率
|
||
import random
|
||
time.sleep(random.uniform(0.1, 0.3))
|
||
|
||
# 线程安全地将结果添加到共享列表
|
||
with lock:
|
||
shared_results.extend(thread_results)
|
||
|
||
print(f"线程 {thread_id} 完成所有分配任务,共处理 {len(thread_results)} 个批次")
|
||
|
||
def main():
|
||
"""多线程主函数"""
|
||
start_time = time.time()
|
||
init_start_time = time.time()
|
||
|
||
query_items = read_query_items(ICCID_FILE)
|
||
if not query_items:
|
||
print(f"在 {ICCID_FILE} 中未找到查询项")
|
||
return
|
||
|
||
print(f"总共读取到 {len(query_items)} 个查询项")
|
||
batches = [query_items[i:i+MAX_PER_BATCH] for i in range(0, len(query_items), MAX_PER_BATCH)]
|
||
print(f"将分为 {len(batches)} 个批次处理")
|
||
print(f"使用 {MAX_THREADS} 个线程并行处理")
|
||
|
||
# 预先分配批次给各个线程
|
||
batches_with_index = [(i+1, batch) for i, batch in enumerate(batches)]
|
||
|
||
# 将批次分配给线程
|
||
thread_assignments = []
|
||
batches_per_thread = len(batches) // MAX_THREADS
|
||
remaining_batches = len(batches) % MAX_THREADS
|
||
|
||
start_idx = 0
|
||
for thread_id in range(MAX_THREADS):
|
||
# 计算当前线程应处理的批次数量
|
||
current_batch_count = batches_per_thread
|
||
if thread_id < remaining_batches:
|
||
current_batch_count += 1
|
||
|
||
# 分配批次
|
||
assigned_batches = batches_with_index[start_idx:start_idx + current_batch_count]
|
||
thread_assignments.append(assigned_batches)
|
||
start_idx += current_batch_count
|
||
|
||
print(f"线程 {thread_id + 1} 分配了 {len(assigned_batches)} 个批次: {[b[0] for b in assigned_batches]}")
|
||
|
||
# 创建共享结果列表和锁
|
||
shared_results = []
|
||
results_lock = threading.Lock()
|
||
|
||
# 初始化多个驱动实例
|
||
drivers = []
|
||
for i in range(MAX_THREADS):
|
||
driver = None
|
||
max_retries = 3
|
||
for retry in range(max_retries):
|
||
try:
|
||
print(f"初始化线程 {i+1} 的Chrome驱动 (第 {retry + 1}/{max_retries} 次)...")
|
||
driver = init_driver(HEADLESS)
|
||
break
|
||
except Exception as e:
|
||
print(f"线程 {i+1} 第 {retry + 1} 次初始化失败: {e}")
|
||
if retry == max_retries - 1:
|
||
print(f"线程 {i+1} 所有初始化尝试都失败了")
|
||
return
|
||
time.sleep(3)
|
||
|
||
if driver is None:
|
||
print(f"线程 {i+1} 无法初始化Chrome驱动,程序退出")
|
||
return
|
||
|
||
drivers.append(driver)
|
||
|
||
# 并行登录所有驱动
|
||
print("开始并行登录所有线程...")
|
||
login_start_time = time.time()
|
||
|
||
def login_driver(driver_info):
|
||
"""登录单个驱动的函数"""
|
||
thread_id, driver = driver_info
|
||
try:
|
||
if login(driver, USERNAME, PASSWORD):
|
||
print(f"线程 {thread_id} 登录成功")
|
||
return thread_id, True, None
|
||
else:
|
||
print(f"线程 {thread_id} 登录失败")
|
||
return thread_id, False, "登录失败"
|
||
except Exception as e:
|
||
print(f"线程 {thread_id} 登录异常: {e}")
|
||
return thread_id, False, str(e)
|
||
|
||
# 使用线程池并行登录
|
||
login_results = []
|
||
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
|
||
# 提交所有登录任务
|
||
driver_infos = [(i+1, driver) for i, driver in enumerate(drivers)]
|
||
future_to_thread = {executor.submit(login_driver, info): info[0] for info in driver_infos}
|
||
|
||
# 收集登录结果
|
||
for future in as_completed(future_to_thread):
|
||
thread_id, success, error = future.result()
|
||
login_results.append((thread_id, success, error))
|
||
|
||
# 检查登录结果
|
||
failed_logins = [result for result in login_results if not result[1]]
|
||
if failed_logins:
|
||
print(f"❌ {len(failed_logins)} 个线程登录失败:")
|
||
for thread_id, _, error in failed_logins:
|
||
print(f" - 线程 {thread_id}: {error}")
|
||
print("程序退出")
|
||
return
|
||
|
||
login_end_time = time.time()
|
||
login_duration = login_end_time - login_start_time
|
||
print(f"✅ 所有 {MAX_THREADS} 个线程登录成功")
|
||
print(f"⏱️ 并行登录耗时: {login_duration:.2f}秒")
|
||
print("开始批量查询...")
|
||
|
||
# 记录初始化完成时间
|
||
init_end_time = time.time()
|
||
init_duration = init_end_time - init_start_time
|
||
print(f"⏱️ 总初始化耗时: {init_duration:.2f}秒")
|
||
|
||
# 启动工作线程
|
||
process_start_time = time.time()
|
||
threads = []
|
||
for i in range(MAX_THREADS):
|
||
thread = threading.Thread(
|
||
target=worker_thread,
|
||
args=(i+1, thread_assignments[i], drivers[i], shared_results, results_lock)
|
||
)
|
||
thread.start()
|
||
threads.append(thread)
|
||
|
||
print("所有线程已启动,等待完成...")
|
||
|
||
# 等待所有线程完成
|
||
for thread in threads:
|
||
thread.join()
|
||
|
||
print("所有线程已完成,开始按批次顺序保存结果...")
|
||
|
||
# 按批次号排序并保存结果
|
||
shared_results.sort(key=lambda x: x[0]) # 按批次号排序
|
||
|
||
total_saved_results = 0
|
||
is_first_batch = True
|
||
|
||
for batch_index, batch_results, result_count in shared_results:
|
||
if batch_results:
|
||
saved_count = save_results_to_csv(batch_results, OUTPUT_CSV, is_first_batch)
|
||
total_saved_results += saved_count
|
||
print(f"✅ 已保存批次 {batch_index} 的 {saved_count} 条结果")
|
||
is_first_batch = False
|
||
else:
|
||
print(f"批次 {batch_index} 无结果")
|
||
|
||
|
||
# 关闭所有驱动
|
||
print("关闭所有Chrome驱动...")
|
||
for i, driver in enumerate(drivers):
|
||
try:
|
||
driver.quit()
|
||
print(f"线程 {i+1} 驱动已关闭")
|
||
except Exception as e:
|
||
print(f"关闭线程 {i+1} 驱动时出错: {e}")
|
||
|
||
# 生成总结报告
|
||
end_time = time.time()
|
||
total_time = end_time - start_time
|
||
process_time = end_time - process_start_time
|
||
avg_time_per_batch = total_time / len(batches) if len(batches) > 0 else 0
|
||
|
||
print(f"\n{'='*50}")
|
||
print(f"📊 多线程批量查询完成报告")
|
||
print(f"{'='*50}")
|
||
print(f"总查询项: {len(query_items)}")
|
||
print(f"总批次数: {len(batches)}")
|
||
print(f"使用线程数: {MAX_THREADS}")
|
||
print(f"完成批次: {len(batches)}")
|
||
print(f"总保存结果数: {total_saved_results}")
|
||
print(f"⏱️ 总初始化耗时: {init_duration:.2f}秒")
|
||
print(f"⏱️ 并行登录耗时: {login_duration:.2f}秒")
|
||
print(f"⏱️ 处理耗时: {process_time:.2f}秒")
|
||
print(f"⏱️ 总耗时: {total_time:.2f}秒")
|
||
print(f"⏱️ 平均每批次: {avg_time_per_batch:.2f}秒")
|
||
|
||
# 计算实际性能提升
|
||
estimated_single_thread_time = avg_time_per_batch * len(batches)
|
||
actual_speedup = estimated_single_thread_time / total_time if total_time > 0 else 1
|
||
print(f"🚀 实际性能提升: {actual_speedup:.2f}x")
|
||
|
||
print(f"{'='*50}")
|
||
print(f"✅ 所有结果已按顺序保存到 {OUTPUT_CSV}")
|
||
|
||
if __name__ == "__main__":
|
||
main() |