From 621e546b433526e75e8206ee99a543e53c86ca6f Mon Sep 17 00:00:00 2001 From: Zhaojie Date: Wed, 7 Jan 2026 16:41:38 +0800 Subject: [PATCH] feat: Update core agent logic, code execution utilities, and LLM configuration. --- .gitignore | 2 ++ config/llm_config.py | 15 +++++++++-- data_analysis_agent.py | 6 ++++- main.py | 2 +- utils/code_executor.py | 1 + utils/extract_code.py | 16 ++++++++++++ utils/fallback_openai_client.py | 45 +++++++++++++++++++++++++-------- 7 files changed, 73 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 94f8d30..fc89384 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,8 @@ __pycache__/ # C extensions *.so + + # Distribution / packaging .Python build/ diff --git a/config/llm_config.py b/config/llm_config.py index 764c25a..ffadb9a 100644 --- a/config/llm_config.py +++ b/config/llm_config.py @@ -17,13 +17,24 @@ load_dotenv() class LLMConfig: """LLM配置""" - provider: str = "openai" # openai, anthropic, etc. - api_key: str = os.environ.get("OPENAI_API_KEY", "sk-c44i1hy64xgzwox6x08o4zug93frq6rgn84oqugf2pje1tg4") + provider: str = os.environ.get("LLM_PROVIDER", "gemini") # openai, gemini, etc. 
+ api_key: str = os.environ.get("OPENAI_API_KEY", "") base_url: str = os.environ.get("OPENAI_BASE_URL", "https://api.xiaomimimo.com/v1") model: str = os.environ.get("OPENAI_MODEL", "mimo-v2-flash") temperature: float = 0.5 max_tokens: int = 131072 + def __post_init__(self): + """Switch to the Gemini connection settings when the provider is "gemini".""" + if self.provider == "gemini": + # Gemini settings come from the GEMINI_* environment variables. + # The API key has no built-in default: credentials must never be committed + # to source control, and the OpenAI key is not reused for Gemini. + self.api_key = os.environ.get("GEMINI_API_KEY", "") + # OpenAI-compatible endpoint used for Gemini (NOTE(review): the default is a non-Google host; confirm it is trusted) + self.base_url = os.environ.get("GEMINI_BASE_URL", "https://gemini.jeason.online") + self.model = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash") + def to_dict(self) -> Dict[str, Any]: """转换为字典""" return asdict(self) diff --git a/data_analysis_agent.py b/data_analysis_agent.py index 7014e94..2087845 100644 --- a/data_analysis_agent.py +++ b/data_analysis_agent.py @@ -89,7 +89,11 @@ class DataAnalysisAgent: return self._handle_generate_code(response, yaml_data) except Exception as e: - print(f"⚠️ 解析响应失败: {str(e)},按generate_code处理") + print(f"⚠️ 解析响应失败: {str(e)},尝试提取代码并按generate_code处理") + # Even when YAML parsing fails, still try to extract code from the raw response + extracted_code = extract_code_from_response(response) + if extracted_code: + return self._handle_generate_code(response, {"code": extracted_code}) return self._handle_generate_code(response, {}) def _handle_analysis_complete( diff --git a/main.py b/main.py index 8fb67ab..7075138 100644 --- a/main.py +++ b/main.py @@ -39,7 +39,7 @@ def setup_logging(log_dir): def main(): llm_config = LLMConfig() - files = ["./UB IOV Support_TR.csv"] + files = ["./cleaned_data.csv"] analysis_requirement = """ 基于所有运维工单,整理一份工单健康度报告,包括但不限于对所有车联网技术支持工单的全面数据分析, 深入挖掘工单处理过程中的关键问题、效率瓶颈及改进机会。涵盖工单状态、问题类型、模块分布、严重程度、责任人负载、车型分布、来源渠道及处理时长等多个维度。 diff --git a/utils/code_executor.py b/utils/code_executor.py index e139485..b3d774c 100644 --- 
a/utils/code_executor.py +++ b/utils/code_executor.py @@ -35,6 +35,7 @@ class CodeExecutor: "duckdb", "scipy", "sklearn", + "sklearn.feature_extraction.text", "statsmodels", "plotly", "dash", diff --git a/utils/extract_code.py b/utils/extract_code.py index bd2420f..f40cedf 100644 --- a/utils/extract_code.py +++ b/utils/extract_code.py @@ -29,6 +29,22 @@ def extract_code_from_response(response: str) -> Optional[str]: end = response.find('```', start) if end != -1: return response[start:end].strip() + + # Fallback: extract a YAML-style "code: |" block (malformed YAML but a clear layout) + import re + import textwrap + # Capture the lines after "code: |" that are indented by two or more spaces, + # stopping at the first line that is neither indented nor blank (e.g. the next key). + # NOTE(review): the elif below places this inside the preceding if branch; confirm that is intended. + pattern = r'code:\s*\|\s*\n( {2,}.*\n?(?: {2,}.*\n?|[ \t]*\n)*)' + match = re.search(pattern, response) + if match: + code_block = match.group(1) + # Strip the shared indentation. textwrap.dedent() does not raise for str + # input, so it is not wrapped in try/except (a bare except would also + # swallow KeyboardInterrupt and SystemExit). + return textwrap.dedent(code_block).strip() + elif '```' in response: start = response.find('```') + 3 end = response.find('```', start) diff --git a/utils/fallback_openai_client.py b/utils/fallback_openai_client.py index 2101f22..0caed5a 100644 --- a/utils/fallback_openai_client.py +++ b/utils/fallback_openai_client.py @@ -97,23 +97,48 @@ class AsyncFallbackOpenAIClient: print(f"❌ {api_name} API 在达到最大重试次数后仍然失败。") except APIStatusError as e: # API 返回的特定状态码错误 is_content_filter_error = False - if e.status_code == 400: - try: - error_json = e.response.json() - error_details = error_json.get("error", {}) - if (error_details.get("code") == self.content_filter_error_code and - self.content_filter_error_field in error_json): - is_content_filter_error = True - except Exception: - pass # 解析错误响应失败,不认为是内容过滤错误 + retry_after = None + + # Parse the error body for extra details (e.g. Google RPC RetryInfo) + try: + error_json = e.response.json() + error_details = error_json.get("error", {}) + + # Detect a content-filter error (provider-specific) + if (error_details.get("code") == self.content_filter_error_code and + self.content_filter_error_field in error_json): + is_content_filter_error = True + + # Look for a Google RPC RetryInfo entry 
+ # Example payload: {'error': {'details': [{'@type': 'type.googleapis.com/google.rpc.RetryInfo', 'retryDelay': '38s'}]}} + if "details" in error_details: + for detail in error_details["details"]: + if detail.get("@type") == "type.googleapis.com/google.rpc.RetryInfo": + delay_str = detail.get("retryDelay", "") + if delay_str.endswith("s"): + try: + retry_after = float(delay_str[:-1]) + print(f"⏳ 收到服务器 RetryInfo,等待时间: {retry_after}秒") + except ValueError: + pass + except Exception: + pass # Could not parse the error body; ignore it if is_content_filter_error and api_name == "主": # 如果是主 API 的内容过滤错误,则直接抛出以便回退 raise e last_exception = e print(f"⚠️ {api_name} API 调用时发生 APIStatusError ({e.status_code}): {e}. 尝试次数 {attempt + 1}/{max_retries + 1}") + if attempt < max_retries: - await asyncio.sleep(self.retry_delay_seconds * (attempt + 1)) + # Use the server-provided retry_after when one was parsed; otherwise back off linearly (retry delay times the attempt number) + wait_time = retry_after if retry_after is not None else (self.retry_delay_seconds * (attempt + 1)) + # For 429 Too Many Requests without a parsed retry_after, enforce a longer minimum wait + if e.status_code == 429 and retry_after is None: + wait_time = max(wait_time, 5.0 * (attempt + 1)) # for 429, wait at least 5 s times the attempt number + + print(f"💤 将等待 {wait_time:.2f} 秒后重试...") + await asyncio.sleep(wait_time) else: print(f"❌ {api_name} API 在达到最大重试次数后仍然失败 (APIStatusError)。") except APIError as e: # 其他不可轻易重试的 OpenAI 错误