From 43d9dad47cd2f9e3087496ff02c0016ff8bff8b4 Mon Sep 17 00:00:00 2001 From: zhaojie <1710884619@qq.com> Date: Wed, 1 Oct 2025 00:12:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20pa.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pa.py | 627 ++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 564 insertions(+), 63 deletions(-) diff --git a/pa.py b/pa.py index f00251b..258a8f2 100644 --- a/pa.py +++ b/pa.py @@ -1,4 +1,4 @@ -# batch_query.py - 精简版 +#单线程硬怼 import os import time import pandas as pd @@ -27,13 +27,17 @@ RESULT_ROWS_XPATH = [ MAX_PER_BATCH = 50 OUTPUT_CSV = "results.csv" ICCID_FILE = "text.txt" -HEADLESS = False +HEADLESS = True # Linux环境默认启用无头模式 BATCH_RETRY_COUNT = 2 -COOKIES = { - 'platformUser_session': 'eyJsYXN0QWNjZXNzZWQiOjE3NTkxNDc4NjYzMzJ9.2gNtuRzCQH%2BoNra1%2B1WXxcDtTmW91yYVAOLbH6Ry%2BLM', - '_manage_session': 'eyJ0b2tlbiI6ImV5SmhiR2NpT2lKSVV6STFOaUlzSW5SNWNDSTZJa3BYVkNKOS5leUoxYzJWeUlqcDdJblZ6WlhKZmFXUWlPaUpaU0RFNE9URXpOVFk1TkRVNU9EWTNOVEkxTVRJaWZTd2lRM0psWVhSbFZHbHRaU0k2SWpJd01qVXRNRGt0TWpsVU1URTZNVGs2TkRJdU9UZzJNRGt4TWpNekt6QXhPakF3SW4wLll6eWtYZGlweUFfaWN4TGxkX3MwS2dWQU5LM2JkZU1fNjM3NDV1ckxQNkEiLCJleHBpcmVUaW1lIjowLCJ1c2VySWQiOiJZSDE4OTEzNTY5NDU5ODY3NTI1MTIiLCJkYXRhTGltaXQiOiJjdXN0b21lIiwidHlwZSI6MSwibGV2ZWwiOjIsInBVc2VySWQiOiJZSDE3Njk5MTg2MjkxMjAyNDAzMjEiLCJsb2dpbk5hbWUiOiJ4aW9uZ3NoaV95dW53ZWkiLCJyb2xlSWQiOiJSTDE5NjI3MDM5MDkxNTU5MDE0NDAiLCJjbGllbnRJRHMiOlsiZXVfY2hlcnkiLCJlYnJvX2NoZXJ5Il0sImNsaWVudElkcyI6ImVicm9fY2hlcnkifQ%3D%3D.jBwQkblyoEP6t7OELXxUMKkoU9%2FJWWQsZPg25SZSz5o' -} +# 登录配置 +USERNAME = "xiongshi_yunwei" # 请替换为实际用户名 +PASSWORD = "Cm20p20Nmo!25" # 请替换为实际密码 + +# 登录页面元素XPath - 根据实际页面元素更新 +USERNAME_XPATH = "/html/body/div/div[2]/div/form/div[1]/div/div/div/div/span/input" +PASSWORD_XPATH = "//*[@id='password']" +LOGIN_BUTTON_XPATH = "/html/body/div/div[2]/div/form/button" def read_query_items(path): """读取查询项目文件""" @@ -49,30 +53,240 @@ def read_query_items(path): continue raise Exception("无法读取文件") -def save_results_to_csv(results, filename): +def save_results_to_csv(results, filename, is_first_batch=False): """保存结果到CSV文件""" if not results: return 0 + # 构建DataFrame df_data = [] for result in results: row_data = {"batch": result["batch"]} cells = result["cells"] - if len(cells) >= 2: - row_data["ICCID"] = cells[0] - row_data["租户"] = cells[1] - for i, cell in enumerate(cells[2:], start=2): - row_data[f"列{i+1}"] = cell + # + for i, cell in enumerate(cells): + row_data[f"col_{i}"] = cell df_data.append(row_data) + # 创建DataFrame df = pd.DataFrame(df_data) - if os.path.exists(filename): - existing_df = pd.read_csv(filename, encoding='utf-8-sig') - df = pd.concat([existing_df, df], ignore_index=True) - df.to_csv(filename, index=False, encoding='utf-8-sig') + try: + if is_first_batch: + # 第一次保存时,手动添加表头 + header_data = { + "batch": "批次号", + } + # 创建表头DataFrame + header_df = pd.DataFrame([header_data]) + # 先写入表头,再写入数据 + header_df.to_csv(filename, index=False, encoding='utf-8-sig') + df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig') + else: + # 后续批次只追加数据 + df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig') + except Exception as e: + print(f"保存文件时出错: {e}") + return 0 + return len(df_data) +def login(driver, username, password): + """使用用户名密码登录""" + try: + print("开始登录...") + + # 等待页面完全加载 + time.sleep(3) + + # 检查当前页面URL + current_url = driver.current_url + print(f"当前页面URL: {current_url}") + + # 查找用户名输入框 - 使用更灵活的方式 + username_input = None + username_selectors = [ + USERNAME_XPATH, # 您提供的精确XPath + "//input[@id='username']", # 根据您提供的id + "//input[@placeholder='请输入账号']", # 根据您提供的placeholder + "//input[@type='text']", + "//input[@placeholder*='用户名']", + "//input[@placeholder*='账号']", + "//input[@placeholder*='user']", + "//input[@placeholder*='email']", + "//input[@name='username']", + "//input[@name='user']" + ] + + for selector in username_selectors: + try: + username_input = WebDriverWait(driver, 2).until( + EC.element_to_be_clickable((By.XPATH, selector)) + ) + print(f"找到用户名输入框: {selector}") + break + except Exception: + continue + + if not username_input: + print("未找到用户名输入框") + return False + + # 查找密码输入框 - 使用更灵活的方式 + password_input = None + password_selectors = [ + PASSWORD_XPATH, # 您提供的精确XPath + "//input[@id='password']", # 根据您提供的id + "//input[@placeholder='请输入密码']", # 根据您提供的placeholder + "//input[@type='password']", + "//input[@placeholder*='密码']", + "//input[@placeholder*='password']", + "//input[@name='password']", + "//input[@name='pwd']" + ] + + for selector in password_selectors: + try: + password_input = WebDriverWait(driver, 2).until( + EC.element_to_be_clickable((By.XPATH, selector)) + ) + print(f"找到密码输入框: {selector}") + break + except Exception: + continue + + if not password_input: + print("未找到密码输入框") + return False + + # 查找登录按钮 - 使用更灵活的方式 + login_button = None + login_selectors = [ + LOGIN_BUTTON_XPATH, # 您提供的精确XPath + "//button[@type='submit']", + "//button[contains(text(),'登录')]", + "//button[contains(text(),'Login')]", + "//button[contains(text(),'登入')]", + "//input[@type='submit']", + "//button[contains(@class,'login')]", + "//button[contains(@class,'submit')]" + ] + + for selector in login_selectors: + try: + login_button = WebDriverWait(driver, 2).until( + EC.element_to_be_clickable((By.XPATH, selector)) + ) + print(f"找到登录按钮: {selector}") + break + except Exception: + continue + + if not login_button: + print("未找到登录按钮") + return False + + # 清空并输入用户名 + try: + # 先点击输入框确保焦点 + username_input.click() + time.sleep(0.5) + + # 清空输入框 + username_input.clear() + time.sleep(0.1) + + # 使用JavaScript设置值(更可靠) + driver.execute_script("arguments[0].value = '';", username_input) + username_input.send_keys(username) + time.sleep(0.5) + + print(f"已输入用户名: {username}") + except Exception as e: + print(f"输入用户名失败: {e}") + return False + + # 清空并输入密码 + try: + # 先点击输入框确保焦点 + password_input.click() + time.sleep(0.2) + + # 清空输入框 + password_input.clear() + time.sleep(0.1) + + # 使用JavaScript设置值(更可靠) + driver.execute_script("arguments[0].value = '';", password_input) + password_input.send_keys(password) + time.sleep(0.5) + + print("已输入密码") + except Exception as e: + print(f"输入密码失败: {e}") + return False + + # 点击登录按钮 + try: + # 确保按钮可见和可点击 + driver.execute_script("arguments[0].scrollIntoView(true);", login_button) + time.sleep(0.5) + + # 尝试JavaScript点击(更可靠) + driver.execute_script("arguments[0].click();", login_button) + print("已点击登录按钮") + time.sleep(2) + except Exception as e: + print(f"点击登录按钮失败: {e}") + # 回退到普通点击 + try: + login_button.click() + print("使用普通点击成功") + time.sleep(2) + except Exception as e2: + print(f"普通点击也失败: {e2}") + return False + + # 等待登录完成,检查是否跳转到主页面 + try: + print("等待登录完成...") + # 等待页面跳转或出现成功标识 + WebDriverWait(driver, 15).until( + lambda driver: driver.current_url != current_url or + "login" not in driver.current_url.lower() or + len(driver.find_elements(By.XPATH, "//*[contains(text(),'登录成功') or contains(text(),'欢迎')]")) > 0 + ) + + new_url = driver.current_url + print(f"登录后页面URL: {new_url}") + + # 检查是否有错误信息 + error_elements = driver.find_elements(By.XPATH, "//*[contains(text(),'错误') or contains(text(),'失败') or contains(text(),'invalid') or contains(text(),'incorrect')]") + if error_elements: + print(f"登录错误信息: {error_elements[0].text}") + return False + + print("登录成功!") + return True + + except Exception as e: + print(f"登录验证失败: {e}") + # 检查是否有错误信息 + try: + error_elements = driver.find_elements(By.XPATH, "//*[contains(text(),'错误') or contains(text(),'失败') or contains(text(),'invalid') or contains(text(),'incorrect')]") + if error_elements: + print(f"登录错误信息: {error_elements[0].text}") + else: + print("未发现明显错误信息,可能登录成功") + return True + except: + pass + return False + + except Exception as e: + print(f"登录过程中出错: {e}") + return False + def clear_input_box(driver, input_element): """清空输入框""" try: @@ -86,27 +300,146 @@ def clear_input_box(driver, input_element): return False def init_driver(headless=False): - """初始化Chrome驱动""" - chrome_opts = Options() - if headless: - chrome_opts.add_argument("--headless=new") + """初始化Chrome驱动 - 支持Linux无头模式""" + import platform + import random - # 优化参数 - chrome_opts.add_argument("--no-sandbox") - chrome_opts.add_argument("--disable-dev-shm-usage") - chrome_opts.add_argument("--disable-logging") - chrome_opts.add_argument("--disable-gpu-logging") + chrome_opts = Options() + + # 检测操作系统 + system = platform.system().lower() + is_linux = system == 'linux' + + # 强制无头模式(Linux环境或指定headless) + if is_linux or headless: + chrome_opts.add_argument("--headless=new") + print("启用无头模式") + + # Linux环境专用优化 + if is_linux: + chrome_opts.add_argument("--no-sandbox") + chrome_opts.add_argument("--disable-dev-shm-usage") + chrome_opts.add_argument("--disable-gpu") + chrome_opts.add_argument("--disable-software-rasterizer") + chrome_opts.add_argument("--disable-background-timer-throttling") + chrome_opts.add_argument("--disable-backgrounding-occluded-windows") + chrome_opts.add_argument("--disable-renderer-backgrounding") + chrome_opts.add_argument("--disable-features=TranslateUI") + chrome_opts.add_argument("--disable-ipc-flooding-protection") + chrome_opts.add_argument("--single-process") # Linux单进程模式 + chrome_opts.add_argument("--memory-pressure-off") + chrome_opts.add_argument("--max_old_space_size=4096") + + # 反爬虫机制规避 + # 随机用户代理 + user_agents = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0" + ] + selected_ua = random.choice(user_agents) + chrome_opts.add_argument(f"--user-agent={selected_ua}") + + # 反检测措施 + chrome_opts.add_argument("--disable-blink-features=AutomationControlled") + chrome_opts.add_experimental_option("excludeSwitches", ["enable-automation"]) + chrome_opts.add_experimental_option('useAutomationExtension', False) + + # 窗口和显示设置 + chrome_opts.add_argument("--window-size=1920,1080") + chrome_opts.add_argument("--start-maximized") + chrome_opts.add_argument("--disable-infobars") + chrome_opts.add_argument("--disable-notifications") + chrome_opts.add_argument("--disable-popup-blocking") + + # 性能优化 + chrome_opts.add_argument("--disable-images") # 禁用图片加载 + chrome_opts.add_argument("--disable-javascript") # 禁用JavaScript(如果需要的话) + chrome_opts.add_argument("--disable-plugins") + chrome_opts.add_argument("--disable-extensions") + chrome_opts.add_argument("--disable-default-apps") + chrome_opts.add_argument("--disable-sync") + chrome_opts.add_argument("--disable-translate") + chrome_opts.add_argument("--hide-scrollbars") + chrome_opts.add_argument("--mute-audio") + chrome_opts.add_argument("--no-first-run") + + # 网络和安全设置 + chrome_opts.add_argument("--disable-web-security") + chrome_opts.add_argument("--disable-features=VizDisplayCompositor") + chrome_opts.add_argument("--disable-client-side-phishing-detection") + chrome_opts.add_argument("--disable-component-extensions-with-background-pages") + chrome_opts.add_argument("--disable-background-networking") + + # 日志控制 chrome_opts.add_argument("--log-level=3") chrome_opts.add_argument("--silent") - chrome_opts.add_argument("--disable-images") + chrome_opts.add_experimental_option('excludeSwitches', ['enable-logging']) - driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_opts) - driver.maximize_window() + # 随机端口避免冲突 + debug_port = random.randint(9222, 9999) + chrome_opts.add_argument(f"--remote-debugging-port={debug_port}") - # 添加Cookie - driver.get(URL) - for name, value in COOKIES.items(): - driver.add_cookie({"name": name, "value": value}) + try: + # 尝试使用ChromeDriverManager自动下载驱动 + print("正在初始化Chrome驱动...") + if is_linux: + print("检测到Linux环境,使用Linux优化配置") + + service = Service(ChromeDriverManager().install()) + driver = webdriver.Chrome(service=service, options=chrome_opts) + print("Chrome驱动初始化成功") + + # 执行反检测脚本 + driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})") + driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']})") + + except Exception as e: + print(f"ChromeDriverManager失败: {e}") + print("尝试使用系统PATH中的chromedriver...") + try: + # 回退到系统PATH中的chromedriver + driver = webdriver.Chrome(options=chrome_opts) + print("使用系统chromedriver成功") + + # 执行反检测脚本 + driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + driver.execute_script("Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]})") + driver.execute_script("Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']})") + + except Exception as e2: + print(f"系统chromedriver也失败: {e2}") + if is_linux: + print("done") + else: + print("请确保已安装Chrome浏览器和chromedriver") + raise Exception("无法启动Chrome驱动,请检查Chrome浏览器和chromedriver安装") + + # 设置窗口大小(无头模式也需要) + try: + if not is_linux and not headless: + driver.maximize_window() + else: + driver.set_window_size(1920, 1080) + + except Exception as e: + print(f"窗口设置失败: {e}") + + # 访问登录页面 + try: + print(f"正在访问: {URL}") + driver.get(URL) + + # 随机等待,模拟人类行为 + wait_time = random.uniform(1, 3) + time.sleep(wait_time) + + except Exception as e: + print(f"页面加载失败: {e}") + raise return driver @@ -114,11 +447,59 @@ def scrape_results_from_table(driver): """抓取表格结果""" results = [] - # 尝试不同的表格XPath + # 使用JavaScript直接获取表格数据 + try: + # 尝试JavaScript方式获取表格数据 + table_data = driver.execute_script(""" + var tables = document.querySelectorAll('table, .ant-table'); + for (var i = 0; i < tables.length; i++) { + var table = tables[i]; + var rows = table.querySelectorAll('tbody tr, tr'); + if (rows.length > 0) { + var data = []; + for (var j = 0; j < rows.length; j++) { + var cells = rows[j].querySelectorAll('td, th, div'); + var rowData = []; + for (var k = 0; k < cells.length; k++) { + var text = cells[k].textContent || cells[k].innerText; + if (text && text.trim()) { + rowData.push(text.trim()); + } + } + if (rowData.length > 0) { + // 过滤掉表头行(包含"ICCID"、"设备ID"等关键词的行) + var isHeader = false; + var headerKeywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI']; + for (var h = 0; h < headerKeywords.length; h++) { + if (rowData.some(function(cell) { return cell.includes(headerKeywords[h]); })) { + isHeader = true; + break; + } + } + if (!isHeader) { + data.push(rowData); + } + } + } + if (data.length > 0) { + return data; + } + } + } + return []; + """) + + if table_data and len(table_data) > 0: + return table_data + except Exception as e: + print(f"JavaScript方式失败: {e}") + + # 如果JavaScript失败,回退到Selenium方式(但优化等待时间) for xpath in RESULT_ROWS_XPATH: try: - rows = WebDriverWait(driver, 5).until(EC.presence_of_all_elements_located((By.XPATH, xpath))) + rows = WebDriverWait(driver, 2).until(EC.presence_of_all_elements_located((By.XPATH, xpath))) # 减少等待时间 if rows: + print(f"Selenium方式找到 {len(rows)} 行数据") break except Exception: continue @@ -126,38 +507,71 @@ def scrape_results_from_table(driver): print("未找到结果表格") return [] + # 优化单元格获取方式 for r in rows: try: - cells = r.find_elements(By.TAG_NAME, "td") - if not cells: - cells = r.find_elements(By.TAG_NAME, "th") - if not cells: - cells = r.find_elements(By.XPATH, ".//div") + # 优先使用JavaScript获取文本,更快 + cell_texts = driver.execute_script(""" + var cells = arguments[0].querySelectorAll('td, th, div'); + var texts = []; + for (var i = 0; i < cells.length; i++) { + var text = cells[i].textContent || cells[i].innerText; + if (text && text.trim()) { + texts.push(text.trim()); + } + } + return texts; + """, r) - texts = [c.text.strip() for c in cells if c.text.strip()] - if texts: - results.append(texts) + if cell_texts and len(cell_texts) > 0: + # 过滤掉表头行 + header_keywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI'] + is_header = any(keyword in cell for cell in cell_texts for keyword in header_keywords) + if not is_header: + results.append(cell_texts) except Exception: - continue + # 回退到传统方式 + try: + cells = r.find_elements(By.TAG_NAME, "td") + if not cells: + cells = r.find_elements(By.TAG_NAME, "th") + if not cells: + cells = r.find_elements(By.XPATH, ".//div") + + texts = [c.text.strip() for c in cells if c.text.strip()] + if texts: + # 过滤掉表头行 + header_keywords = ['ICCID', '设备ID', '生命周期', '周期用量', '流量上限', '在用套餐', '租户', '服务状态', '激活时间', 'MSISDN', 'IMSI'] + is_header = any(keyword in text for text in texts for keyword in header_keywords) + if not is_header: + results.append(texts) + except Exception: + continue return results def submit_batch_and_collect(driver, batch_items): - """提交批次查询并收集结果""" + """提交批次查询并收集结果 - 增强反爬虫机制""" + import random + if len(batch_items) > MAX_PER_BATCH: batch_items = batch_items[:MAX_PER_BATCH] + # 随机等待,模拟人类行为 + wait_time = random.uniform(0.5, 2.0) + time.sleep(wait_time) + # 尝试打开批量查询弹窗 try: - batch_btn = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, BATCH_QUERY_BTN_XPATH))) + batch_btn = WebDriverWait(driver, 0.3).until(EC.element_to_be_clickable((By.XPATH, BATCH_QUERY_BTN_XPATH))) batch_btn.click() - time.sleep(0.5) + time.sleep(random.uniform(0.3, 0.8)) except Exception: pass # 查找输入框 try: - inp = WebDriverWait(driver, 5).until(EC.presence_of_element_located((By.XPATH, INPUT_XPATH))) + inp = WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.XPATH, INPUT_XPATH))) except Exception: print("未找到输入框") return [] @@ -166,24 +580,76 @@ def submit_batch_and_collect(driver, batch_items): if not clear_input_box(driver, inp): return [] + # 模拟人类输入行为 - 分批输入 payload = "\n".join(batch_items) - inp.send_keys(payload) + + # 分批输入,模拟人类打字 + chunk_size = 10 # 每批输入10个ICCID + chunks = [batch_items[i:i+chunk_size] for i in range(0, len(batch_items), chunk_size)] + + for i, chunk in enumerate(chunks): + chunk_payload = "\n".join(chunk) + if i == 0: + inp.send_keys(chunk_payload) + else: + inp.send_keys("\n" + chunk_payload) + + # 随机等待,模拟人类输入间隔 + if i < len(chunks) - 1: + wait_time = random.uniform(0.1, 0.3) + time.sleep(wait_time) + + # 随机等待,模拟人类思考时间 + time.sleep(random.uniform(0.5, 1.5)) # 点击查询按钮 try: - btn = WebDriverWait(driver, 3).until(EC.element_to_be_clickable((By.XPATH, CONFIRM_BTN_XPATH))) + btn = WebDriverWait(driver, 1).until(EC.element_to_be_clickable((By.XPATH, CONFIRM_BTN_XPATH))) btn.click() except Exception: return [] - # 等待结果并抓取 - time.sleep(1) + # 随机等待后处理 + time.sleep(random.uniform(0.5, 1.0)) + + # 检查并关闭可能出现的"不存在"弹窗 + try: + # 使用精确的XPath关闭弹窗 + close_btn_xpath = '/html/body/div[2]/div/div[2]/div/div[2]/div/button' + try: + close_btn = WebDriverWait(driver, 0.5).until(EC.element_to_be_clickable((By.XPATH, close_btn_xpath))) + close_btn.click() + print("已关闭'不存在'弹窗") + time.sleep(0.5) + except Exception: + # 如果精确XPath失败,尝试其他可能的关闭按钮 + alternative_selectors = [ + '//div[contains(text(),"批量查询结果")]//button', + '//div[contains(text(),"以下SIM卡不存在")]//button', + '//div[contains(@class,"modal")]//button[contains(@class,"close")]', + '//div[contains(@class,"ant-modal")]//button[contains(@class,"close")]' + ] + + for selector in alternative_selectors: + try: + close_btn = WebDriverWait(driver, 0.2).until(EC.element_to_be_clickable((By.XPATH, selector))) + close_btn.click() + print("已关闭'不存在'弹窗(备用方式)") + time.sleep(0.1) + break + except Exception: + continue + + except Exception: + pass + + # 抓取结果 results = scrape_results_from_table(driver) # 清理输入框 try: inp = driver.find_element(By.XPATH, INPUT_XPATH) - clear_input_box(driver, inp) + driver.execute_script("arguments[0].value = '';", inp) except Exception: pass @@ -191,6 +657,8 @@ def submit_batch_and_collect(driver, batch_items): def main(): """主函数""" + start_time = time.time() + query_items = read_query_items(ICCID_FILE) if not query_items: print(f"在 {ICCID_FILE} 中未找到查询项") @@ -200,14 +668,36 @@ def main(): batches = [query_items[i:i+MAX_PER_BATCH] for i in range(0, len(query_items), MAX_PER_BATCH)] print(f"将分为 {len(batches)} 个批次处理") - driver = init_driver(HEADLESS) + # 初始化驱动,添加重试机制 + driver = None + max_retries = 3 + for retry in range(max_retries): + try: + print(f"尝试初始化Chrome驱动 (第 {retry + 1}/{max_retries} 次)...") + driver = init_driver(HEADLESS) + break + except Exception as e: + print(f"第 {retry + 1} 次初始化失败: {e}") + if retry == max_retries - 1: + print("所有初始化尝试都失败了,程序退出") + return + print("等待3秒后重试...") + + if driver is None: + print("无法初始化Chrome驱动,程序退出") + return + total_saved_results = 0 failed_batches = [] + is_first_batch = True # 标记是否是第一批次 try: - driver.refresh() - time.sleep(1.5) - print("已通过Cookie自动登录,开始批量查询...") + # 执行登录 + if not login(driver, USERNAME, PASSWORD): + print("登录失败,程序退出") + return + + print("登录成功,开始批量查询...") for batch_index, batch in enumerate(batches, 1): print(f"\n=== 处理批次 {batch_index}/{len(batches)} ===") @@ -223,10 +713,11 @@ def main(): results = submit_batch_and_collect(driver, sub_batch) if results: batch_results = [{"batch": f"{batch_index}-{sub_idx + 1}", "cells": row_cells} for row_cells in results] - saved_count = save_results_to_csv(batch_results, OUTPUT_CSV) + saved_count = save_results_to_csv(batch_results, OUTPUT_CSV, is_first_batch) total_saved_results += saved_count print(f"子批次获得 {len(results)} 条结果,已保存") - time.sleep(0.5) + is_first_batch = False # 后续批次不再写入表头 + time.sleep(0.1) except Exception as e: print(f"子批次 {sub_idx + 1} 处理失败: {e}") failed_batches.append(f"{batch_index}-{sub_idx + 1}") @@ -238,7 +729,7 @@ def main(): try: if retry > 0: print(f"重试第 {retry} 次...") - time.sleep(2) + time.sleep(0.1) results = submit_batch_and_collect(driver, batch) print(f"本批次获得 {len(results)} 条结果") @@ -246,9 +737,10 @@ def main(): # 立即保存结果 if results: batch_results = [{"batch": batch_index, "cells": row_cells} for row_cells in results] - saved_count = save_results_to_csv(batch_results, OUTPUT_CSV) + saved_count = save_results_to_csv(batch_results, OUTPUT_CSV, is_first_batch) total_saved_results += saved_count print(f"✅ 已保存 {saved_count} 条结果到 {OUTPUT_CSV}") + is_first_batch = False # 后续批次不再写入表头 success = True break @@ -259,11 +751,18 @@ def main(): failed_batches.append(batch_index) print(f"❌ 批次 {batch_index} 重试失败") - # 批次间等待 + # 批次间随机等待,模拟人类行为 if batch_index < len(batches): - time.sleep(1) + import random + wait_time = random.uniform(0.5, 1.0) + print(f"批次间等待 {wait_time:.1f} 秒...") + time.sleep(wait_time) # 生成总结报告 + end_time = time.time() + total_time = end_time - start_time + avg_time_per_batch = total_time / len(batches) if len(batches) > 0 else 0 + print(f"\n{'='*50}") print(f"📊 批量查询完成报告") print(f"{'='*50}") @@ -273,6 +772,8 @@ def main(): print(f"失败批次: {len(failed_batches)}") print(f"成功率: {((len(batches) - len(failed_batches)) / len(batches) * 100):.1f}%") print(f"总保存结果数: {total_saved_results}") + print(f"⏱️ 总耗时: {total_time:.2f}秒") + print(f"⏱️ 平均每批次: {avg_time_per_batch:.2f}秒") if failed_batches: print(f"失败批次列表: {failed_batches}") @@ -286,4 +787,4 @@ def main(): driver.quit() if __name__ == "__main__": - main() \ No newline at end of file + main()