# -*- coding: utf-8 -*- """ ReAct Agent - 基于 ReAct 模式的智能代理 用单次 LLM 调用 + 工具循环替代原有的多步流水线 """ import logging import json import re from typing import Dict, Any, List, Optional from datetime import datetime from src.agent.llm_client import LLMManager from src.config.unified_config import get_config logger = logging.getLogger(__name__) # ── 工具定义(供 LLM 理解可用能力) ────────────────────────── TOOL_DEFINITIONS = [ { "name": "search_knowledge", "description": "搜索知识库,根据关键词查找相关的问题和答案", "parameters": { "query": {"type": "string", "description": "搜索关键词", "required": True}, "top_k": {"type": "integer", "description": "返回结果数量,默认3", "required": False} } }, { "name": "add_knowledge", "description": "向知识库添加新的问答条目", "parameters": { "question": {"type": "string", "description": "问题", "required": True}, "answer": {"type": "string", "description": "答案", "required": True}, "category": {"type": "string", "description": "分类", "required": False} } }, { "name": "query_vehicle", "description": "查询车辆信息,支持按VIN码或车牌号查询", "parameters": { "vin": {"type": "string", "description": "VIN码", "required": False}, "plate_number": {"type": "string", "description": "车牌号", "required": False} } }, { "name": "get_analytics", "description": "获取系统数据分析报告,如每日统计、分类统计等", "parameters": { "report_type": { "type": "string", "description": "报告类型: daily_analytics / summary / category_performance", "required": True } } }, { "name": "send_feishu_message", "description": "通过飞书发送消息通知", "parameters": { "message": {"type": "string", "description": "消息内容", "required": True}, "chat_id": {"type": "string", "description": "飞书群聊ID(可选)", "required": False} } }, ] def _build_tools_prompt() -> str: """构建工具描述文本供 system prompt 使用""" lines = [] for t in TOOL_DEFINITIONS: params_desc = [] for pname, pinfo in t["parameters"].items(): req = "必填" if pinfo.get("required") else "可选" params_desc.append(f" - {pname} ({pinfo['type']}, {req}): {pinfo['description']}") lines.append(f"- {t['name']}: {t['description']}\n 参数:\n" + "\n".join(params_desc)) return "\n".join(lines) SYSTEM_PROMPT = f"""你是 TSP 智能客服助手,帮助用户解决车辆售后问题、查询知识库、管理客诉信息。 你可以使用以下工具来完成任务: {_build_tools_prompt()} ## 回复规则 1. 如果你需要使用工具,请严格按以下 JSON 格式回复(不要包含其他内容): ```json {{"tool": "工具名", "parameters": {{"参数名": "参数值"}}}} ``` 2. 如果你不需要使用工具,可以直接用自然语言回复用户。 3. 每次只调用一个工具。 4. 根据工具返回的结果,综合生成最终回复。 5. 回复要简洁专业,使用中文。 """ class ReactAgent: """基于 ReAct 模式的 Agent""" MAX_TOOL_ROUNDS = 5 # 最多工具调用轮次,防止死循环 def __init__(self): config = get_config() self.llm = LLMManager(config.llm) self._tool_handlers = self._register_tool_handlers() self.execution_history: List[Dict[str, Any]] = [] logger.info("ReactAgent 初始化完成") # ── 工具处理器注册 ────────────────────────────────────── def _register_tool_handlers(self) -> Dict[str, Any]: return { "search_knowledge": self._tool_search_knowledge, "add_knowledge": self._tool_add_knowledge, "query_vehicle": self._tool_query_vehicle, "get_analytics": self._tool_get_analytics, "send_feishu_message": self._tool_send_feishu_message, } # ── 主处理入口 ────────────────────────────────────────── async def chat( self, message: str, user_id: str = "anonymous", conversation_history: Optional[List[Dict[str, str]]] = None, ) -> Dict[str, Any]: """处理用户消息,返回最终回复""" messages = [{"role": "system", "content": SYSTEM_PROMPT}] # 加入历史对话(最近 10 轮) if conversation_history: messages.extend(conversation_history[-10:]) messages.append({"role": "user", "content": message}) tool_calls_log = [] for round_idx in range(self.MAX_TOOL_ROUNDS): # 调用 LLM try: response_text = await self.llm.chat(messages, temperature=0.3, max_tokens=2000) except Exception as e: logger.error(f"LLM 调用失败: {e}") return self._error_response(str(e)) # 尝试解析工具调用 tool_call = self._parse_tool_call(response_text) if tool_call is None: # 没有工具调用 → 这是最终回复 self._record_execution(message, user_id, tool_calls_log, response_text) return { "success": True, "response": response_text, "tool_calls": tool_calls_log, "rounds": round_idx + 1, } # 执行工具 tool_name = tool_call["tool"] tool_params = tool_call.get("parameters", {}) logger.info(f"[Round {round_idx+1}] 调用工具: {tool_name}, 参数: {tool_params}") tool_result = await self._execute_tool(tool_name, tool_params) tool_calls_log.append({ "tool": tool_name, "parameters": tool_params, "result": tool_result, "round": round_idx + 1, }) # 把工具调用和结果加入对话上下文 messages.append({"role": "assistant", "content": response_text}) messages.append({ "role": "user", "content": f"工具 `{tool_name}` 返回结果:\n```json\n{json.dumps(tool_result, ensure_ascii=False, default=str)}\n```\n请根据以上结果回复用户。" }) # 超过最大轮次 self._record_execution(message, user_id, tool_calls_log, "[达到最大工具调用轮次]") return { "success": True, "response": "抱歉,处理过程较复杂,请稍后重试或换个方式描述您的问题。", "tool_calls": tool_calls_log, "rounds": self.MAX_TOOL_ROUNDS, } # ── 工具调用解析 ────────────────────────────────────── def _parse_tool_call(self, text: str) -> Optional[Dict[str, Any]]: """从 LLM 回复中解析工具调用 JSON""" if not text: return None # 尝试从 ```json ... ``` 代码块中提取 code_block = re.search(r'```json\s*(\{.*?\})\s*```', text, re.DOTALL) if code_block: try: data = json.loads(code_block.group(1)) if "tool" in data: return data except json.JSONDecodeError: pass # 尝试直接解析整段文本为 JSON try: data = json.loads(text.strip()) if isinstance(data, dict) and "tool" in data: return data except json.JSONDecodeError: pass # 尝试从文本中找到第一个 JSON 对象 json_match = re.search(r'\{[^{}]*"tool"\s*:\s*"[^"]+?"[^{}]*\}', text, re.DOTALL) if json_match: try: data = json.loads(json_match.group()) if "tool" in data: return data except json.JSONDecodeError: pass return None # ── 工具执行 ────────────────────────────────────────── async def _execute_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]: """执行指定工具""" handler = self._tool_handlers.get(tool_name) if not handler: return {"error": f"未知工具: {tool_name}"} try: return await handler(**params) except Exception as e: logger.error(f"工具 {tool_name} 执行失败: {e}") return {"error": str(e)} # ── 具体工具实现 ────────────────────────────────────── async def _tool_search_knowledge(self, query: str, top_k: int = 3, **kw) -> Dict[str, Any]: """搜索知识库""" try: from src.knowledge_base.knowledge_manager import KnowledgeManager km = KnowledgeManager() results = km.search_knowledge(query, top_k) return {"results": results, "count": len(results)} except Exception as e: return {"error": str(e)} async def _tool_add_knowledge(self, question: str, answer: str, category: str = "通用", **kw) -> Dict[str, Any]: """添加知识库条目""" try: from src.knowledge_base.knowledge_manager import KnowledgeManager km = KnowledgeManager() success = km.add_knowledge_entry(question=question, answer=answer, category=category) return {"success": success} except Exception as e: return {"error": str(e)} async def _tool_query_vehicle(self, vin: str = None, plate_number: str = None, **kw) -> Dict[str, Any]: """查询车辆信息""" try: from src.vehicle.vehicle_data_manager import VehicleDataManager vm = VehicleDataManager() if vin: result = vm.get_latest_vehicle_data_by_vin(vin) return {"vehicle_data": result} if result else {"error": "未找到该VIN的车辆数据"} elif plate_number: return {"error": "暂不支持按车牌号查询,请使用VIN码"} else: return {"error": "请提供 VIN 码"} except Exception as e: return {"error": str(e)} async def _tool_get_analytics(self, report_type: str = "summary", **kw) -> Dict[str, Any]: """获取分析报告""" try: from src.analytics.analytics_manager import AnalyticsManager am = AnalyticsManager() if report_type == "daily_analytics": return am.generate_daily_analytics() elif report_type == "summary": return am.get_analytics_summary() elif report_type == "category_performance": return am.get_category_performance() else: return {"error": f"不支持的报告类型: {report_type}"} except Exception as e: return {"error": str(e)} async def _tool_send_feishu_message(self, message: str, chat_id: str = None, **kw) -> Dict[str, Any]: """发送飞书消息""" try: from src.integrations.feishu_service import FeishuService fs = FeishuService() if not chat_id: return {"error": "请提供飞书群聊 chat_id"} success = fs.send_message(receive_id=chat_id, content=message, receive_id_type="chat_id") return {"success": success} except Exception as e: return {"error": str(e)} # ── 辅助方法 ────────────────────────────────────────── def _record_execution(self, message: str, user_id: str, tool_calls: list, response: str): """记录执行历史""" record = { "timestamp": datetime.now().isoformat(), "user_id": user_id, "message": message, "tool_calls": tool_calls, "response": response[:500], } self.execution_history.append(record) if len(self.execution_history) > 500: self.execution_history = self.execution_history[-500:] def _error_response(self, error_msg: str) -> Dict[str, Any]: return { "success": False, "response": "抱歉,系统处理出现问题,请稍后重试。", "error": error_msg, "tool_calls": [], "rounds": 0, } def get_tool_definitions(self) -> List[Dict[str, Any]]: """返回工具定义列表(供 API 展示)""" return TOOL_DEFINITIONS def get_execution_history(self, limit: int = 50) -> List[Dict[str, Any]]: """获取执行历史""" return self.execution_history[-limit:] def get_status(self) -> Dict[str, Any]: """获取 Agent 状态""" return { "status": "active", "available_tools": [t["name"] for t in TOOL_DEFINITIONS], "tool_count": len(TOOL_DEFINITIONS), "history_count": len(self.execution_history), "max_tool_rounds": self.MAX_TOOL_ROUNDS, }