From 5eb13324c2aa1c93e5e41fa3e8522684c3d6bcfd Mon Sep 17 00:00:00 2001 From: Zhaojie Date: Sat, 31 Jan 2026 18:00:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B8=85=E7=90=86=E8=A1=A8=E6=83=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/llm_config.py | 10 +- data_analysis_agent.py | 119 ++++++++++------- main.py | 16 +-- merge_excel.py | 34 ++--- prompts.py | 22 ++-- sort_csv.py | 20 +-- test.py | 14 +- utils/cache_manager.py | 8 +- utils/code_executor.py | 8 +- utils/data_loader.py | 24 ++-- utils/data_quality.py | 18 +-- utils/fallback_openai_client.py | 32 ++--- utils/format_execution_result.py | 8 +- utils/llm_helper.py | 2 +- utils/script_generator.py | 215 +++++++++++++++++++++++++++++++ 15 files changed, 394 insertions(+), 156 deletions(-) create mode 100644 utils/script_generator.py diff --git a/config/llm_config.py b/config/llm_config.py index 2b2205c..a3429d2 100644 --- a/config/llm_config.py +++ b/config/llm_config.py @@ -18,11 +18,11 @@ class LLMConfig: """LLM配置""" provider: str = os.environ.get("LLM_PROVIDER", "openai") # openai, gemini, etc. - api_key: str = os.environ.get("OPENAI_API_KEY", "sk-Gce85QLROESeOWf3icd2mQnYHOrmMYojwVPQ0AubMjGQ5ZE2") - base_url: str = os.environ.get("OPENAI_BASE_URL", "https://gemini.jeason.online/v1") - model: str = os.environ.get("OPENAI_MODEL", "gemini-2.5-pro") + api_key: str = os.environ.get("OPENAI_API_KEY", "sk-2187174de21548b0b8b0c92129700199") + base_url: str = os.environ.get("OPENAI_BASE_URL", "http://127.0.0.1:9999/v1") + model: str = os.environ.get("OPENAI_MODEL", "claude-sonnet-4-5") temperature: float = 0.5 - max_tokens: int = 131072 + max_tokens: int = 8192 # 降低默认值,避免某些API不支持过大的值 def __post_init__(self): """配置初始化后的处理""" @@ -34,6 +34,8 @@ class LLMConfig: # Gemini 的 OpenAI 兼容接口地址 self.base_url = os.environ.get("GEMINI_BASE_URL", "https://gemini.jeason.online") self.model = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash") + # Gemini 有更严格的 token 限制 + self.max_tokens = 8192 def to_dict(self) -> Dict[str, Any]: """转换为字典""" diff --git a/data_analysis_agent.py b/data_analysis_agent.py index c83db08..cc48bf3 100644 --- a/data_analysis_agent.py +++ b/data_analysis_agent.py @@ -18,6 +18,7 @@ from utils.extract_code import extract_code_from_response from utils.data_loader import load_and_profile_data from utils.llm_helper import LLMHelper from utils.code_executor import CodeExecutor +from utils.script_generator import generate_reusable_script from config.llm_config import LLMConfig from prompts import data_analysis_system_prompt, final_report_system_prompt, data_analysis_followup_prompt @@ -61,6 +62,8 @@ class DataAnalysisAgent: self.session_output_dir = None self.executor = None self.data_profile = "" # 存储数据画像 + self.data_files = [] # 存储数据文件列表 + self.user_requirement = "" # 存储用户需求 def _process_response(self, response: str) -> Dict[str, Any]: """ @@ -76,7 +79,7 @@ class DataAnalysisAgent: yaml_data = self.llm.parse_yaml_response(response) action = yaml_data.get("action", "generate_code") - print(f"🎯 检测到动作: {action}") + print(f"[TARGET] 检测到动作: {action}") if action == "analysis_complete": return self._handle_analysis_complete(response, yaml_data) @@ -85,11 +88,11 @@ class DataAnalysisAgent: elif action == "generate_code": return self._handle_generate_code(response, yaml_data) else: - print(f"⚠️ 未知动作类型: {action},按generate_code处理") + print(f"[WARN] 未知动作类型: {action},按generate_code处理") return self._handle_generate_code(response, yaml_data) except Exception as e: - print(f"⚠️ 解析响应失败: {str(e)},尝试提取代码并按generate_code处理") + print(f"[WARN] 解析响应失败: {str(e)},尝试提取代码并按generate_code处理") # 即使YAML解析失败,也尝试提取代码 extracted_code = extract_code_from_response(response) if extracted_code: @@ -100,7 +103,7 @@ class DataAnalysisAgent: self, response: str, yaml_data: Dict[str, Any] ) -> Dict[str, Any]: """处理分析完成动作""" - print("✅ 分析任务完成") + print("[OK] 分析任务完成") final_report = yaml_data.get("final_report", "分析完成,无最终报告") return { "action": "analysis_complete", @@ -113,7 +116,7 @@ class DataAnalysisAgent: self, response: str, yaml_data: Dict[str, Any] ) -> Dict[str, Any]: """处理图片收集动作""" - print("📊 开始收集图片") + print("[CHART] 开始收集图片") figures_to_collect = yaml_data.get("figures_to_collect", []) collected_figures = [] @@ -130,10 +133,10 @@ class DataAnalysisAgent: description = figure_info.get("description", "") analysis = figure_info.get("analysis", "") - print(f"📈 收集图片 {figure_number}: {filename}") - print(f" 📂 路径: {file_path}") - print(f" 📝 描述: {description}") - print(f" 🔍 分析: {analysis}") + print(f"[GRAPH] 收集图片 {figure_number}: {filename}") + print(f" [DIR] 路径: {file_path}") + print(f" [NOTE] 描述: {description}") + print(f" [SEARCH] 分析: {analysis}") # 使用seen_paths集合来去重,防止重复收集 @@ -145,7 +148,7 @@ class DataAnalysisAgent: # 检查是否已经收集过该路径 abs_path = os.path.abspath(file_path) if abs_path not in seen_paths: - print(f" ✅ 文件存在: {file_path}") + print(f" [OK] 文件存在: {file_path}") # 记录图片信息 collected_figures.append( { @@ -158,12 +161,12 @@ class DataAnalysisAgent: ) seen_paths.add(abs_path) else: - print(f" ⚠️ 跳过重复图片: {file_path}") + print(f" [WARN] 跳过重复图片: {file_path}") else: if file_path: - print(f" ⚠️ 文件不存在: {file_path}") + print(f" [WARN] 文件不存在: {file_path}") else: - print(f" ⚠️ 未提供文件路径") + print(f" [WARN] 未提供文件路径") return { "action": "collect_figures", @@ -195,7 +198,7 @@ class DataAnalysisAgent: code = code.strip() if code: - print(f"🔧 执行代码:\n{code}") + print(f"[TOOL] 执行代码:\n{code}") print("-" * 40) # 执行代码 @@ -203,7 +206,7 @@ class DataAnalysisAgent: # 格式化执行结果 feedback = format_execution_result(result) - print(f"📋 执行反馈:\n{feedback}") + print(f"[LIST] 执行反馈:\n{feedback}") return { "action": "generate_code", @@ -215,7 +218,7 @@ class DataAnalysisAgent: } else: # 如果没有代码,说明LLM响应格式有问题,需要重新生成 - print("⚠️ 未从响应中提取到可执行代码,要求LLM重新生成") + print("[WARN] 未从响应中提取到可执行代码,要求LLM重新生成") return { "action": "invalid_response", "error": "响应中缺少可执行代码", @@ -246,6 +249,8 @@ class DataAnalysisAgent: self.conversation_history = [] self.analysis_results = [] self.current_round = 0 + self.data_files = files or [] # 保存数据文件列表 + self.user_requirement = user_input # 保存用户需求 # 创建本次分析的专用输出目录 if session_output_dir: @@ -264,12 +269,12 @@ class DataAnalysisAgent: # 设用工具生成数据画像 data_profile = "" if files: - print("🔍 正在生成数据画像...") + print("[SEARCH] 正在生成数据画像...") try: data_profile = load_and_profile_data(files) - print("✅ 数据画像生成完毕") + print("[OK] 数据画像生成完毕") except Exception as e: - print(f"⚠️ 数据画像生成失败: {e}") + print(f"[WARN] 数据画像生成失败: {e}") # 保存到实例变量供最终报告使用 self.data_profile = data_profile @@ -282,11 +287,11 @@ class DataAnalysisAgent: if data_profile: initial_prompt += f"\n\n{data_profile}\n\n请根据上述【数据画像】中的统计信息(如高频值、缺失率、数据范围)来制定分析策略。如果发现明显的高频问题或异常分布,请优先进行深度分析。" - print(f"🚀 开始数据分析任务") - print(f"📝 用户需求: {user_input}") + print(f"[START] 开始数据分析任务") + print(f"[NOTE] 用户需求: {user_input}") if files: - print(f"📁 数据文件: {', '.join(files)}") - print(f"📂 输出目录: {self.session_output_dir}") + print(f"[FOLDER] 数据文件: {', '.join(files)}") + print(f"[DIR] 输出目录: {self.session_output_dir}") # 添加到对话历史 self.conversation_history.append({"role": "user", "content": initial_prompt}) @@ -297,8 +302,8 @@ class DataAnalysisAgent: if max_rounds is None: current_max_rounds = 10 # 追问通常不需要那么长的思考链,10轮足够 - print(f"\n🚀 继续分析任务 (追问模式)") - print(f"📝 后续需求: {user_input}") + print(f"\n[START] 继续分析任务 (追问模式)") + print(f"[NOTE] 后续需求: {user_input}") # 重置当前轮数计数器,以便给新任务足够的轮次 self.current_round = 0 @@ -308,18 +313,21 @@ class DataAnalysisAgent: follow_up_prompt = f"后续需求: {user_input}\n(注意:这是后续追问,请直接针对该问题进行分析,无需从头开始执行完整SOP。)" self.conversation_history.append({"role": "user", "content": follow_up_prompt}) - print(f"🔢 本次最大轮数: {current_max_rounds}") + print(f"[NUM] 本次最大轮数: {current_max_rounds}") if self.force_max_rounds: - print(f"⚡ 强制模式: 将运行满 {current_max_rounds} 轮(忽略AI完成信号)") + print(f"[FAST] 强制模式: 将运行满 {current_max_rounds} 轮(忽略AI完成信号)") print("=" * 60) # 保存原始 max_rounds 以便恢复(虽然 analyze 结束后不需要恢复,但为了逻辑严谨) original_max_rounds = self.max_rounds self.max_rounds = current_max_rounds + # 初始化连续失败计数器 + consecutive_failures = 0 + while self.current_round < self.max_rounds: self.current_round += 1 - print(f"\n🔄 第 {self.current_round} 轮分析") + print(f"\n[LOOP] 第 {self.current_round} 轮分析") # 调用LLM生成响应 try: # 获取当前执行环境的变量信息 notebook_variables = self.executor.get_environment_info() @@ -340,15 +348,15 @@ class DataAnalysisAgent: formatted_system_prompt = base_system_prompt.format( notebook_variables=notebook_variables ) - print(f"🐛 [DEBUG] System Prompt Head:\n{formatted_system_prompt[:500]}...\n[...]") - print(f"🐛 [DEBUG] System Prompt Rules Check: 'stop_words' in prompt? {'stop_words' in formatted_system_prompt}") + print(f"[DEBUG] [DEBUG] System Prompt Head:\n{formatted_system_prompt[:500]}...\n[...]") + print(f"[DEBUG] [DEBUG] System Prompt Rules Check: 'stop_words' in prompt? {'stop_words' in formatted_system_prompt}") response = self.llm.call( prompt=self._build_conversation_prompt(), system_prompt=formatted_system_prompt, ) - print(f"🤖 助手响应:\n{response}") + print(f"[AI] 助手响应:\n{response}") # 使用统一的响应处理方法 process_result = self._process_response(response) @@ -356,9 +364,9 @@ class DataAnalysisAgent: # 根据处理结果决定是否继续(仅在非强制模式下) if process_result.get("action") == "invalid_response": consecutive_failures += 1 - print(f"⚠️ 连续失败次数: {consecutive_failures}/3") + print(f"[WARN] 连续失败次数: {consecutive_failures}/3") if consecutive_failures >= 3: - print(f"❌ 连续3次无法获取有效响应,分析终止。请检查网络或配置。") + print(f"[ERROR] 连续3次无法获取有效响应,分析终止。请检查网络或配置。") break else: consecutive_failures = 0 # 重置计数器 @@ -366,7 +374,7 @@ class DataAnalysisAgent: if not self.force_max_rounds and not process_result.get( "continue", True ): - print(f"\n✅ 分析完成!") + print(f"\n[OK] 分析完成!") break # 添加到对话历史 @@ -398,7 +406,7 @@ class DataAnalysisAgent: feedback = f"已收集 {len(collected_figures)} 个有效图片及其分析。" if missing_figures: - feedback += f"\n⚠️ 以下图片未找到,请检查代码是否成功保存了这些图片: {missing_figures}" + feedback += f"\n[WARN] 以下图片未找到,请检查代码是否成功保存了这些图片: {missing_figures}" self.conversation_history.append( { @@ -421,7 +429,7 @@ class DataAnalysisAgent: except Exception as e: error_msg = f"LLM调用错误: {str(e)}" - print(f"❌ {error_msg}") + print(f"[ERROR] {error_msg}") self.conversation_history.append( { "role": "user", @@ -430,7 +438,7 @@ class DataAnalysisAgent: ) # 生成最终总结 if self.current_round >= self.max_rounds: - print(f"\n⚠️ 已达到最大轮数 ({self.max_rounds}),分析结束") + print(f"\n[WARN] 已达到最大轮数 ({self.max_rounds}),分析结束") return self._generate_final_report() @@ -456,8 +464,8 @@ class DataAnalysisAgent: if result.get("action") == "collect_figures": all_figures.extend(result.get("collected_figures", [])) - print(f"\n📊 开始生成最终分析报告...") - print(f"📂 输出目录: {self.session_output_dir}") + print(f"\n[CHART] 开始生成最终分析报告...") + print(f"[DIR] 输出目录: {self.session_output_dir}") # --- 自动补全/发现图片机制 --- # 扫描目录下所有的png文件 @@ -475,7 +483,7 @@ class DataAnalysisAgent: for png_path in existing_pngs: abs_png_path = os.path.abspath(png_path) if abs_png_path not in collected_paths: - print(f"🔍 [自动发现] 补充未显式收集的图片: {os.path.basename(png_path)}") + print(f"[SEARCH] [自动发现] 补充未显式收集的图片: {os.path.basename(png_path)}") all_figures.append({ "figure_number": "Auto", "filename": os.path.basename(png_path), @@ -484,11 +492,11 @@ class DataAnalysisAgent: "analysis": "(该图表由系统自动捕获,Agent未提供具体分析文本,请结合图表标题理解)" }) except Exception as e: - print(f"⚠️ 自动发现图片失败: {e}") + print(f"[WARN] 自动发现图片失败: {e}") # --------------------------- - print(f"🔢 总轮数: {self.current_round}") - print(f"📈 收集图片: {len(all_figures)} 个") + print(f"[NUM] 总轮数: {self.current_round}") + print(f"[GRAPH] 收集图片: {len(all_figures)} 个") # 构建用于生成最终报告的提示词 final_report_prompt = self._build_final_report_prompt(all_figures) @@ -512,12 +520,12 @@ class DataAnalysisAgent: except: pass # 解析失败则保持原样 - print("✅ 最终报告生成完成") + print("[OK] 最终报告生成完成") - print("✅ 最终报告生成完成") + print("[OK] 最终报告生成完成") except Exception as e: - print(f"❌ 生成最终报告时出错: {str(e)}") + print(f"[ERROR] 生成最终报告时出错: {str(e)}") final_report_content = f"报告生成失败: {str(e)}" # 保存最终报告到文件 @@ -525,9 +533,21 @@ class DataAnalysisAgent: try: with open(report_file_path, "w", encoding="utf-8") as f: f.write(final_report_content) - print(f"📄 最终报告已保存至: {report_file_path}") + print(f"[DOC] 最终报告已保存至: {report_file_path}") except Exception as e: - print(f"❌ 保存报告文件失败: {str(e)}") + print(f"[ERROR] 保存报告文件失败: {str(e)}") + + # 生成可复用脚本 + script_path = "" + try: + script_path = generate_reusable_script( + analysis_results=self.analysis_results, + data_files=self.data_files, + session_output_dir=self.session_output_dir, + user_requirement=self.user_requirement + ) + except Exception as e: + print(f"[WARN] 脚本生成失败: {e}") # 返回完整的分析结果 return { @@ -538,6 +558,7 @@ class DataAnalysisAgent: "conversation_history": self.conversation_history, "final_report": final_report_content, "report_file_path": report_file_path, + "reusable_script_path": script_path, } def _build_final_report_prompt(self, all_figures: List[Dict[str, Any]]) -> str: @@ -584,7 +605,7 @@ class DataAnalysisAgent: # 在提示词中明确要求使用相对路径 prompt += """ -📁 **图片路径使用说明**: +[FOLDER] **图片路径使用说明**: 报告和图片都在同一目录下,请在报告中使用相对路径引用图片: - 格式:![图片描述](./图片文件名.png) - 示例:![营业总收入趋势](./营业总收入趋势.png) diff --git a/main.py b/main.py index 239605e..cab1102 100644 --- a/main.py +++ b/main.py @@ -17,7 +17,7 @@ class DualLogger: def write(self, message): self.terminal.write(message) # 过滤掉生成的代码块,不写入日志文件 - if "🔧 执行代码:" in message: + if "[TOOL] 执行代码:" in message: return self.log.write(message) self.log.flush() @@ -34,7 +34,7 @@ def setup_logging(log_dir): # 可选:也将错误输出重定向 # sys.stderr = logger print(f"\n{'='*20} Run Started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {'='*20}\n") - print(f"📄 日志文件已保存至: {os.path.join(log_dir, 'log.txt')}") + print(f"[DOC] 日志文件已保存至: {os.path.join(log_dir, 'log.txt')}") def main(): @@ -52,10 +52,10 @@ def main(): files.extend(glob.glob(pattern)) if not files: - print("⚠️ 未在当前目录找到数据文件 (.csv, .xlsx),尝试使用默认文件") + print("[WARN] 未在当前目录找到数据文件 (.csv, .xlsx),尝试使用默认文件") files = ["./cleaned_data.csv"] else: - print(f"📂 自动识别到以下数据文件: {files}") + print(f"[DIR] 自动识别到以下数据文件: {files}") analysis_requirement = """ 基于所有运维工单,整理一份工单健康度报告,包括但不限于对所有车联网技术支持工单的全面数据分析, @@ -92,16 +92,16 @@ def main(): print("\n" + "="*30 + " 当前阶段分析完成 " + "="*30) # 询问用户是否继续 - print("\n💡 你可以继续对数据提出分析需求,或者输入 'exit'/'quit' 结束程序。") - user_response = input("👉 请输入后续分析需求 (直接回车退出): ").strip() + print("\n[TIP] 你可以继续对数据提出分析需求,或者输入 'exit'/'quit' 结束程序。") + user_response = input("[>] 请输入后续分析需求 (直接回车退出): ").strip() if not user_response or user_response.lower() in ['exit', 'quit', 'n', 'no']: - print("👋 分析结束,再见!") + print("[BYE] 分析结束,再见!") break # 更新需求,进入下一轮循环 analysis_requirement = user_response - print(f"\n🔄 收到新需求,正在继续分析...") + print(f"\n[LOOP] 收到新需求,正在继续分析...") if __name__ == "__main__": diff --git a/merge_excel.py b/merge_excel.py index 1894621..7233afa 100644 --- a/merge_excel.py +++ b/merge_excel.py @@ -7,7 +7,7 @@ def merge_excel_files(source_dir="remotecontrol", output_file="merged_all_files. """ 将指定目录下的所有 Excel 文件 (.xlsx, .xls) 合并为一个 CSV 文件。 """ - print(f"🔍 正在扫描目录: {source_dir} ...") + print(f"[SEARCH] 正在扫描目录: {source_dir} ...") # 支持 xlsx 和 xls files_xlsx = glob.glob(os.path.join(source_dir, "*.xlsx")) @@ -15,36 +15,36 @@ def merge_excel_files(source_dir="remotecontrol", output_file="merged_all_files. files = files_xlsx + files_xls if not files: - print("⚠️ 未找到 Excel 文件。") + print("[WARN] 未找到 Excel 文件。") return # 按文件名中的数字进行排序 (例如: 1.xlsx, 2.xlsx, ..., 10.xlsx) try: files.sort(key=lambda x: int(os.path.basename(x).split('.')[0])) - print("🔢 已按文件名数字顺序排序") + print("[NUM] 已按文件名数字顺序排序") except ValueError: # 如果文件名不是纯数字,退回到字母排序 files.sort() - print("🔤 已按文件名包含非数字字符,使用字母顺序排序") + print("[TEXT] 已按文件名包含非数字字符,使用字母顺序排序") - print(f"📂 找到 {len(files)} 个文件: {files}") + print(f"[DIR] 找到 {len(files)} 个文件: {files}") all_dfs = [] for file in files: try: - print(f"📖 读取: {file}") + print(f"[READ] 读取: {file}") # 使用 ExcelFile 读取所有 sheet xls = pd.ExcelFile(file) - print(f" 📑 包含 Sheets: {xls.sheet_names}") + print(f" [PAGES] 包含 Sheets: {xls.sheet_names}") file_dfs = [] for sheet_name in xls.sheet_names: df = pd.read_excel(xls, sheet_name=sheet_name) if not df.empty: - print(f" ✅ Sheet '{sheet_name}' 读取成功: {len(df)} 行") + print(f" [OK] Sheet '{sheet_name}' 读取成功: {len(df)} 行") file_dfs.append(df) else: - print(f" ⚠️ Sheet '{sheet_name}' 为空,跳过") + print(f" [WARN] Sheet '{sheet_name}' 为空,跳过") if file_dfs: # 合并该文件的所有非空 sheet @@ -53,30 +53,30 @@ def merge_excel_files(source_dir="remotecontrol", output_file="merged_all_files. file_merged_df['Source_File'] = os.path.basename(file) all_dfs.append(file_merged_df) else: - print(f"⚠️ 文件 {file} 所有 Sheet 均为空") + print(f"[WARN] 文件 {file} 所有 Sheet 均为空") except Exception as e: - print(f"❌ 读取 {file} 失败: {e}") + print(f"[ERROR] 读取 {file} 失败: {e}") if all_dfs: - print("🔄 正在合并数据...") + print("[LOOP] 正在合并数据...") merged_df = pd.concat(all_dfs, ignore_index=True) # 按 SendTime 排序 if 'SendTime' in merged_df.columns: - print("⏳ 正在按 SendTime 排序...") + print("[TIMER] 正在按 SendTime 排序...") merged_df['SendTime'] = pd.to_datetime(merged_df['SendTime'], errors='coerce') merged_df = merged_df.sort_values(by='SendTime') else: - print("⚠️ 未找到 SendTime 列,跳过排序") + print("[WARN] 未找到 SendTime 列,跳过排序") - print(f"💾 保存到: {output_file}") + print(f"[CACHE] 保存到: {output_file}") merged_df.to_csv(output_file, index=False, encoding="utf-8-sig") - print(f"✅ 合并及排序完成!总行数: {len(merged_df)}") + print(f"[OK] 合并及排序完成!总行数: {len(merged_df)}") print(f" 输出文件: {os.path.abspath(output_file)}") else: - print("⚠️ 没有成功读取到任何数据。") + print("[WARN] 没有成功读取到任何数据。") if __name__ == "__main__": # 如果需要在当前目录运行并合并 remotecontrol 文件夹下的内容 diff --git a/prompts.py b/prompts.py index 3a872d6..be103f9 100644 --- a/prompts.py +++ b/prompts.py @@ -1,11 +1,11 @@ data_analysis_system_prompt = """你是一个专业的数据分析助手,运行在Jupyter Notebook环境中,能够根据用户需求生成和执行Python数据分析代码。 -🎯 **核心使命**: +[TARGET] **核心使命**: - 接收自然语言需求,分阶段生成高效、安全的数据分析代码。 - 深度挖掘数据,不仅仅是绘图,更要发现数据背后的业务洞察。 - 输出高质量、可落地的业务分析报告。 -🔧 **核心能力**: +[TOOL] **核心能力**: 1. **代码执行**:自动编写并执行Pandas/Matplotlib代码。 2. **多模态分析**:支持时序预测、文本挖掘(N-gram)、多维交叉分析。 3. **智能纠错**:遇到报错自动分析原因并修复代码。 @@ -24,7 +24,7 @@ jupyter notebook环境当前变量: --- -🔧 **代码生成规则 (Code Generation Rules)**: +[TOOL] **代码生成规则 (Code Generation Rules)**: **1. 执行策略**: - **分步执行**:每次只专注一个分析阶段(如“清洗”或“可视化”),不要试图一次性写完所有代码。 @@ -54,7 +54,7 @@ jupyter notebook环境当前变量: --- -🚀 **标准化分析SOP (Standard Operating Procedure)**: +[START] **标准化分析SOP (Standard Operating Procedure)**: **阶段1:数据探索与智能加载** - 检查文件扩展名与实际格式是否一致(CSV vs Excel)。 @@ -86,7 +86,7 @@ jupyter notebook环境当前变量: --- -📋 **动作选择指南 (Action Selection)**: +[LIST] **动作选择指南 (Action Selection)**: 1. **generate_code** - 场景:需要执行代码(加载、分析、绘图)。 @@ -127,7 +127,7 @@ jupyter notebook环境当前变量: --- -⚠️ **特别提示**: +[WARN] **特别提示**: - **翻译要求**:报告中的英文专有名词(除了TSP, TBOX, HU等标准缩写)必须翻译成中文(Remote Control -> 远控)。 - **客观陈述**:不要使用"data shows", "plot indicates"等技术语言,直接陈述业务事实("X车型在Y模块故障率最高")。 - **鲁棒性**:如果代码报错,请深呼吸,分析错误日志,修改代码重试。不要重复无效代码。 @@ -253,11 +253,11 @@ final_report_system_prompt = """你是一位**资深数据分析专家 (Senior D data_analysis_followup_prompt = """你是一个专业的数据分析助手,运行在Jupyter Notebook环境中。 当前处于**追问模式 (Follow-up Mode)**。用户基于之前的分析结果提出了新的需求。 -🎯 **核心使命**: +[TARGET] **核心使命**: - 直接针对用户的后续需求进行解答,**无需**重新执行完整SOP。 - 只有当用户明确要求重新进行全流程分析时,才执行SOP。 -🔧 **核心能力**: +[TOOL] **核心能力**: 1. **代码执行**:自动编写并执行Pandas/Matplotlib代码。 2. **多模态分析**:支持时序预测、文本挖掘(N-gram)、多维交叉分析。 3. **智能纠错**:遇到报错自动分析原因并修复代码。 @@ -267,7 +267,7 @@ jupyter notebook环境当前变量(已包含之前分析的数据df): --- -🚨 **关键红线 (Critical Rules)**: +[ALERT] **关键红线 (Critical Rules)**: 1. **进程保护**:严禁使用 `exit()`、`quit()` 或 `sys.exit()`。 2. **数据安全**:严禁伪造数据。严禁写入非结果文件。 3. **文件验证**:所有文件操作前必须 `os.path.exists()`。 @@ -276,14 +276,14 @@ jupyter notebook环境当前变量(已包含之前分析的数据df): --- -🔧 **代码生成规则 (Reuse)**: +[TOOL] **代码生成规则 (Reuse)**: - **环境持久化**:直接使用已加载的 `df`,不要重复加载数据。 - **可视化规范**:中文字体配置、类别>5使用水平条形图、美学要求同上。 - **文本挖掘**:如需挖掘,继续遵守N-gram和停用词规则。 --- -📋 **动作选择指南**: +[LIST] **动作选择指南**: 1. **generate_code** - 场景:执行针对追问的代码。 diff --git a/sort_csv.py b/sort_csv.py index c03a1b3..1e07f62 100644 --- a/sort_csv.py +++ b/sort_csv.py @@ -7,39 +7,39 @@ def sort_csv_by_time(file_path="remotecontrol_merged.csv", time_col="SendTime"): 读取 CSV 文件,按时间列排序,并保存。 """ if not os.path.exists(file_path): - print(f"❌ 文件不存在: {file_path}") + print(f"[ERROR] 文件不存在: {file_path}") return - print(f"📖 正在读取 {file_path} ...") + print(f"[READ] 正在读取 {file_path} ...") try: # 读取 CSV df = pd.read_csv(file_path, low_memory=False) - print(f" 📊 数据行数: {len(df)}") + print(f" [CHART] 数据行数: {len(df)}") if time_col not in df.columns: - print(f"❌ 未找到时间列: {time_col}") + print(f"[ERROR] 未找到时间列: {time_col}") print(f" 可用列: {list(df.columns)}") return - print(f"🔄 正在解析时间列 '{time_col}' ...") + print(f"[LOOP] 正在解析时间列 '{time_col}' ...") # 转换为 datetime 对象,无法解析的设为 NaT df[time_col] = pd.to_datetime(df[time_col], errors='coerce') # 检查无效时间 nat_count = df[time_col].isna().sum() if nat_count > 0: - print(f"⚠️ 发现 {nat_count} 行无效时间数据,排序时将排在最后") + print(f"[WARN] 发现 {nat_count} 行无效时间数据,排序时将排在最后") - print("🔄 正在按时间排序...") + print("[LOOP] 正在按时间排序...") df_sorted = df.sort_values(by=time_col) - print(f"💾 正在保存及覆盖文件: {file_path} ...") + print(f"[CACHE] 正在保存及覆盖文件: {file_path} ...") df_sorted.to_csv(file_path, index=False, encoding="utf-8-sig") - print("✅ 排序并保存完成!") + print("[OK] 排序并保存完成!") except Exception as e: - print(f"❌处理失败: {e}") + print(f"[ERROR]处理失败: {e}") if __name__ == "__main__": sort_csv_by_time() diff --git a/test.py b/test.py index e29c938..802147b 100644 --- a/test.py +++ b/test.py @@ -1,13 +1,13 @@ +from openai import OpenAI -import openai - -client = openai.OpenAI( - api_key="sk-Gce85QLROESeOWf3icd2mQnYHOrmMYojwVPQ0AubMjGQ5ZE2", - base_url="https://gemini.jeason.online/v1" +client = OpenAI( + base_url="http://127.0.0.1:9999/v1", + api_key="sk-2187174de21548b0b8b0c92129700199" ) response = client.chat.completions.create( - model="gemini-2.5-pro", - messages=[{"role": "user", "content": "你好,请自我介绍"}] + model="claude-sonnet-4-5", + messages=[{"role": "user", "content": "Hello"}] ) + print(response.choices[0].message.content) \ No newline at end of file diff --git a/utils/cache_manager.py b/utils/cache_manager.py index 63215c5..9ad9685 100644 --- a/utils/cache_manager.py +++ b/utils/cache_manager.py @@ -42,7 +42,7 @@ class CacheManager: with open(cache_path, 'rb') as f: return pickle.load(f) except Exception as e: - print(f"⚠️ 读取缓存失败: {e}") + print(f"[WARN] 读取缓存失败: {e}") return None return None @@ -56,14 +56,14 @@ class CacheManager: with open(cache_path, 'wb') as f: pickle.dump(value, f) except Exception as e: - print(f"⚠️ 写入缓存失败: {e}") + print(f"[WARN] 写入缓存失败: {e}") def clear(self) -> None: """清空所有缓存""" if self.cache_dir.exists(): for cache_file in self.cache_dir.glob("*.pkl"): cache_file.unlink() - print("✅ 缓存已清空") + print("[OK] 缓存已清空") def cached(self, key_func: Optional[Callable] = None): """缓存装饰器""" @@ -82,7 +82,7 @@ class CacheManager: # 尝试从缓存获取 cached_value = self.get(cache_key) if cached_value is not None: - print(f"💾 使用缓存: {cache_key[:8]}...") + print(f"[CACHE] 使用缓存: {cache_key[:8]}...") return cached_value # 执行函数并缓存结果 diff --git a/utils/code_executor.py b/utils/code_executor.py index 17f5537..c2cab11 100644 --- a/utils/code_executor.py +++ b/utils/code_executor.py @@ -410,17 +410,17 @@ from IPython.display import display try: # 尝试保存 fig.savefig(auto_filepath, bbox_inches='tight') - print(f"💾 [Auto-Save] 检测到未闭合图表,已安全保存至: {auto_filepath}") + print(f"[CACHE] [Auto-Save] 检测到未闭合图表,已安全保存至: {auto_filepath}") # 添加到输出中,告知Agent - output += f"\n[Auto-Save] ⚠️ 检测到Figure {fig_num}未关闭,系统已自动保存为: {auto_filename}" + output += f"\n[Auto-Save] [WARN] 检测到Figure {fig_num}未关闭,系统已自动保存为: {auto_filename}" self.image_counter += 1 except Exception as e: - print(f"⚠️ [Auto-Save] 保存失败: {e}") + print(f"[WARN] [Auto-Save] 保存失败: {e}") finally: plt.close(fig_num) except Exception as e: - print(f"⚠️ [Auto-Save Global] 异常: {e}") + print(f"[WARN] [Auto-Save Global] 异常: {e}") # --- 自动保存机制 end --- return { diff --git a/utils/data_loader.py b/utils/data_loader.py index 9717779..5f413fd 100644 --- a/utils/data_loader.py +++ b/utils/data_loader.py @@ -34,7 +34,7 @@ def load_and_profile_data(file_paths: list) -> str: profile_summary += f"## 文件: {file_name}\n\n" if not os.path.exists(file_path): - profile_summary += f"⚠️ 文件不存在: {file_path}\n\n" + profile_summary += f"[WARN] 文件不存在: {file_path}\n\n" continue try: @@ -52,7 +52,7 @@ def load_and_profile_data(file_paths: list) -> str: elif ext in ['.xlsx', '.xls']: df = pd.read_excel(file_path) else: - profile_summary += f"⚠️ 不支持的文件格式: {ext}\n\n" + profile_summary += f"[WARN] 不支持的文件格式: {ext}\n\n" continue # 基础信息 @@ -70,7 +70,7 @@ def load_and_profile_data(file_paths: list) -> str: profile_summary += f"#### {col} ({dtype})\n" if null_count > 0: - profile_summary += f"- ⚠️ 空值: {null_count} ({null_ratio:.1f}%)\n" + profile_summary += f"- [WARN] 空值: {null_count} ({null_ratio:.1f}%)\n" # 数值列分析 if pd.api.types.is_numeric_dtype(dtype): @@ -96,7 +96,7 @@ def load_and_profile_data(file_paths: list) -> str: profile_summary += "\n" except Exception as e: - profile_summary += f"❌ 读取或分析文件失败: {str(e)}\n\n" + profile_summary += f"[ERROR] 读取或分析文件失败: {str(e)}\n\n" return profile_summary @@ -141,7 +141,7 @@ def load_data_chunked(file_path: str, chunksize: Optional[int] = None) -> Iterat except UnicodeDecodeError: continue except Exception as e: - print(f"❌ 读取CSV文件失败: {e}") + print(f"[ERROR] 读取CSV文件失败: {e}") break elif ext in ['.xlsx', '.xls']: # Excel文件不支持chunksize,直接读取 @@ -151,7 +151,7 @@ def load_data_chunked(file_path: str, chunksize: Optional[int] = None) -> Iterat for i in range(0, len(df), chunksize): yield df.iloc[i:i+chunksize] except Exception as e: - print(f"❌ 读取Excel文件失败: {e}") + print(f"[ERROR] 读取Excel文件失败: {e}") def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional[pd.DataFrame]: @@ -166,7 +166,7 @@ def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional DataFrame或None """ if not os.path.exists(file_path): - print(f"⚠️ 文件不存在: {file_path}") + print(f"[WARN] 文件不存在: {file_path}") return None # 检查文件大小 @@ -174,7 +174,7 @@ def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional # 对于大文件,建议使用流式处理 if file_size_mb > app_config.max_file_size_mb: - print(f"⚠️ 文件过大 ({file_size_mb:.1f}MB),建议使用 load_data_chunked() 流式处理") + print(f"[WARN] 文件过大 ({file_size_mb:.1f}MB),建议使用 load_data_chunked() 流式处理") # 生成缓存键 cache_key = get_file_hash(file_path) @@ -183,7 +183,7 @@ def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional if not force_reload and app_config.data_cache_enabled: cached_data = data_cache.get(cache_key) if cached_data is not None: - print(f"💾 从缓存加载数据: {os.path.basename(file_path)}") + print(f"[CACHE] 从缓存加载数据: {os.path.basename(file_path)}") return cached_data # 加载数据 @@ -202,16 +202,16 @@ def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional elif ext in ['.xlsx', '.xls']: df = pd.read_excel(file_path) else: - print(f"⚠️ 不支持的文件格式: {ext}") + print(f"[WARN] 不支持的文件格式: {ext}") return None # 缓存数据 if df is not None and app_config.data_cache_enabled: data_cache.set(cache_key, df) - print(f"✅ 数据已缓存: {os.path.basename(file_path)}") + print(f"[OK] 数据已缓存: {os.path.basename(file_path)}") return df except Exception as e: - print(f"❌ 加载数据失败: {e}") + print(f"[ERROR] 加载数据失败: {e}") return None diff --git a/utils/data_quality.py b/utils/data_quality.py index 4458d62..4919b37 100644 --- a/utils/data_quality.py +++ b/utils/data_quality.py @@ -192,27 +192,27 @@ class DataQualityChecker: summary += f"**质量评分**: {self.quality_score:.1f}/100\n\n" if self.quality_score >= 90: - summary += "✅ **评级**: 优秀 - 数据质量很好\n\n" + summary += "[OK] **评级**: 优秀 - 数据质量很好\n\n" elif self.quality_score >= 75: - summary += "⚠️ **评级**: 良好 - 存在一些小问题\n\n" + summary += "[WARN] **评级**: 良好 - 存在一些小问题\n\n" elif self.quality_score >= 60: - summary += "⚠️ **评级**: 一般 - 需要处理多个问题\n\n" + summary += "[WARN] **评级**: 一般 - 需要处理多个问题\n\n" else: - summary += "❌ **评级**: 差 - 数据质量问题严重\n\n" + summary += "[ERROR] **评级**: 差 - 数据质量问题严重\n\n" summary += f"**问题统计**: 共 {len(self.issues)} 个质量问题\n" - summary += f"- 🔴 高严重性: {len([i for i in self.issues if i.severity == 'high'])} 个\n" - summary += f"- 🟡 中严重性: {len([i for i in self.issues if i.severity == 'medium'])} 个\n" - summary += f"- 🟢 低严重性: {len([i for i in self.issues if i.severity == 'low'])} 个\n\n" + summary += f"- [RED] 高严重性: {len([i for i in self.issues if i.severity == 'high'])} 个\n" + summary += f"- [YELLOW] 中严重性: {len([i for i in self.issues if i.severity == 'medium'])} 个\n" + summary += f"- [GREEN] 低严重性: {len([i for i in self.issues if i.severity == 'low'])} 个\n\n" if self.issues: summary += "### 主要问题:\n\n" # 只显示高和中严重性的问题 for issue in self.issues: if issue.severity in ["high", "medium"]: - emoji = "🔴" if issue.severity == "high" else "🟡" + emoji = "[RED]" if issue.severity == "high" else "[YELLOW]" summary += f"{emoji} **{issue.column}** - {issue.description}\n" - summary += f" 💡 {issue.suggestion}\n\n" + summary += f" [TIP] {issue.suggestion}\n\n" return summary diff --git a/utils/fallback_openai_client.py b/utils/fallback_openai_client.py index 0caed5a..005f137 100644 --- a/utils/fallback_openai_client.py +++ b/utils/fallback_openai_client.py @@ -57,7 +57,7 @@ class AsyncFallbackOpenAIClient: self.fallback_client = AsyncOpenAI(api_key=fallback_api_key, base_url=fallback_base_url, **_fallback_args) self.fallback_model_name = fallback_model_name else: - print("⚠️ 警告: 未完全配置备用 API 客户端。如果主 API 失败,将无法进行回退。") + print("[WARN] 警告: 未完全配置备用 API 客户端。如果主 API 失败,将无法进行回退。") self.content_filter_error_code = content_filter_error_code self.content_filter_error_field = content_filter_error_field @@ -90,11 +90,11 @@ class AsyncFallbackOpenAIClient: return completion except (APIConnectionError, APITimeoutError) as e: # 通常可以重试的网络错误 last_exception = e - print(f"⚠️ {api_name} API 调用时发生可重试错误 ({type(e).__name__}): {e}. 尝试次数 {attempt + 1}/{max_retries + 1}") + print(f"[WARN] {api_name} API 调用时发生可重试错误 ({type(e).__name__}): {e}. 尝试次数 {attempt + 1}/{max_retries + 1}") if attempt < max_retries: await asyncio.sleep(self.retry_delay_seconds * (attempt + 1)) # 增加延迟 else: - print(f"❌ {api_name} API 在达到最大重试次数后仍然失败。") + print(f"[ERROR] {api_name} API 在达到最大重试次数后仍然失败。") except APIStatusError as e: # API 返回的特定状态码错误 is_content_filter_error = False retry_after = None @@ -118,7 +118,7 @@ class AsyncFallbackOpenAIClient: if delay_str.endswith("s"): try: retry_after = float(delay_str[:-1]) - print(f"⏳ 收到服务器 RetryInfo,等待时间: {retry_after}秒") + print(f"[TIMER] 收到服务器 RetryInfo,等待时间: {retry_after}秒") except ValueError: pass except Exception: @@ -128,7 +128,7 @@ class AsyncFallbackOpenAIClient: raise e last_exception = e - print(f"⚠️ {api_name} API 调用时发生 APIStatusError ({e.status_code}): {e}. 尝试次数 {attempt + 1}/{max_retries + 1}") + print(f"[WARN] {api_name} API 调用时发生 APIStatusError ({e.status_code}): {e}. 尝试次数 {attempt + 1}/{max_retries + 1}") if attempt < max_retries: # 如果获取到了明确的 retry_after,则使用它;否则使用默认的指数退避 @@ -137,13 +137,13 @@ class AsyncFallbackOpenAIClient: if e.status_code == 429 and retry_after is None: wait_time = max(wait_time, 5.0 * (attempt + 1)) # 429 默认至少等 5 秒 - print(f"💤 将等待 {wait_time:.2f} 秒后重试...") + print(f"[WAIT] 将等待 {wait_time:.2f} 秒后重试...") await asyncio.sleep(wait_time) else: - print(f"❌ {api_name} API 在达到最大重试次数后仍然失败 (APIStatusError)。") + print(f"[ERROR] {api_name} API 在达到最大重试次数后仍然失败 (APIStatusError)。") except APIError as e: # 其他不可轻易重试的 OpenAI 错误 last_exception = e - print(f"❌ {api_name} API 调用时发生不可重试错误 ({type(e).__name__}): {e}") + print(f"[ERROR] {api_name} API 调用时发生不可重试错误 ({type(e).__name__}): {e}") break # 不再重试此类错误 if last_exception: @@ -196,7 +196,7 @@ class AsyncFallbackOpenAIClient: pass if is_content_filter_error and self.fallback_client and self.fallback_model_name: - print(f"ℹ️ 主 API 内容过滤错误 ({e_primary.status_code})。尝试切换到备用 API ({self.fallback_client.base_url})...") + print(f"[INFO] 主 API 内容过滤错误 ({e_primary.status_code})。尝试切换到备用 API ({self.fallback_client.base_url})...") try: fallback_completion = await self._attempt_api_call( client=self.fallback_client, @@ -206,20 +206,20 @@ class AsyncFallbackOpenAIClient: api_name="备用", **kwargs.copy() ) - print(f"✅ 备用 API 调用成功。") + print(f"[OK] 备用 API 调用成功。") return fallback_completion except APIError as e_fallback: - print(f"❌ 备用 API 调用最终失败: {type(e_fallback).__name__} - {e_fallback}") + print(f"[ERROR] 备用 API 调用最终失败: {type(e_fallback).__name__} - {e_fallback}") raise e_fallback else: if not (self.fallback_client and self.fallback_model_name and is_content_filter_error): # 如果不是内容过滤错误,或者没有可用的备用API,则记录主API的原始错误 - print(f"ℹ️ 主 API 错误 ({type(e_primary).__name__}: {e_primary}), 且不满足备用条件或备用API未配置。") + print(f"[INFO] 主 API 错误 ({type(e_primary).__name__}: {e_primary}), 且不满足备用条件或备用API未配置。") raise e_primary except APIError as e_primary_other: - print(f"❌ 主 API 调用最终失败 (非内容过滤,错误类型: {type(e_primary_other).__name__}): {e_primary_other}") + print(f"[ERROR] 主 API 调用最终失败 (非内容过滤,错误类型: {type(e_primary_other).__name__}): {e_primary_other}") if self.fallback_client and self.fallback_model_name: - print(f"ℹ️ 主 API 失败,尝试切换到备用 API ({self.fallback_client.base_url})...") + print(f"[INFO] 主 API 失败,尝试切换到备用 API ({self.fallback_client.base_url})...") try: fallback_completion = await self._attempt_api_call( client=self.fallback_client, @@ -229,10 +229,10 @@ class AsyncFallbackOpenAIClient: api_name="备用", **kwargs.copy() ) - print(f"✅ 备用 API 调用成功。") + print(f"[OK] 备用 API 调用成功。") return fallback_completion except APIError as e_fallback_after_primary_fail: - print(f"❌ 备用 API 在主 API 失败后也调用失败: {type(e_fallback_after_primary_fail).__name__} - {e_fallback_after_primary_fail}") + print(f"[ERROR] 备用 API 在主 API 失败后也调用失败: {type(e_fallback_after_primary_fail).__name__} - {e_fallback_after_primary_fail}") raise e_fallback_after_primary_fail else: raise e_primary_other diff --git a/utils/format_execution_result.py b/utils/format_execution_result.py index 7706d92..d886b10 100644 --- a/utils/format_execution_result.py +++ b/utils/format_execution_result.py @@ -7,17 +7,17 @@ def format_execution_result(result: Dict[str, Any]) -> str: feedback = [] if result['success']: - feedback.append("✅ 代码执行成功") + feedback.append("[OK] 代码执行成功") if result['output']: - feedback.append(f"📊 输出结果:\n{result['output']}") + feedback.append(f"[CHART] 输出结果:\n{result['output']}") if result.get('variables'): - feedback.append("📋 新生成的变量:") + feedback.append("[LIST] 新生成的变量:") for var_name, var_info in result['variables'].items(): feedback.append(f" - {var_name}: {var_info}") else: - feedback.append("❌ 代码执行失败") + feedback.append("[ERROR] 代码执行失败") feedback.append(f"错误信息: {result['error']}") if result['output']: feedback.append(f"部分输出: {result['output']}") diff --git a/utils/llm_helper.py b/utils/llm_helper.py index 34b50e0..70868e2 100644 --- a/utils/llm_helper.py +++ b/utils/llm_helper.py @@ -117,7 +117,7 @@ class LLMHelper: if use_cache and app_config.llm_cache_enabled: cached_response = llm_cache.get(cache_key) if cached_response: - print("💾 使用LLM缓存响应") + print("[CACHE] 使用LLM缓存响应") return cached_response # 调用LLM diff --git a/utils/script_generator.py b/utils/script_generator.py new file mode 100644 index 0000000..f6ebee4 --- /dev/null +++ b/utils/script_generator.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +""" +可复用脚本生成器 + +从分析会话的执行历史中提取成功执行的代码, +合并去重后生成可独立运行的 .py 脚本文件。 +""" + +import os +import re +from datetime import datetime +from typing import List, Dict, Any, Set + + +def extract_imports(code: str) -> Set[str]: + """从代码中提取所有 import 语句""" + imports = set() + lines = code.split('\n') + for line in lines: + stripped = line.strip() + if stripped.startswith('import ') or stripped.startswith('from '): + # 标准化 import 语句 + imports.add(stripped) + return imports + + +def remove_imports(code: str) -> str: + """从代码中移除所有 import 语句""" + lines = code.split('\n') + result_lines = [] + for line in lines: + stripped = line.strip() + if not stripped.startswith('import ') and not stripped.startswith('from '): + result_lines.append(line) + return '\n'.join(result_lines) + + +def clean_code_block(code: str) -> str: + """清理代码块,移除不必要的内容""" + # 移除可能的重复配置代码 + patterns_to_skip = [ + r"plt\.rcParams\['font\.sans-serif'\]", # 字体配置在模板中统一处理 + r"plt\.rcParams\['axes\.unicode_minus'\]", + ] + + lines = code.split('\n') + result_lines = [] + skip_until_empty = False + + for line in lines: + stripped = line.strip() + + # 跳过空行连续的情况 + if not stripped: + if skip_until_empty: + skip_until_empty = False + continue + result_lines.append(line) + continue + + # 检查是否需要跳过的模式 + should_skip = False + for pattern in patterns_to_skip: + if re.search(pattern, stripped): + should_skip = True + break + + if not should_skip: + result_lines.append(line) + + return '\n'.join(result_lines) + + +def generate_reusable_script( + analysis_results: List[Dict[str, Any]], + data_files: List[str], + session_output_dir: str, + user_requirement: str = "" +) -> str: + """ + 从分析结果中生成可复用的 Python 脚本 + + Args: + analysis_results: 分析过程中记录的结果列表,每个元素包含 'code', 'result' 等 + data_files: 原始数据文件路径列表 + session_output_dir: 会话输出目录 + user_requirement: 用户的原始需求描述 + + Returns: + 生成的脚本文件路径 + """ + # 收集所有成功执行的代码 + all_imports = set() + code_blocks = [] + + for result in analysis_results: + # 只处理 generate_code 类型的结果 + if result.get("action") == "collect_figures": + continue + + code = result.get("code", "") + exec_result = result.get("result", {}) + + # 只收集成功执行的代码 + if code and exec_result.get("success", False): + # 提取 imports + imports = extract_imports(code) + all_imports.update(imports) + + # 清理代码块 + cleaned_code = remove_imports(code) + cleaned_code = clean_code_block(cleaned_code) + + # 只添加非空的代码块 + if cleaned_code.strip(): + code_blocks.append({ + "round": result.get("round", 0), + "code": cleaned_code.strip() + }) + + if not code_blocks: + print("[WARN] 没有成功执行的代码块,跳过脚本生成") + return "" + + # 生成脚本内容 + now = datetime.now() + timestamp = now.strftime("%Y%m%d_%H%M%S") + + # 构建脚本头部 + script_header = f'''#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +数据分析脚本 - 自动生成 +===================================== +原始数据文件: {', '.join(data_files)} +生成时间: {now.strftime("%Y-%m-%d %H:%M:%S")} +原始需求: {user_requirement[:200] + '...' if len(user_requirement) > 200 else user_requirement} +===================================== + +使用方法: +1. 修改下方 DATA_FILES 列表中的文件路径 +2. 修改 OUTPUT_DIR 指定输出目录 +3. 运行: python {os.path.basename(session_output_dir)}_分析脚本.py +""" + +import os +''' + + # 添加标准 imports(去重后排序) + standard_imports = sorted([imp for imp in all_imports if imp.startswith('import ')]) + from_imports = sorted([imp for imp in all_imports if imp.startswith('from ')]) + + imports_section = '\n'.join(standard_imports + from_imports) + + # 配置区域 + config_section = f''' +# ========== 配置区域 (可修改) ========== + +# 数据文件路径 - 修改此处以分析不同的数据 +DATA_FILES = {repr(data_files)} + +# 输出目录 - 图片和报告将保存在此目录 +OUTPUT_DIR = "./analysis_output" + +# 创建输出目录 +os.makedirs(OUTPUT_DIR, exist_ok=True) + +# ========== 字体配置 (中文显示) ========== +import platform +import matplotlib.pyplot as plt + +system_name = platform.system() +if system_name == 'Darwin': + plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'PingFang SC', 'sans-serif'] +elif system_name == 'Windows': + plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'sans-serif'] +else: + plt.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei', 'sans-serif'] +plt.rcParams['axes.unicode_minus'] = False + +# 设置 session_output_dir 变量(兼容原始代码) +session_output_dir = OUTPUT_DIR +''' + + # 合并代码块 + code_section = "\n# ========== 分析代码 ==========\n\n" + + for i, block in enumerate(code_blocks, 1): + code_section += f"# --- 第 {block['round']} 轮分析 ---\n" + code_section += block['code'] + "\n\n" + + # 脚本尾部 + script_footer = ''' +# ========== 完成 ========== +print("\\n" + "=" * 50) +print("[OK] 分析完成!") +print(f"[OUTPUT] 输出目录: {os.path.abspath(OUTPUT_DIR)}") +print("=" * 50) +''' + + # 组装完整脚本 + full_script = script_header + imports_section + config_section + code_section + script_footer + + # 保存脚本文件 + script_filename = f"分析脚本_{timestamp}.py" + script_path = os.path.join(session_output_dir, script_filename) + + try: + with open(script_path, 'w', encoding='utf-8') as f: + f.write(full_script) + print(f"[OK] 可复用脚本已生成: {script_path}") + return script_path + except Exception as e: + print(f"[ERROR] 保存脚本失败: {e}") + return ""