# batch_query.py - streamlined version
|
||
import os
|
||
import time
|
||
import pandas as pd
|
||
from selenium import webdriver
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.common.keys import Keys
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from webdriver_manager.chrome import ChromeDriverManager
|
||
from selenium.webdriver.chrome.service import Service
|
||
|
||
# Configuration

# Page that hosts the batch-query UI; also the page loaded before cookies are set.
URL = "https://prod-eu-cmp.simbalinkglobal.com"

# XPath unions: the first alternative that matches on the page is used.
INPUT_XPATH = " | ".join((
    '//textarea[@id="iccidList"]',
    '//input[@id="iccidList"]',
    '//*[@id="iccidList"]',
))
CONFIRM_BTN_XPATH = " | ".join((
    '//button[contains(text(),"查询")]',
    '//button[contains(text(),"确认")]',
    '//*[@id="pop-confirm"]//button[2]',
    '//button[@type="submit"]',
))

# NOTE(review): absolute path from <html>; any layout change will break it.
BATCH_QUERY_BTN_XPATH = '/html/body/div/div[2]/main/div/div/div/div/div[2]/div[2]/div[1]/div[2]/div/button'

# Candidate locators for the result rows, tried in this order.
RESULT_ROWS_XPATH = [
    '//table[contains(@class,"ant-table")]//tbody/tr',
    '//table//tbody/tr',
    '//div[contains(@class,"ant-table")]//tbody/tr',
    '//table[@class="table"]//tbody/tr',
    '//*[contains(@class,"table")]//tr[position()>1]',
]

MAX_PER_BATCH = 50          # maximum number of items submitted in one query
OUTPUT_CSV = "results.csv"  # file the scraped rows are accumulated in
ICCID_FILE = "text.txt"     # input file, one query item per line
HEADLESS = False            # passed to init_driver() to toggle headless Chrome
BATCH_RETRY_COUNT = 2       # extra attempts per batch after the first one
|
||
|
||
# Session cookies injected into the browser so the run starts already logged in.
#
# SECURITY: these are live session tokens and should not be committed to source
# control. Each value can be supplied through an environment variable
# (PLATFORM_USER_SESSION / MANAGE_SESSION); the literals are kept only as a
# backward-compatible fallback and should be rotated and removed.
COOKIES = {
    'platformUser_session': os.environ.get(
        'PLATFORM_USER_SESSION',
        'eyJsYXN0QWNjZXNzZWQiOjE3NTkxNDc4NjYzMzJ9.2gNtuRzCQH%2BoNra1%2B1WXxcDtTmW91yYVAOLbH6Ry%2BLM',
    ),
    '_manage_session': os.environ.get(
        'MANAGE_SESSION',
        'eyJ0b2tlbiI6ImV5SmhiR2NpT2lKSVV6STFOaUlzSW5SNWNDSTZJa3BYVkNKOS5leUoxYzJWeUlqcDdJblZ6WlhKZmFXUWlPaUpaU0RFNE9URXpOVFk1TkRVNU9EWTNOVEkxTVRJaWZTd2lRM0psWVhSbFZHbHRaU0k2SWpJd01qVXRNRGt0TWpsVU1URTZNVGs2TkRJdU9UZzJNRGt4TWpNekt6QXhPakF3SW4wLll6eWtYZGlweUFfaWN4TGxkX3MwS2dWQU5LM2JkZU1fNjM3NDV1ckxQNkEiLCJleHBpcmVUaW1lIjowLCJ1c2VySWQiOiJZSDE4OTEzNTY5NDU5ODY3NTI1MTIiLCJkYXRhTGltaXQiOiJjdXN0b21lIiwidHlwZSI6MSwibGV2ZWwiOjIsInBVc2VySWQiOiJZSDE3Njk5MTg2MjkxMjAyNDAzMjEiLCJsb2dpbk5hbWUiOiJ4aW9uZ3NoaV95dW53ZWkiLCJyb2xlSWQiOiJSTDE5NjI3MDM5MDkxNTU5MDE0NDAiLCJjbGllbnRJRHMiOlsiZXVfY2hlcnkiLCJlYnJvX2NoZXJ5Il0sImNsaWVudElkcyI6ImVicm9fY2hlcnkifQ%3D%3D.jBwQkblyoEP6t7OELXxUMKkoU9%2FJWWQsZPg25SZSz5o',
    ),
}
|
||
|
||
def read_query_items(path):
    """Read the query items from a text file, one item per line.

    The file is decoded with the first encoding in a fixed list that does not
    fail. Lines are stripped and blank lines are dropped.

    Args:
        path: Path of the text file to read.

    Returns:
        The list of non-empty, stripped lines. An empty list is returned when
        the file decodes but holds no non-blank line, so the caller can report
        "nothing to query" instead of a misleading read error.

    Raises:
        Exception: If the file cannot be opened, or none of the candidate
            encodings can decode it.
    """
    # utf-8-sig comes first: it decodes UTF-8 with or without a BOM, whereas
    # plain utf-8 would leave "\ufeff" glued to the first item.
    encodings = ['utf-8-sig', 'gbk', 'cp1252']
    for encoding in encodings:
        try:
            with open(path, 'r', encoding=encoding) as f:
                lines = [line.strip() for line in f if line.strip()]
        except UnicodeDecodeError:
            # Wrong guess; try the next encoding.
            continue
        except OSError as err:
            # Missing/unreadable file: retrying with another encoding is pointless.
            raise Exception("无法读取文件") from err
        if lines:
            print(f"使用编码 {encoding} 成功读取 {len(lines)} 个查询项")
        return lines
    raise Exception("无法读取文件")
|
||
|
||
def save_results_to_csv(results, filename):
    """Append scraped rows to a CSV file and return how many rows were added.

    Each result becomes one CSV row. When a result has at least two cells they
    are stored as ``ICCID`` and ``租户``, and any further cells as ``列3``,
    ``列4``, ... A result with fewer than two cells contributes only its
    ``batch`` value. If ``filename`` already exists, its rows are kept and the
    new rows are appended after them.

    Args:
        results: Iterable of dicts with a ``"batch"`` label and a ``"cells"``
            list of strings.
        filename: Path of the CSV file (written as UTF-8 with BOM).

    Returns:
        Number of rows added by this call (0 when ``results`` is empty, in
        which case the file is not touched).
    """
    if not results:
        return 0

    records = []
    for result in results:
        record = {"batch": result["batch"]}
        cells = result["cells"]
        if len(cells) >= 2:
            record["ICCID"] = cells[0]
            record["租户"] = cells[1]
            for i, cell in enumerate(cells[2:], start=2):
                record[f"列{i+1}"] = cell
        records.append(record)

    df = pd.DataFrame(records)
    # Skip a zero-byte file: read_csv would raise on it.
    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        # Read everything back as plain text. With type inference the rewrite
        # would silently alter earlier rows: leading zeros dropped, integer
        # columns turned into floats once a blank appears, and cells such as
        # "N/A" replaced by empty values.
        existing_df = pd.read_csv(
            filename,
            encoding='utf-8-sig',
            dtype=str,
            keep_default_na=False,
        )
        df = pd.concat([existing_df, df], ignore_index=True)

    df.to_csv(filename, index=False, encoding='utf-8-sig')
    return len(records)
|
||
|
||
def clear_input_box(driver, input_element):
    """Empty an input element, returning True on success and False on any error.

    Three clearing methods are applied one after another: the element's own
    ``clear()``, a Ctrl+A / Delete key sequence, and a script that sets the
    element's ``value`` to an empty string.

    Args:
        driver: WebDriver used to run the script.
        input_element: The element to empty.
    """
    try:
        input_element.clear()
        for keystroke in (Keys.CONTROL + "a", Keys.DELETE):
            input_element.send_keys(keystroke)
        driver.execute_script("arguments[0].value = '';", input_element)
        time.sleep(0.1)
    except Exception:
        return False
    return True
|
||
|
||
def init_driver(headless=False):
    """Start Chrome, open ``URL`` and install the session cookies.

    Args:
        headless: When true, Chrome is started with ``--headless=new``.

    Returns:
        The ready WebDriver instance. The caller owns it and must ``quit()`` it.

    Raises:
        Exception: Whatever Selenium raises while starting the browser, loading
            the page or adding a cookie. If the failure happens after the
            browser was started, the browser is shut down before re-raising so
            no Chrome process is left behind.
    """
    chrome_opts = Options()
    if headless:
        chrome_opts.add_argument("--headless=new")

    # Tuning flags.
    # NOTE(review): "--silent" and "--disable-images" may not be switches that
    # Chrome recognizes — confirm they have the intended effect.
    for flag in (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-logging",
        "--disable-gpu-logging",
        "--log-level=3",
        "--silent",
        "--disable-images",
    ):
        chrome_opts.add_argument(flag)

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_opts)
    try:
        driver.maximize_window()

        # The page is loaded before the cookies are added: WebDriver only
        # accepts cookies for the domain of the current document.
        driver.get(URL)
        for name, value in COOKIES.items():
            driver.add_cookie({"name": name, "value": value})
    except Exception:
        # The caller never receives the driver on failure, so release it here.
        driver.quit()
        raise

    return driver
|
||
|
||
def scrape_results_from_table(driver):
    """Collect the text of every row of the result table.

    The locators in ``RESULT_ROWS_XPATH`` are tried in order, each waited on
    for up to 5 seconds; the first one that yields rows is used.

    Args:
        driver: WebDriver showing the result page.

    Returns:
        A list with one entry per row that has text; each entry is the list of
        that row's non-empty, stripped cell texts. Returns ``[]`` when no
        locator matches. NOTE: empty cells are dropped, so a row with a blank
        cell yields fewer values and later values shift left.
    """
    rows = None
    for xpath in RESULT_ROWS_XPATH:
        try:
            rows = WebDriverWait(driver, 5).until(
                EC.presence_of_all_elements_located((By.XPATH, xpath))
            )
        except Exception:
            # Timed out (or failed) for this locator; try the next one.
            continue
        if rows:
            break
    else:
        # No locator produced any row.
        print("未找到结果表格")
        return []

    results = []
    for row in rows:
        try:
            # Prefer data cells, then header cells, then any nested <div>.
            cells = (
                row.find_elements(By.TAG_NAME, "td")
                or row.find_elements(By.TAG_NAME, "th")
                or row.find_elements(By.XPATH, ".//div")
            )
            # Read each cell's text exactly once: every `.text` access is a
            # separate round-trip to the browser.
            texts = [text for text in (cell.text.strip() for cell in cells) if text]
        except Exception:
            # A row that cannot be read (e.g. it was re-rendered) is skipped.
            continue
        if texts:
            results.append(texts)

    return results
|
||
|
||
def submit_batch_and_collect(driver, batch_items):
    """Submit one batch of query items through the page and scrape the result rows.

    Steps: try to open the batch-query dialog (best effort), locate the input
    box, clear it, type the items one per line, click the confirm button, wait
    one second, scrape the table, then clear the input box again.

    Args:
        driver: WebDriver positioned on the query page.
        batch_items: Items to query; only the first ``MAX_PER_BATCH`` are sent.

    Returns:
        The rows returned by ``scrape_results_from_table``, or ``[]`` when the
        input box cannot be found or cleared, or the confirm button cannot be
        clicked.
    """
    def wait_for(condition, xpath, timeout):
        # Block until `condition` holds for `xpath` or `timeout` seconds pass.
        return WebDriverWait(driver, timeout).until(condition((By.XPATH, xpath)))

    items = batch_items[:MAX_PER_BATCH]

    # Open the batch-query dialog; failure is ignored, the later steps decide.
    try:
        wait_for(EC.element_to_be_clickable, BATCH_QUERY_BTN_XPATH, 1).click()
        time.sleep(0.5)
    except Exception:
        pass

    # Locate the input box.
    try:
        text_box = wait_for(EC.presence_of_element_located, INPUT_XPATH, 5)
    except Exception:
        print("未找到输入框")
        return []

    # Clear it and type the items, one per line.
    if not clear_input_box(driver, text_box):
        return []
    text_box.send_keys("\n".join(items))

    # Click the query/confirm button.
    try:
        wait_for(EC.element_to_be_clickable, CONFIRM_BTN_XPATH, 3).click()
    except Exception:
        return []

    # Give the page a moment, then scrape.
    time.sleep(1)
    scraped_rows = scrape_results_from_table(driver)

    # Leave the input box empty for the next batch; the element is looked up
    # again rather than reusing the earlier reference.
    try:
        clear_input_box(driver, driver.find_element(By.XPATH, INPUT_XPATH))
    except Exception:
        pass

    return scraped_rows
|
||
|
||
def _process_batch(driver, batch, batch_index):
    """Query one batch with retries and save its rows.

    A batch gets ``BATCH_RETRY_COUNT + 1`` attempts. An attempt fails when it
    raises or when it yields no rows: ``submit_batch_and_collect`` reports a
    missing input box, button or table by returning ``[]`` rather than by
    raising, so an empty result has to count as a failure for the retry to
    ever trigger.

    Returns:
        The number of rows saved, or ``None`` when every attempt failed.
    """
    for attempt in range(BATCH_RETRY_COUNT + 1):
        if attempt > 0:
            print(f"重试第 {attempt} 次...")
            time.sleep(2)
        try:
            results = submit_batch_and_collect(driver, batch)
            print(f"本批次获得 {len(results)} 条结果")
            if results:
                # Save right away so finished batches survive a later crash.
                batch_results = [{"batch": batch_index, "cells": row_cells} for row_cells in results]
                saved_count = save_results_to_csv(batch_results, OUTPUT_CSV)
                print(f"✅ 已保存 {saved_count} 条结果到 {OUTPUT_CSV}")
                return saved_count
            print(f"批次 {batch_index} 第 {attempt + 1} 次尝试失败: 未获得任何结果")
        except Exception as e:
            print(f"批次 {batch_index} 第 {attempt + 1} 次尝试失败: {e}")

    print(f"❌ 批次 {batch_index} 重试失败")
    return None


def main():
    """Run the whole batch query: read items, query them in batches, report.

    Items are read from ``ICCID_FILE`` and split into batches of at most
    ``MAX_PER_BATCH``. Each batch is queried with retries and its rows are
    appended to ``OUTPUT_CSV`` as soon as they are scraped. A summary is
    printed at the end, and the browser is always closed.
    """
    query_items = read_query_items(ICCID_FILE)
    if not query_items:
        print(f"在 {ICCID_FILE} 中未找到查询项")
        return

    print(f"总共读取到 {len(query_items)} 个查询项")
    # Every batch holds at most MAX_PER_BATCH items, so no further splitting
    # is needed downstream.
    batches = [query_items[i:i+MAX_PER_BATCH] for i in range(0, len(query_items), MAX_PER_BATCH)]
    print(f"将分为 {len(batches)} 个批次处理")

    driver = init_driver(HEADLESS)
    total_saved_results = 0
    failed_batches = []

    try:
        # Reload so the page is requested again with the injected cookies.
        driver.refresh()
        time.sleep(1.5)
        print("已通过Cookie自动登录,开始批量查询...")

        for batch_index, batch in enumerate(batches, 1):
            print(f"\n=== 处理批次 {batch_index}/{len(batches)} ===")
            print(f"本批次包含 {len(batch)} 个查询项")

            saved_count = _process_batch(driver, batch, batch_index)
            if saved_count is None:
                failed_batches.append(batch_index)
            else:
                total_saved_results += saved_count

            # Pause between batches (not after the last one).
            if batch_index < len(batches):
                time.sleep(1)

        # Summary report.
        succeeded = len(batches) - len(failed_batches)
        print(f"\n{'='*50}")
        print(f"📊 批量查询完成报告")
        print(f"{'='*50}")
        print(f"总查询项: {len(query_items)}")
        print(f"总批次数: {len(batches)}")
        print(f"成功批次: {succeeded}")
        print(f"失败批次: {len(failed_batches)}")
        print(f"成功率: {(succeeded / len(batches) * 100):.1f}%")
        print(f"总保存结果数: {total_saved_results}")

        if failed_batches:
            print(f"失败批次列表: {failed_batches}")

        print(f"{'='*50}")
        print(f"✅ 所有结果已实时保存到 {OUTPUT_CSV}")

    except Exception as e:
        print(f"程序执行出错: {e}")
    finally:
        driver.quit()
|
||
|
||
# Run the batch query only when this file is executed as a script.
if __name__ == "__main__":
    main()