Files
pa/pa.py

289 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# batch_query.py - streamlined version
import os
import time
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
# Configuration
# Base address of the portal; the browser is pointed here before cookies are set.
URL = "https://prod-eu-cmp.simbalinkglobal.com"

# XPath unions: the first matching alternative wins.
INPUT_XPATH = (
    '//textarea[@id="iccidList"]'
    ' | //input[@id="iccidList"]'
    ' | //*[@id="iccidList"]'
)
CONFIRM_BTN_XPATH = (
    '//button[contains(text(),"查询")]'
    ' | //button[contains(text(),"确认")]'
    ' | //*[@id="pop-confirm"]//button[2]'
    ' | //button[@type="submit"]'
)
# Absolute path to the button clicked before typing a batch.
BATCH_QUERY_BTN_XPATH = '/html/body/div/div[2]/main/div/div/div/div/div[2]/div[2]/div[1]/div[2]/div/button'

# Row locators for the result table, tried one after another until one matches.
RESULT_ROWS_XPATH = [
    '//table[contains(@class,"ant-table")]//tbody/tr',
    '//table//tbody/tr',
    '//div[contains(@class,"ant-table")]//tbody/tr',
    '//table[@class="table"]//tbody/tr',
    '//*[contains(@class,"table")]//tr[position()>1]',
]

MAX_PER_BATCH = 50          # maximum number of items submitted in one query
OUTPUT_CSV = "results.csv"  # file the scraped rows are accumulated in
ICCID_FILE = "text.txt"     # input file, one query item per line
HEADLESS = False            # passed to init_driver() to toggle headless Chrome
BATCH_RETRY_COUNT = 2       # extra attempts per batch after the first one

# Cookies injected into the browser session in place of an interactive login.
# NOTE(review): these look like real session tokens committed to source control;
# consider rotating them and loading the values from the environment instead.
COOKIES = {
    'platformUser_session': 'eyJsYXN0QWNjZXNzZWQiOjE3NTkxNDc4NjYzMzJ9.2gNtuRzCQH%2BoNra1%2B1WXxcDtTmW91yYVAOLbH6Ry%2BLM',
    '_manage_session': 'eyJ0b2tlbiI6ImV5SmhiR2NpT2lKSVV6STFOaUlzSW5SNWNDSTZJa3BYVkNKOS5leUoxYzJWeUlqcDdJblZ6WlhKZmFXUWlPaUpaU0RFNE9URXpOVFk1TkRVNU9EWTNOVEkxTVRJaWZTd2lRM0psWVhSbFZHbHRaU0k2SWpJd01qVXRNRGt0TWpsVU1URTZNVGs2TkRJdU9UZzJNRGt4TWpNekt6QXhPakF3SW4wLll6eWtYZGlweUFfaWN4TGxkX3MwS2dWQU5LM2JkZU1fNjM3NDV1ckxQNkEiLCJleHBpcmVUaW1lIjowLCJ1c2VySWQiOiJZSDE4OTEzNTY5NDU5ODY3NTI1MTIiLCJkYXRhTGltaXQiOiJjdXN0b21lIiwidHlwZSI6MSwibGV2ZWwiOjIsInBVc2VySWQiOiJZSDE3Njk5MTg2MjkxMjAyNDAzMjEiLCJsb2dpbk5hbWUiOiJ4aW9uZ3NoaV95dW53ZWkiLCJyb2xlSWQiOiJSTDE5NjI3MDM5MDkxNTU5MDE0NDAiLCJjbGllbnRJRHMiOlsiZXVfY2hlcnkiLCJlYnJvX2NoZXJ5Il0sImNsaWVudElkcyI6ImVicm9fY2hlcnkifQ%3D%3D.jBwQkblyoEP6t7OELXxUMKkoU9%2FJWWQsZPg25SZSz5o',
}
def read_query_items(path):
    """Read the query items from a text file, one item per non-blank line.

    The file is decoded with the first candidate encoding that succeeds.
    Surrounding whitespace is stripped from every line and blank lines are
    dropped.

    Args:
        path: Path of the text file to read.

    Returns:
        The list of stripped, non-empty lines. An empty (or all-blank) file
        yields an empty list so the caller can report "nothing to query".

    Raises:
        OSError: If the file cannot be opened (for example, it does not exist).
        Exception: If none of the candidate encodings can decode the file.
    """
    # 'utf-8-sig' must come before any plain UTF-8 attempt: it decodes ordinary
    # UTF-8 as well, but also removes a leading BOM. Plain 'utf-8' would keep
    # the BOM as U+FEFF glued to the first item, which str.strip() leaves alone.
    encodings = ['utf-8-sig', 'gbk', 'cp1252']
    for encoding in encodings:
        try:
            with open(path, 'r', encoding=encoding) as f:
                lines = [line.strip() for line in f if line.strip()]
        except UnicodeDecodeError:
            # Wrong guess; only decoding problems justify trying the next one.
            continue
        if lines:
            print(f"使用编码 {encoding} 成功读取 {len(lines)} 个查询项")
        return lines
    raise Exception("无法读取文件")
def save_results_to_csv(results, filename):
    """Append scraped rows to a CSV file and return how many rows were added.

    Each entry of ``results`` is a dict with a ``"batch"`` label and a
    ``"cells"`` list. When a row has at least two cells, the first is stored
    under ``"ICCID"``, the second under ``"租户"`` and every further cell
    under its 1-based position (``"3"``, ``"4"``, ...). Rows with fewer than
    two cells keep only the batch label.

    If ``filename`` already holds data, it is loaded, the new rows are
    appended, and the whole file is rewritten.

    Args:
        results: List of ``{"batch": ..., "cells": [...]}`` dicts.
        filename: Path of the CSV file (written as UTF-8 with BOM).

    Returns:
        Number of rows added by this call (0 when ``results`` is empty, in
        which case the file is not touched).
    """
    if not results:
        return 0

    df_data = []
    for result in results:
        row_data = {"batch": result["batch"]}
        cells = result["cells"]
        if len(cells) >= 2:
            row_data["ICCID"] = cells[0]
            row_data["租户"] = cells[1]
            for position, cell in enumerate(cells[2:], start=3):
                row_data[str(position)] = cell
        df_data.append(row_data)
    df = pd.DataFrame(df_data)

    # A zero-byte file would make read_csv raise, so treat it as absent.
    if os.path.exists(filename) and os.path.getsize(filename) > 0:
        # Load earlier rows strictly as text. Letting pandas infer dtypes
        # rewrites stored values on every append: "007" turns into 7, integer
        # columns with gaps turn into floats ("5.0"), and strings such as
        # "NA" are blanked out.
        existing_df = pd.read_csv(
            filename,
            encoding='utf-8-sig',
            dtype=str,
            keep_default_na=False,
        )
        df = pd.concat([existing_df, df], ignore_index=True)

    df.to_csv(filename, index=False, encoding='utf-8-sig')
    return len(df_data)
def clear_input_box(driver, input_element):
    """Empty a text input and report whether every step went through.

    Three clearing techniques are applied back to back: the element's own
    ``clear()``, a Ctrl+A / Delete key sequence, and a script that assigns an
    empty string to the element's ``value``.

    Args:
        driver: WebDriver used to run the script step.
        input_element: The input/textarea element to empty.

    Returns:
        True when all steps completed, False if any of them raised.
    """
    try:
        input_element.clear()
        for keystroke in (Keys.CONTROL + "a", Keys.DELETE):
            input_element.send_keys(keystroke)
        driver.execute_script("arguments[0].value = '';", input_element)
        # Short pause after clearing before the caller continues.
        time.sleep(0.1)
    except Exception:
        return False
    return True
def init_driver(headless=False):
    """Launch Chrome, open ``URL`` and install the session cookies.

    Args:
        headless: When true, Chrome is started with ``--headless=new``.

    Returns:
        The ready-to-use WebDriver. The caller is responsible for calling
        ``quit()`` on it.

    Raises:
        Exception: Whatever Selenium raises while launching the browser,
            loading the page or adding cookies. If the failure happens after
            the browser was launched, the browser is shut down first.
    """
    chrome_opts = Options()
    if headless:
        chrome_opts.add_argument("--headless=new")

    # Startup flags, added in this order.
    # NOTE(review): "--disable-images" and "--silent" may not be switches
    # Chrome recognises — confirm they have the intended effect.
    for flag in (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-logging",
        "--disable-gpu-logging",
        "--log-level=3",
        "--silent",
        "--disable-images",
    ):
        chrome_opts.add_argument(flag)

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_opts)
    try:
        driver.maximize_window()
        # The site has to be loaded before cookies for its domain can be added.
        driver.get(URL)
        for name, value in COOKIES.items():
            driver.add_cookie({"name": name, "value": value})
    except Exception:
        # Setup failed after launch: close the browser so no orphaned
        # Chrome/chromedriver process is left behind, then report the error.
        driver.quit()
        raise
    return driver
def scrape_results_from_table(driver):
    """Collect the visible text of every result row on the current page.

    The locators in ``RESULT_ROWS_XPATH`` are tried in order, each with a
    5-second wait, and the first one that yields rows is used. Within a row
    the cells are looked up as ``td`` elements, falling back to ``th`` and
    then to descendant ``div`` elements.

    Args:
        driver: WebDriver positioned on the page holding the result table.

    Returns:
        A list with one entry per row, each a list of the row's non-empty,
        stripped cell texts. Rows without any text, and rows that raise
        while being read, are skipped. An empty list is returned when no
        locator matches.
    """
    rows = None
    for xpath in RESULT_ROWS_XPATH:
        try:
            rows = WebDriverWait(driver, 5).until(
                EC.presence_of_all_elements_located((By.XPATH, xpath))
            )
        except Exception:
            # Timed out (or failed) for this locator; try the next one.
            continue
        if rows:
            break
    if not rows:
        print("未找到结果表格")
        return []

    results = []
    for row in rows:
        try:
            cells = row.find_elements(By.TAG_NAME, "td")
            if not cells:
                cells = row.find_elements(By.TAG_NAME, "th")
            if not cells:
                cells = row.find_elements(By.XPATH, ".//div")
            # Fetch each cell's text exactly once: every `.text` access is a
            # separate browser call, and reading it twice lets the filter and
            # the stored value disagree if the page changes in between.
            # NOTE(review): empty cells are dropped, so values after a blank
            # cell shift left — confirm this is what downstream code expects.
            texts = [text for text in (cell.text.strip() for cell in cells) if text]
        except Exception:
            continue
        if texts:
            results.append(texts)
    return results
def submit_batch_and_collect(driver, batch_items):
    """Submit one batch of query items through the page and scrape the result.

    Steps: click the batch-query button if it is present (best effort),
    locate the input box, clear it, type the items separated by newlines,
    click the confirm button, wait briefly, scrape the result table and
    finally clear the input box again.

    Args:
        driver: WebDriver positioned on the query page.
        batch_items: List of query strings. Anything beyond
            ``MAX_PER_BATCH`` items is dropped with a warning.

    Returns:
        The rows returned by ``scrape_results_from_table``, or an empty list
        when the batch is empty or a required page element could not be used
        (a message is printed in each failure case).
    """
    if len(batch_items) > MAX_PER_BATCH:
        dropped = len(batch_items) - MAX_PER_BATCH
        # Truncation loses data, so make it visible instead of silent.
        print(f"⚠️ 批次超过 {MAX_PER_BATCH} 项,已丢弃多余的 {dropped} 项")
        batch_items = batch_items[:MAX_PER_BATCH]
    if not batch_items:
        # Nothing to query; avoid submitting a blank form.
        return []

    # Try to open the batch-query dialog. The button may be absent (for
    # example when the dialog is already open), so failure is tolerated.
    try:
        batch_btn = WebDriverWait(driver, 1).until(
            EC.element_to_be_clickable((By.XPATH, BATCH_QUERY_BTN_XPATH))
        )
        batch_btn.click()
        time.sleep(0.5)
    except Exception:
        pass

    # Locate the input box.
    try:
        inp = WebDriverWait(driver, 5).until(
            EC.presence_of_element_located((By.XPATH, INPUT_XPATH))
        )
    except Exception:
        print("未找到输入框")
        return []

    # Clear it and type the batch, one item per line.
    if not clear_input_box(driver, inp):
        print("清空输入框失败")
        return []
    inp.send_keys("\n".join(batch_items))

    # Click the query/confirm button.
    try:
        btn = WebDriverWait(driver, 3).until(
            EC.element_to_be_clickable((By.XPATH, CONFIRM_BTN_XPATH))
        )
        btn.click()
    except Exception as e:
        print(f"未找到或无法点击查询按钮: {e}")
        return []

    # Give the page a moment to respond, then read the table.
    # NOTE(review): a fixed 1-second pause may still see rows left over from
    # a previous query if the page is slow — confirm against the real page.
    time.sleep(1)
    results = scrape_results_from_table(driver)

    # Leave the input box empty for the next batch (best effort).
    try:
        inp = driver.find_element(By.XPATH, INPUT_XPATH)
        clear_input_box(driver, inp)
    except Exception:
        pass
    return results
def _process_batch(driver, batch_index, batch):
    """Query one batch with retries; return rows saved, or None if every attempt raised."""
    for retry in range(BATCH_RETRY_COUNT + 1):
        try:
            if retry > 0:
                print(f"重试第 {retry} 次...")
                time.sleep(2)
            results = submit_batch_and_collect(driver, batch)
            print(f"本批次获得 {len(results)} 条结果")
            saved_count = 0
            # Persist immediately so finished batches survive a later crash.
            if results:
                batch_results = [{"batch": batch_index, "cells": row_cells} for row_cells in results]
                saved_count = save_results_to_csv(batch_results, OUTPUT_CSV)
                print(f"✅ 已保存 {saved_count} 条结果到 {OUTPUT_CSV}")
            return saved_count
        except Exception as e:
            print(f"批次 {batch_index} 第 {retry + 1} 次尝试失败: {e}")
    print(f"❌ 批次 {batch_index} 重试失败")
    return None


def _print_summary(total_items, total_batches, failed_batches, total_saved_results):
    """Print the end-of-run report."""
    succeeded = total_batches - len(failed_batches)
    separator = '=' * 50
    print(f"\n{separator}")
    print("📊 批量查询完成报告")
    print(separator)
    print(f"总查询项: {total_items}")
    print(f"总批次数: {total_batches}")
    print(f"成功批次: {succeeded}")
    print(f"失败批次: {len(failed_batches)}")
    print(f"成功率: {(succeeded / total_batches * 100):.1f}%")
    print(f"总保存结果数: {total_saved_results}")
    if failed_batches:
        print(f"失败批次列表: {failed_batches}")
    print(separator)
    print(f"✅ 所有结果已实时保存到 {OUTPUT_CSV}")


def main():
    """Run the whole batch query: read items, query them in batches, report.

    Items are read from ``ICCID_FILE`` and split into chunks of at most
    ``MAX_PER_BATCH``. Each chunk is submitted through the browser, with up
    to ``BATCH_RETRY_COUNT`` retries when an attempt raises, and its rows are
    appended to ``OUTPUT_CSV`` right away. A summary is printed at the end
    and the browser is always closed.
    """
    query_items = read_query_items(ICCID_FILE)
    if not query_items:
        print(f"{ICCID_FILE} 中未找到查询项")
        return
    print(f"总共读取到 {len(query_items)} 个查询项")

    # Every chunk is at most MAX_PER_BATCH long by construction, so no
    # further splitting of individual batches is needed.
    batches = [query_items[i:i + MAX_PER_BATCH] for i in range(0, len(query_items), MAX_PER_BATCH)]
    print(f"将分为 {len(batches)} 个批次处理")

    driver = init_driver(HEADLESS)
    total_saved_results = 0
    failed_batches = []
    try:
        # Reload so the page is requested again with the cookies in place.
        driver.refresh()
        time.sleep(1.5)
        print("已通过Cookie自动登录开始批量查询...")

        for batch_index, batch in enumerate(batches, 1):
            print(f"\n=== 处理批次 {batch_index}/{len(batches)} ===")
            print(f"本批次包含 {len(batch)} 个查询项")

            saved_count = _process_batch(driver, batch_index, batch)
            if saved_count is None:
                failed_batches.append(batch_index)
            else:
                total_saved_results += saved_count

            # Pause between batches, but not after the last one.
            if batch_index < len(batches):
                time.sleep(1)

        _print_summary(len(query_items), len(batches), failed_batches, total_saved_results)
    except Exception as e:
        print(f"程序执行出错: {e}")
    finally:
        driver.quit()
# Run the batch query only when this file is executed as a script.
if __name__ == "__main__":
    main()