更新 pa.py

This commit is contained in:
2025-10-01 00:12:18 +08:00
parent 2c77fd783d
commit 43d9dad47c

597
pa.py
View File

@@ -1,4 +1,4 @@
# batch_query.py - 精简版
#单线程硬怼
import os
import time
import pandas as pd
@@ -27,13 +27,17 @@ RESULT_ROWS_XPATH = [
MAX_PER_BATCH = 50
OUTPUT_CSV = "results.csv"
ICCID_FILE = "text.txt"
HEADLESS = False
HEADLESS = True # Linux环境默认启用无头模式
BATCH_RETRY_COUNT = 2
COOKIES = {
'platformUser_session': 'eyJsYXN0QWNjZXNzZWQiOjE3NTkxNDc4NjYzMzJ9.2gNtuRzCQH%2BoNra1%2B1WXxcDtTmW91yYVAOLbH6Ry%2BLM',
'_manage_session': 'eyJ0b2tlbiI6ImV5SmhiR2NpT2lKSVV6STFOaUlzSW5SNWNDSTZJa3BYVkNKOS5leUoxYzJWeUlqcDdJblZ6WlhKZmFXUWlPaUpaU0RFNE9URXpOVFk1TkRVNU9EWTNOVEkxTVRJaWZTd2lRM0psWVhSbFZHbHRaU0k2SWpJd01qVXRNRGt0TWpsVU1URTZNVGs2TkRJdU9UZzJNRGt4TWpNekt6QXhPakF3SW4wLll6eWtYZGlweUFfaWN4TGxkX3MwS2dWQU5LM2JkZU1fNjM3NDV1ckxQNkEiLCJleHBpcmVUaW1lIjowLCJ1c2VySWQiOiJZSDE4OTEzNTY5NDU5ODY3NTI1MTIiLCJkYXRhTGltaXQiOiJjdXN0b21lIiwidHlwZSI6MSwibGV2ZWwiOjIsInBVc2VySWQiOiJZSDE3Njk5MTg2MjkxMjAyNDAzMjEiLCJsb2dpbk5hbWUiOiJ4aW9uZ3NoaV95dW53ZWkiLCJyb2xlSWQiOiJSTDE5NjI3MDM5MDkxNTU5MDE0NDAiLCJjbGllbnRJRHMiOlsiZXVfY2hlcnkiLCJlYnJvX2NoZXJ5Il0sImNsaWVudElkcyI6ImVicm9fY2hlcnkifQ%3D%3D.jBwQkblyoEP6t7OELXxUMKkoU9%2FJWWQsZPg25SZSz5o'
}
# 登录配置
USERNAME = "xiongshi_yunwei" # 请替换为实际用户名
PASSWORD = "Cm20p20Nmo!25" # 请替换为实际密码
# 登录页面元素XPath - 根据实际页面元素更新
USERNAME_XPATH = "/html/body/div/div[2]/div/form/div[1]/div/div/div/div/span/input"
PASSWORD_XPATH = "//*[@id='password']"
LOGIN_BUTTON_XPATH = "/html/body/div/div[2]/div/form/button"
def read_query_items(path):
"""读取查询项目文件"""
@@ -49,30 +53,240 @@ def read_query_items(path):
continue
raise Exception("无法读取文件")
def save_results_to_csv(results, filename):
def save_results_to_csv(results, filename, is_first_batch=False):
"""保存结果到CSV文件"""
if not results:
return 0
# 构建DataFrame
df_data = []
for result in results:
row_data = {"batch": result["batch"]}
cells = result["cells"]
if len(cells) >= 2:
row_data["ICCID"] = cells[0]
row_data["租户"] = cells[1]
for i, cell in enumerate(cells[2:], start=2):
row_data[f"{i+1}"] = cell
#
for i, cell in enumerate(cells):
row_data[f"col_{i}"] = cell
df_data.append(row_data)
# 创建DataFrame
df = pd.DataFrame(df_data)
if os.path.exists(filename):
existing_df = pd.read_csv(filename, encoding='utf-8-sig')
df = pd.concat([existing_df, df], ignore_index=True)
df.to_csv(filename, index=False, encoding='utf-8-sig')
try:
if is_first_batch:
# 第一次保存时,手动添加表头
header_data = {
"batch": "批次号",
}
# 创建表头DataFrame
header_df = pd.DataFrame([header_data])
# 先写入表头,再写入数据
header_df.to_csv(filename, index=False, encoding='utf-8-sig')
df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig')
else:
# 后续批次只追加数据
df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig')
except Exception as e:
print(f"保存文件时出错: {e}")
return 0
return len(df_data)
def login(driver, username, password):
"""使用用户名密码登录"""
try:
print("开始登录...")
# 等待页面完全加载
time.sleep(3)
# 检查当前页面URL
current_url = driver.current_url
print(f"当前页面URL: {current_url}")
# 查找用户名输入框 - 使用更灵活的方式
username_input = None
username_selectors = [
USERNAME_XPATH, # 您提供的精确XPath
"//input[@id='username']", # 根据您提供的id
"//input[@placeholder='请输入账号']", # 根据您提供的placeholder
"//input[@type='text']",
"//input[@placeholder*='用户名']",
"//input[@placeholder*='账号']",
"//input[@placeholder*='user']",
"//input[@placeholder*='email']",
"//input[@name='username']",
"//input[@name='user']"
]
for selector in username_selectors:
try:
username_input = WebDriverWait(driver, 2).until(
EC.element_to_be_clickable((By.XPATH, selector))
)
print(f"找到用户名输入框: {selector}")
break
except Exception:
continue
if not username_input:
print("未找到用户名输入框")
return False
# 查找密码输入框 - 使用更灵活的方式
password_input = None
password_selectors = [
PASSWORD_XPATH, # 您提供的精确XPath
"//input[@id='password']", # 根据您提供的id
"//input[@placeholder='请输入密码']", # 根据您提供的placeholder
"//input[@type='password']",
"//input[@placeholder*='密码']",
"//input[@placeholder*='password']",
"//input[@name='password']",
"//input[@name='pwd']"
]
for selector in password_selectors:
try:
password_input = WebDriverWait(driver, 2).until(
EC.element_to_be_clickable((By.XPATH, selector))
)
print(f"找到密码输入框: {selector}")
break
except Exception:
continue
if not password_input:
print("未找到密码输入框")
return False
# 查找登录按钮 - 使用更灵活的方式
login_button = None
login_selectors = [
LOGIN_BUTTON_XPATH, # 您提供的精确XPath
"//button[@type='submit']",
"//button[contains(text(),'登录')]",
"//button[contains(text(),'Login')]",
"//button[contains(text(),'登入')]",
"//input[@type='submit']",
"//button[contains(@class,'login')]",
"//button[contains(@class,'submit')]"
]
for selector in login_selectors:
try:
login_button = WebDriverWait(driver, 2).until(
EC.element_to_be_clickable((By.XPATH, selector))
)
print(f"找到登录按钮: {selector}")
break
except Exception:
continue
if not login_button:
print("未找到登录按钮")
return False
# 清空并输入用户名
try:
# 先点击输入框确保焦点
username_input.click()
time.sleep(0.5)
# 清空输入框
username_input.clear()
time.sleep(0.1)
# 使用JavaScript设置值更可靠
driver.execute_script("arguments[0].value = '';", username_input)
username_input.send_keys(username)
time.sleep(0.5)
print(f"已输入用户名: {username}")
except Exception as e:
print(f"输入用户名失败: {e}")
return False
# 清空并输入密码
try:
# 先点击输入框确保焦点
password_input.click()
time.sleep(0.2)
# 清空输入框
password_input.clear()
time.sleep(0.1)
# 使用JavaScript设置值更可靠
driver.execute_script("arguments[0].value = '';", password_input)
password_input.send_keys(password)
time.sleep(0.5)
print("已输入密码")
except Exception as e:
print(f"输入密码失败: {e}")
return False
# 点击登录按钮
try:
# 确保按钮可见和可点击
driver.execute_script("arguments[0].scrollIntoView(true);", login_button)
time.sleep(0.5)
# 尝试JavaScript点击更可靠
driver.execute_script("arguments[0].click();", login_button)
print("已点击登录按钮")
time.sleep(2)
except Exception as e:
print(f"点击登录按钮失败: {e}")
# 回退到普通点击
try:
login_button.click()
print("使用普通点击成功")
time.sleep(2)
except Exception as e2:
print(f"普通点击也失败: {e2}")
return False
# 等待登录完成,检查是否跳转到主页面
try:
print("等待登录完成...")
# 等待页面跳转或出现成功标识
WebDriverWait(driver, 15).until(
lambda driver: driver.current_url != current_url or
"login" not in driver.current_url.lower() or
len(driver.find_elements(By.XPATH, "//*[contains(text(),'登录成功') or contains(text(),'欢迎')]")) > 0
)
new_url = driver.current_url
print(f"登录后页面URL: {new_url}")
# 检查是否有错误信息
error_elements = driver.find_elements(By.XPATH, "//*[contains(text(),'错误') or contains(text(),'失败') or contains(text(),'invalid') or contains(text(),'incorrect')]")
if error_elements:
print(f"登录错误信息: {error_elements[0].text}")
return False
print("登录成功!")
return True
except Exception as e:
print(f"登录验证失败: {e}")
# 检查是否有错误信息
try:
error_elements = driver.find_elements(By.XPATH, "//*[contains(text(),'错误') or contains(text(),'失败') or contains(text(),'invalid') or contains(text(),'incorrect')]")
if error_elements:
print(f"登录错误信息: {error_elements[0].text}")
else:
print("未发现明显错误信息,可能登录成功")
return True
except:
pass
return False
except Exception as e:
print(f"登录过程中出错: {e}")
return False
def clear_input_box(driver, input_element):
"""清空输入框"""
try:
@@ -86,27 +300,146 @@ def clear_input_box(driver, input_element):
return False
def init_driver(headless=False):
"""初始化Chrome驱动"""
chrome_opts = Options()
if headless:
chrome_opts.add_argument("--headless=new")
"""初始化Chrome驱动 - 支持Linux无头模式"""
import platform
import random
# 优化参数
chrome_opts = Options()
# 检测操作系统
system = platform.system().lower()
is_linux = system == 'linux'
# 强制无头模式Linux环境或指定headless
if is_linux or headless:
chrome_opts.add_argument("--headless=new")
print("启用无头模式")
# Linux环境专用优化
if is_linux:
chrome_opts.add_argument("--no-sandbox")
chrome_opts.add_argument("--disable-dev-shm-usage")
chrome_opts.add_argument("--disable-logging")
chrome_opts.add_argument("--disable-gpu-logging")
chrome_opts.add_argument("--disable-gpu")
chrome_opts.add_argument("--disable-software-rasterizer")
chrome_opts.add_argument("--disable-background-timer-throttling")
chrome_opts.add_argument("--disable-backgrounding-occluded-windows")
chrome_opts.add_argument("--disable-renderer-backgrounding")
chrome_opts.add_argument("--disable-features=TranslateUI")
chrome_opts.add_argument("--disable-ipc-flooding-protection")
chrome_opts.add_argument("--single-process") # Linux单进程模式
chrome_opts.add_argument("--memory-pressure-off")
chrome_opts.add_argument("--max_old_space_size=4096")
# 反爬虫机制规避
# 随机用户代理
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0"
]
selected_ua = random.choice(user_agents)
chrome_opts.add_argument(f"--user-agent={selected_ua}")
# 反检测措施
chrome_opts.add_argument("--disable-blink-features=AutomationControlled")
chrome_opts.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_opts.add_experimental_option('useAutomationExtension', False)
# 窗口和显示设置
chrome_opts.add_argument("--window-size=1920,1080")
chrome_opts.add_argument("--start-maximized")
chrome_opts.add_argument("--disable-infobars")
chrome_opts.add_argument("--disable-notifications")
chrome_opts.add_argument("--disable-popup-blocking")
# 性能优化
chrome_opts.add_argument("--disable-images") # 禁用图片加载
chrome_opts.add_argument("--disable-javascript") # 禁用JavaScript如果需要的话
chrome_opts.add_argument("--disable-plugins")
chrome_opts.add_argument("--disable-extensions")
chrome_opts.add_argument("--disable-default-apps")
chrome_opts.add_argument("--disable-sync")
chrome_opts.add_argument("--disable-translate")
chrome_opts.add_argument("--hide-scrollbars")
chrome_opts.add_argument("--mute-audio")
chrome_opts.add_argument("--no-first-run")
# 网络和安全设置
chrome_opts.add_argument("--disable-web-security")
chrome_opts.add_argument("--disable-features=VizDisplayCompositor")
chrome_opts.add_argument("--disable-client-side-phishing-detection")
chrome_opts.add_argument("--disable-component-extensions-with-background-pages")
chrome_opts.add_argument("--disable-background-networking")
# 日志控制
chrome_opts.add_argument("--log-level=3")
chrome_opts.add_argument("--silent")
chrome_opts.add_argument("--disable-images")
chrome_opts.add_experimental_option('excludeSwitches', ['enable-logging'])
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_opts)
# 随机端口避免冲突
debug_port = random.randint(9222, 9999)
chrome_opts.add_argument(f"--remote-debugging-port={debug_port}")
try:
# 尝试使用ChromeDriverManager自动下载驱动
print("正在初始化Chrome驱动...")
if is_linux:
print("检测到Linux环境使用Linux优化配置")
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_opts)
print("Chrome驱动初始化成功")
# 执行反检测脚本
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})")
driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']})")
except Exception as e:
print(f"ChromeDriverManager失败: {e}")
print("尝试使用系统PATH中的chromedriver...")
try:
# 回退到系统PATH中的chromedriver
driver = webdriver.Chrome(options=chrome_opts)
print("使用系统chromedriver成功")
# 执行反检测脚本
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})")
driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']})")
except Exception as e2:
print(f"系统chromedriver也失败: {e2}")
if is_linux:
print("done")
else:
print("请确保已安装Chrome浏览器和chromedriver")
raise Exception("无法启动Chrome驱动请检查Chrome浏览器和chromedriver安装")
# 设置窗口大小(无头模式也需要)
try:
if not is_linux and not headless:
driver.maximize_window()
else:
driver.set_window_size(1920, 1080)
# 添加Cookie
except Exception as e:
print(f"窗口设置失败: {e}")
# 访问登录页面
try:
print(f"正在访问: {URL}")
driver.get(URL)
for name, value in COOKIES.items():
driver.add_cookie({"name": name, "value": value})
# 随机等待,模拟人类行为
wait_time = random.uniform(1, 3)
time.sleep(wait_time)
except Exception as e:
print(f"页面加载失败: {e}")
raise
return driver
@@ -114,11 +447,59 @@ def scrape_results_from_table(driver):
"""抓取表格结果"""
results = []
# 尝试不同的表格XPath
# 使用JavaScript直接获取表格数据
try:
# 尝试JavaScript方式获取表格数据
table_data = driver.execute_script("""
var tables = document.querySelectorAll('table, .ant-table');
for (var i = 0; i < tables.length; i++) {
var table = tables[i];
var rows = table.querySelectorAll('tbody tr, tr');
if (rows.length > 0) {
var data = [];
for (var j = 0; j < rows.length; j++) {
var cells = rows[j].querySelectorAll('td, th, div');
var rowData = [];
for (var k = 0; k < cells.length; k++) {
var text = cells[k].textContent || cells[k].innerText;
if (text && text.trim()) {
rowData.push(text.trim());
}
}
if (rowData.length > 0) {
// 过滤掉表头行(包含"ICCID""设备ID"等关键词的行)
var isHeader = false;
var headerKeywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI'];
for (var h = 0; h < headerKeywords.length; h++) {
if (rowData.some(function(cell) { return cell.includes(headerKeywords[h]); })) {
isHeader = true;
break;
}
}
if (!isHeader) {
data.push(rowData);
}
}
}
if (data.length > 0) {
return data;
}
}
}
return [];
""")
if table_data and len(table_data) > 0:
return table_data
except Exception as e:
print(f"JavaScript方式失败: {e}")
# 如果JavaScript失败回退到Selenium方式但优化等待时间
for xpath in RESULT_ROWS_XPATH:
try:
rows = WebDriverWait(driver, 5).until(EC.presence_of_all_elements_located((By.XPATH, xpath)))
rows = WebDriverWait(driver, 2).until(EC.presence_of_all_elements_located((By.XPATH, xpath))) # 减少等待时间
if rows:
print(f"Selenium方式找到 {len(rows)} 行数据")
break
except Exception:
continue
@@ -126,7 +507,30 @@ def scrape_results_from_table(driver):
print("未找到结果表格")
return []
# 优化单元格获取方式
for r in rows:
try:
# 优先使用JavaScript获取文本更快
cell_texts = driver.execute_script("""
var cells = arguments[0].querySelectorAll('td, th, div');
var texts = [];
for (var i = 0; i < cells.length; i++) {
var text = cells[i].textContent || cells[i].innerText;
if (text && text.trim()) {
texts.push(text.trim());
}
}
return texts;
""", r)
if cell_texts and len(cell_texts) > 0:
# 过滤掉表头行
header_keywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI']
is_header = any(keyword in cell for cell in cell_texts for keyword in header_keywords)
if not is_header:
results.append(cell_texts)
except Exception:
# 回退到传统方式
try:
cells = r.find_elements(By.TAG_NAME, "td")
if not cells:
@@ -136,6 +540,10 @@ def scrape_results_from_table(driver):
texts = [c.text.strip() for c in cells if c.text.strip()]
if texts:
# 过滤掉表头行
header_keywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI']
is_header = any(keyword in text for text in texts for keyword in header_keywords)
if not is_header:
results.append(texts)
except Exception:
continue
@@ -143,21 +551,27 @@ def scrape_results_from_table(driver):
return results
def submit_batch_and_collect(driver, batch_items):
"""提交批次查询并收集结果"""
"""提交批次查询并收集结果 - 增强反爬虫机制"""
import random
if len(batch_items) > MAX_PER_BATCH:
batch_items = batch_items[:MAX_PER_BATCH]
# 随机等待,模拟人类行为
wait_time = random.uniform(0.5, 2.0)
time.sleep(wait_time)
# 尝试打开批量查询弹窗
try:
batch_btn = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, BATCH_QUERY_BTN_XPATH)))
batch_btn = WebDriverWait(driver, 0.3).until(EC.element_to_be_clickable((By.XPATH, BATCH_QUERY_BTN_XPATH)))
batch_btn.click()
time.sleep(0.5)
time.sleep(random.uniform(0.3, 0.8))
except Exception:
pass
# 查找输入框
try:
inp = WebDriverWait(driver, 5).until(EC.presence_of_element_located((By.XPATH, INPUT_XPATH)))
inp = WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, INPUT_XPATH)))
except Exception:
print("未找到输入框")
return []
@@ -166,24 +580,76 @@ def submit_batch_and_collect(driver, batch_items):
if not clear_input_box(driver, inp):
return []
# 模拟人类输入行为 - 分批输入
payload = "\n".join(batch_items)
inp.send_keys(payload)
# 分批输入,模拟人类打字
chunk_size = 10 # 每批输入10个ICCID
chunks = [batch_items[i:i+chunk_size] for i in range(0, len(batch_items), chunk_size)]
for i, chunk in enumerate(chunks):
chunk_payload = "\n".join(chunk)
if i == 0:
inp.send_keys(chunk_payload)
else:
inp.send_keys("\n" + chunk_payload)
# 随机等待,模拟人类输入间隔
if i < len(chunks) - 1:
wait_time = random.uniform(0.1, 0.3)
time.sleep(wait_time)
# 随机等待,模拟人类思考时间
time.sleep(random.uniform(0.5, 1.5))
# 点击查询按钮
try:
btn = WebDriverWait(driver, 3).until(EC.element_to_be_clickable((By.XPATH, CONFIRM_BTN_XPATH)))
btn = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, CONFIRM_BTN_XPATH)))
btn.click()
except Exception:
return []
# 等待结果并抓取
time.sleep(1)
# 随机等待后处理
time.sleep(random.uniform(0.5, 1.0))
# 检查并关闭可能出现的"不存在"弹窗
try:
# 使用精确的XPath关闭弹窗
close_btn_xpath = '/html/body/div[2]/div/div[2]/div/div[2]/div/button'
try:
close_btn = WebDriverWait(driver, 0.5).until(EC.element_to_be_clickable((By.XPATH, close_btn_xpath)))
close_btn.click()
print("已关闭'不存在'弹窗")
time.sleep(0.5)
except Exception:
# 如果精确XPath失败尝试其他可能的关闭按钮
alternative_selectors = [
'//div[contains(text(),"批量查询结果")]//button',
'//div[contains(text(),"以下SIM卡不存在")]//button',
'//div[contains(@class,"modal")]//button[contains(@class,"close")]',
'//div[contains(@class,"ant-modal")]//button[contains(@class,"close")]'
]
for selector in alternative_selectors:
try:
close_btn = WebDriverWait(driver, 0.2).until(EC.element_to_be_clickable((By.XPATH, selector)))
close_btn.click()
print("已关闭'不存在'弹窗(备用方式)")
time.sleep(0.1)
break
except Exception:
continue
except Exception:
pass
# 抓取结果
results = scrape_results_from_table(driver)
# 清理输入框
try:
inp = driver.find_element(By.XPATH, INPUT_XPATH)
clear_input_box(driver, inp)
driver.execute_script("arguments[0].value = '';", inp)
except Exception:
pass
@@ -191,6 +657,8 @@ def submit_batch_and_collect(driver, batch_items):
def main():
"""主函数"""
start_time = time.time()
query_items = read_query_items(ICCID_FILE)
if not query_items:
print(f"{ICCID_FILE} 中未找到查询项")
@@ -200,14 +668,36 @@ def main():
batches = [query_items[i:i+MAX_PER_BATCH] for i in range(0, len(query_items), MAX_PER_BATCH)]
print(f"将分为 {len(batches)} 个批次处理")
# 初始化驱动,添加重试机制
driver = None
max_retries = 3
for retry in range(max_retries):
try:
print(f"尝试初始化Chrome驱动 (第 {retry + 1}/{max_retries} 次)...")
driver = init_driver(HEADLESS)
break
except Exception as e:
print(f"{retry + 1} 次初始化失败: {e}")
if retry == max_retries - 1:
print("所有初始化尝试都失败了,程序退出")
return
print("等待3秒后重试...")
if driver is None:
print("无法初始化Chrome驱动程序退出")
return
total_saved_results = 0
failed_batches = []
is_first_batch = True # 标记是否是第一批次
try:
driver.refresh()
time.sleep(1.5)
print("已通过Cookie自动登录开始批量查询...")
# 执行登录
if not login(driver, USERNAME, PASSWORD):
print("登录失败,程序退出")
return
print("登录成功,开始批量查询...")
for batch_index, batch in enumerate(batches, 1):
print(f"\n=== 处理批次 {batch_index}/{len(batches)} ===")
@@ -223,10 +713,11 @@ def main():
results = submit_batch_and_collect(driver, sub_batch)
if results:
batch_results = [{"batch": f"{batch_index}-{sub_idx + 1}", "cells": row_cells} for row_cells in results]
saved_count = save_results_to_csv(batch_results, OUTPUT_CSV)
saved_count = save_results_to_csv(batch_results, OUTPUT_CSV, is_first_batch)
total_saved_results += saved_count
print(f"子批次获得 {len(results)} 条结果,已保存")
time.sleep(0.5)
is_first_batch = False # 后续批次不再写入表头
time.sleep(0.1)
except Exception as e:
print(f"子批次 {sub_idx + 1} 处理失败: {e}")
failed_batches.append(f"{batch_index}-{sub_idx + 1}")
@@ -238,7 +729,7 @@ def main():
try:
if retry > 0:
print(f"重试第 {retry} 次...")
time.sleep(2)
time.sleep(0.1)
results = submit_batch_and_collect(driver, batch)
print(f"本批次获得 {len(results)} 条结果")
@@ -246,9 +737,10 @@ def main():
# 立即保存结果
if results:
batch_results = [{"batch": batch_index, "cells": row_cells} for row_cells in results]
saved_count = save_results_to_csv(batch_results, OUTPUT_CSV)
saved_count = save_results_to_csv(batch_results, OUTPUT_CSV, is_first_batch)
total_saved_results += saved_count
print(f"✅ 已保存 {saved_count} 条结果到 {OUTPUT_CSV}")
is_first_batch = False # 后续批次不再写入表头
success = True
break
@@ -259,11 +751,18 @@ def main():
failed_batches.append(batch_index)
print(f"❌ 批次 {batch_index} 重试失败")
# 批次间等待
# 批次间随机等待,模拟人类行为
if batch_index < len(batches):
time.sleep(1)
import random
wait_time = random.uniform(0.5, 1.0)
print(f"批次间等待 {wait_time:.1f} 秒...")
time.sleep(wait_time)
# 生成总结报告
end_time = time.time()
total_time = end_time - start_time
avg_time_per_batch = total_time / len(batches) if len(batches) > 0 else 0
print(f"\n{'='*50}")
print(f"📊 批量查询完成报告")
print(f"{'='*50}")
@@ -273,6 +772,8 @@ def main():
print(f"失败批次: {len(failed_batches)}")
print(f"成功率: {((len(batches) - len(failed_batches)) / len(batches) * 100):.1f}%")
print(f"总保存结果数: {total_saved_results}")
print(f"⏱️ 总耗时: {total_time:.2f}")
print(f"⏱️ 平均每批次: {avg_time_per_batch:.2f}")
if failed_batches:
print(f"失败批次列表: {failed_batches}")