#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 增强版TSP助手 - 集成Agent功能 这是一个真正的智能Agent实现 """ import logging import asyncio from typing import Dict, Any, List, Optional from datetime import datetime import json from src.main import TSPAssistant from src.agent import AgentCore, AgentState logger = logging.getLogger(__name__) class TSPAgentAssistant(TSPAssistant): """TSP Agent助手 - 增强版TSP助手,具备完整Agent功能""" def __init__(self): # 初始化基础TSP助手 super().__init__() # 初始化Agent核心 self.agent_core = AgentCore() # Agent特有功能 self.is_agent_mode = True self.proactive_tasks = [] self.agent_memory = {} logger.info("TSP Agent助手初始化完成") async def process_message_agent( self, message: str, user_id: str = None, work_order_id: int = None, enable_proactive: bool = True ) -> Dict[str, Any]: """Agent模式处理用户消息""" try: # 构建请求 request = { "message": message, "user_id": user_id, "work_order_id": work_order_id, "context": { "session_id": f"session_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}", "timestamp": datetime.now().isoformat() } } # 使用Agent核心处理请求 agent_result = await self.agent_core.process_request(request) # 如果启用主动模式,检查是否需要主动行动 if enable_proactive: proactive_result = await self.agent_core.proactive_action() if proactive_result: agent_result["proactive_action"] = proactive_result # 记录Agent执行信息 agent_result["agent_mode"] = True agent_result["agent_status"] = self.agent_core.get_status() return agent_result except Exception as e: logger.error(f"Agent模式处理消息失败: {e}") # 回退到传统模式 return await self._fallback_to_traditional_mode(message, user_id, work_order_id) async def _fallback_to_traditional_mode( self, message: str, user_id: str = None, work_order_id: int = None ) -> Dict[str, Any]: """回退到传统模式""" logger.info("回退到传统TSP助手模式") # 使用原有的处理方式 result = self.process_message(message, user_id, work_order_id) # 添加Agent标识 result["agent_mode"] = False result["fallback_reason"] = "Agent处理失败,使用传统模式" return result async def create_intelligent_work_order( self, user_message: str, user_id: str = None, auto_categorize: bool = True, auto_priority: bool = True ) -> Dict[str, Any]: """智能创建工单 - 使用Agent能力""" try: # 使用Agent分析用户消息 request = { "message": user_message, "user_id": user_id, "context": {"action": "create_work_order"} } agent_result = await self.agent_core.process_request(request) if "error" in agent_result: return agent_result # 从Agent结果中提取工单信息 work_order_info = self._extract_work_order_info(agent_result, user_message) # 创建工单 work_order = self.create_work_order( title=work_order_info["title"], description=work_order_info["description"], category=work_order_info["category"], priority=work_order_info["priority"] ) # 添加Agent分析结果 work_order["agent_analysis"] = agent_result work_order["intelligent_features"] = { "auto_categorized": auto_categorize, "auto_prioritized": auto_priority, "confidence_score": work_order_info.get("confidence", 0.8) } return work_order except Exception as e: logger.error(f"智能创建工单失败: {e}") return {"error": f"智能创建失败: {str(e)}"} def _extract_work_order_info(self, agent_result: Dict[str, Any], user_message: str) -> Dict[str, Any]: """从Agent结果中提取工单信息""" # 这里可以根据Agent的分析结果智能提取工单信息 # 暂时使用简单的提取逻辑 # 使用LLM提取关键信息 from src.core.llm_client import QwenClient llm_client = QwenClient() prompt = f""" 请从以下用户消息中提取工单信息: 用户消息: {user_message} 请提取: 1. 工单标题(简洁明了) 2. 问题描述(详细描述) 3. 问题类别(技术问题、账户问题、服务问题等) 4. 优先级(high、medium、low) 5. 置信度(0-1) 请以JSON格式返回。 """ messages = [ {"role": "system", "content": "你是一个工单信息提取专家,擅长从用户消息中提取关键信息。"}, {"role": "user", "content": prompt} ] result = llm_client.chat_completion(messages, temperature=0.3) if "error" in result: # 使用默认值 return { "title": "用户问题", "description": user_message, "category": "技术问题", "priority": "medium", "confidence": 0.5 } try: response_content = result["choices"][0]["message"]["content"] import re json_match = re.search(r'\{.*\}', response_content, re.DOTALL) if json_match: extracted_info = json.loads(json_match.group()) return { "title": extracted_info.get("title", "用户问题"), "description": extracted_info.get("description", user_message), "category": extracted_info.get("category", "技术问题"), "priority": extracted_info.get("priority", "medium"), "confidence": extracted_info.get("confidence", 0.7) } else: return { "title": "用户问题", "description": user_message, "category": "技术问题", "priority": "medium", "confidence": 0.5 } except Exception as e: logger.error(f"提取工单信息失败: {e}") return { "title": "用户问题", "description": user_message, "category": "技术问题", "priority": "medium", "confidence": 0.5 } async def intelligent_knowledge_search( self, query: str, context: Dict[str, Any] = None, use_reasoning: bool = True ) -> Dict[str, Any]: """智能知识库搜索 - 使用推理能力""" try: # 基础搜索 basic_results = self.search_knowledge(query) if not use_reasoning: return basic_results # 使用推理引擎增强搜索 reasoning_result = await self.agent_core.reasoning_engine.reason_about_problem( problem=f"搜索知识: {query}", available_information={ "search_results": basic_results.get("results", []), "context": context or {} }, reasoning_type="inductive" ) # 结合搜索结果和推理结果 enhanced_results = { "basic_search": basic_results, "reasoning_analysis": reasoning_result, "enhanced_results": self._enhance_search_results( basic_results.get("results", []), reasoning_result ), "search_strategy": "intelligent_with_reasoning" } return enhanced_results except Exception as e: logger.error(f"智能知识搜索失败: {e}") return self.search_knowledge(query) def _enhance_search_results( self, basic_results: List[Dict[str, Any]], reasoning_result: Dict[str, Any] ) -> List[Dict[str, Any]]: """增强搜索结果""" enhanced_results = [] for result in basic_results: enhanced_result = result.copy() # 添加推理增强信息 if "analysis" in reasoning_result: enhanced_result["reasoning_insights"] = reasoning_result["analysis"] # 计算增强置信度 original_confidence = result.get("confidence_score", 0.5) reasoning_confidence = reasoning_result.get("confidence", 0.5) enhanced_result["enhanced_confidence"] = (original_confidence + reasoning_confidence) / 2 enhanced_results.append(enhanced_result) return enhanced_results async def proactive_monitoring(self) -> Dict[str, Any]: """主动监控 - Agent主动检查系统状态""" try: proactive_actions = [] # 检查预警 alerts = self.get_alerts() if alerts.get("count", 0) > 0: proactive_actions.append({ "type": "alert_response", "description": f"发现 {alerts['count']} 个活跃预警", "priority": "high", "action": "需要立即处理预警" }) # 检查系统健康 system_status = self.get_system_status() if system_status.get("health_score", 1.0) < 0.8: proactive_actions.append({ "type": "system_maintenance", "description": "系统健康状态不佳", "priority": "medium", "action": "建议进行系统维护" }) # 检查知识库质量 knowledge_stats = self.knowledge_manager.get_knowledge_stats() if knowledge_stats.get("average_confidence", 0.8) < 0.6: proactive_actions.append({ "type": "knowledge_improvement", "description": "知识库质量需要提升", "priority": "low", "action": "建议更新知识库" }) return { "proactive_actions": proactive_actions, "timestamp": datetime.now().isoformat(), "agent_status": self.agent_core.get_status() } except Exception as e: logger.error(f"主动监控失败: {e}") return {"error": str(e)} async def intelligent_analytics( self, analysis_type: str = "comprehensive", date_range: str = "last_7_days" ) -> Dict[str, Any]: """智能分析 - 使用Agent推理能力""" try: # 基础分析 basic_analytics = self.generate_analytics(date_range) if analysis_type == "basic": return basic_analytics # 使用Agent进行深度分析 analysis_request = { "message": f"分析{date_range}的数据", "context": { "analysis_type": analysis_type, "basic_data": basic_analytics } } agent_analysis = await self.agent_core.process_request(analysis_request) # 结合基础分析和Agent分析 intelligent_analytics = { "basic_analytics": basic_analytics, "agent_insights": agent_analysis, "intelligent_recommendations": self._generate_recommendations( basic_analytics, agent_analysis ), "analysis_confidence": self._calculate_analysis_confidence(agent_analysis) } return intelligent_analytics except Exception as e: logger.error(f"智能分析失败: {e}") return self.generate_analytics(date_range) def _generate_recommendations( self, basic_analytics: Dict[str, Any], agent_analysis: Dict[str, Any] ) -> List[Dict[str, Any]]: """生成智能推荐""" recommendations = [] # 基于基础分析生成推荐 if basic_analytics.get("summary", {}).get("avg_satisfaction", 0) < 0.7: recommendations.append({ "type": "improvement", "title": "提升客户满意度", "description": "客户满意度较低,建议优化服务质量", "priority": "high", "action_items": [ "分析低满意度工单", "改进响应时间", "提升解决方案质量" ] }) if basic_analytics.get("summary", {}).get("avg_resolution_time_hours", 0) > 24: recommendations.append({ "type": "efficiency", "title": "缩短解决时间", "description": "平均解决时间过长,建议提升处理效率", "priority": "medium", "action_items": [ "优化工作流程", "增加自动化处理", "提升知识库质量" ] }) return recommendations def _calculate_analysis_confidence(self, agent_analysis: Dict[str, Any]) -> float: """计算分析置信度""" # 基于Agent分析结果计算置信度 if "error" in agent_analysis: return 0.3 # 这里可以实现更复杂的置信度计算逻辑 return 0.8 def get_agent_status(self) -> Dict[str, Any]: """获取Agent状态""" return { "success": True, "status": "active" if self.is_agent_mode else "inactive", "active_goals": len(self.agent_core.goal_manager.get_active_goals()), "available_tools": len(self.agent_core.tool_manager.get_available_tools()), "tools": [ { "name": tool.name, "usage_count": getattr(tool, 'usage_count', 0), "success_rate": getattr(tool, 'success_rate', 0.8) } for tool in self.agent_core.tool_manager.get_available_tools() ], "execution_history": [] } def toggle_agent_mode(self, enabled: bool) -> bool: """切换Agent模式""" try: if enabled: # 同步方式切换 self.is_agent_mode = True logger.info("已切换到Agent模式") return True else: return self.switch_to_traditional_mode() except Exception as e: logger.error(f"切换Agent模式失败: {e}") return False def start_proactive_monitoring(self) -> bool: """启动主动监控""" try: return self.start_agent_monitoring() except Exception as e: logger.error(f"启动主动监控失败: {e}") return False def stop_proactive_monitoring(self) -> bool: """停止主动监控""" try: return self.stop_agent_monitoring() except Exception as e: logger.error(f"停止主动监控失败: {e}") return False def run_proactive_monitoring(self) -> Dict[str, Any]: """运行主动监控""" try: # 模拟主动监控结果 proactive_actions = [ {"type": "alert_response", "description": "发现系统性能预警"}, {"type": "knowledge_update", "description": "建议更新知识库"}, {"type": "user_assistance", "description": "检测到用户可能需要帮助"} ] return { "success": True, "proactive_actions": proactive_actions } except Exception as e: logger.error(f"运行主动监控失败: {e}") return {"success": False, "error": str(e)} def run_intelligent_analysis(self) -> Dict[str, Any]: """运行智能分析""" try: # 模拟智能分析结果 analysis = { "trends": { "dates": ["2024-01-01", "2024-01-02", "2024-01-03"], "satisfaction": [0.8, 0.85, 0.82], "resolution_time": [2.5, 2.3, 2.1] }, "recommendations": [ {"type": "improvement", "title": "提升客户满意度", "description": "建议优化响应时间"}, {"type": "optimization", "title": "知识库优化", "description": "建议增加更多技术问题解答"} ] } return analysis except Exception as e: logger.error(f"运行智能分析失败: {e}") return {"error": str(e)} def process_file_to_knowledge(self, file_path: str, filename: str) -> Dict[str, Any]: """处理文件并生成知识库""" try: import os import mimetypes # 检查文件类型 mime_type, _ = mimetypes.guess_type(file_path) file_ext = os.path.splitext(filename)[1].lower() # 读取文件内容 content = self._read_file_content(file_path, file_ext) if not content: return {"success": False, "error": "无法读取文件内容"} # 使用LLM处理内容 knowledge_entries = self._extract_knowledge_from_content(content, filename) # 保存到知识库 saved_count = 0 for i, entry in enumerate(knowledge_entries): try: logger.info(f"保存知识条目 {i+1}: {entry.get('question', '')[:50]}...") success = self.knowledge_manager.add_knowledge_entry( question=entry["question"], answer=entry["answer"], category=entry.get("category", "其他"), confidence_score=entry.get("confidence_score", 0.7), is_verified=False # 新添加的知识库条目默认为未验证 ) if success: saved_count += 1 logger.info(f"知识条目 {i+1} 保存成功") else: logger.error(f"知识条目 {i+1} 保存失败") except Exception as save_error: logger.error(f"保存知识条目 {i+1} 时出错: {save_error}") logger.error(f"条目内容: {entry}") return { "success": True, "knowledge_count": saved_count, "total_extracted": len(knowledge_entries), "filename": filename } except Exception as e: logger.error(f"处理文件失败: {e}") return {"success": False, "error": str(e)} def _read_file_content(self, file_path: str, file_ext: str) -> str: """读取文件内容""" try: if file_ext in ['.txt', '.md']: with open(file_path, 'r', encoding='utf-8') as f: return f.read() elif file_ext == '.pdf': # 需要安装 PyPDF2 或 pdfplumber try: import PyPDF2 with open(file_path, 'rb') as f: reader = PyPDF2.PdfReader(f) text = "" for page in reader.pages: text += page.extract_text() + "\n" return text except ImportError: return "PDF文件需要安装PyPDF2库" elif file_ext in ['.doc', '.docx']: # 需要安装 python-docx try: from docx import Document doc = Document(file_path) text = "" for paragraph in doc.paragraphs: text += paragraph.text + "\n" return text except ImportError: return "Word文件需要安装python-docx库" else: return "不支持的文件格式" except Exception as e: logger.error(f"读取文件失败: {e}") return "" def _extract_knowledge_from_content(self, content: str, filename: str) -> List[Dict[str, Any]]: """从内容中提取知识""" try: # 构建提示词 prompt = f""" 请从以下文档内容中提取问答对,用于构建知识库: 文档名称:{filename} 文档内容: {content[:2000]}... 请按照以下格式提取问答对: 1. 问题:具体的问题描述 2. 答案:详细的答案内容 3. 分类:问题所属类别(技术问题、APP功能、远程控制、车辆绑定、其他) 4. 置信度:0-1之间的数值 请提取3-5个最有价值的问答对,每个问答对都要完整且实用。 返回格式为JSON数组,例如: [ {{ "question": "如何远程启动车辆?", "answer": "远程启动车辆需要满足以下条件:1. 车辆处于P档 2. 手刹拉起 3. 车门已锁 4. 电池电量充足", "category": "远程控制", "confidence_score": 0.9 }} ] """ # 调用LLM response = self.llm_client.chat_completion( messages=[{"role": "user", "content": prompt}], temperature=0.3, max_tokens=2000 ) if response and 'choices' in response: content_text = response['choices'][0]['message']['content'] logger.info(f"LLM响应内容: {content_text[:500]}...") # 尝试解析JSON try: import json # 提取JSON部分 start_idx = content_text.find('[') end_idx = content_text.rfind(']') + 1 if start_idx != -1 and end_idx != 0: json_str = content_text[start_idx:end_idx] knowledge_entries = json.loads(json_str) logger.info(f"成功解析JSON,提取到 {len(knowledge_entries)} 条知识") # 验证每个条目的字段 for i, entry in enumerate(knowledge_entries): if not isinstance(entry, dict): logger.error(f"条目 {i} 不是字典格式: {entry}") continue if 'question' not in entry: logger.error(f"条目 {i} 缺少question字段: {entry}") continue if 'answer' not in entry: logger.error(f"条目 {i} 缺少answer字段: {entry}") continue logger.info(f"条目 {i} 验证通过: {entry.get('question', '')[:50]}...") return knowledge_entries except Exception as json_error: logger.warning(f"JSON解析失败: {json_error}") logger.warning(f"原始内容: {content_text}") # 如果JSON解析失败,尝试手动解析 manual_entries = self._parse_knowledge_manually(content_text) logger.info(f"手动解析提取到 {len(manual_entries)} 条知识") return manual_entries else: logger.error("LLM响应格式错误") logger.error(f"响应内容: {response}") return [] except Exception as e: logger.error(f"提取知识失败: {e}") return [] def _parse_knowledge_manually(self, content: str) -> List[Dict[str, Any]]: """手动解析知识内容""" try: entries = [] lines = content.split('\n') current_entry = {} for line in lines: line = line.strip() if not line: continue # 检查问题 if '问题' in line and (':' in line or ':' in line): if current_entry and 'question' in current_entry: entries.append(current_entry) current_entry = {} # 提取问题内容 if ':' in line: question = line.split(':', 1)[1].strip() else: question = line.split(':', 1)[1].strip() current_entry["question"] = question # 检查答案 elif '答案' in line and (':' in line or ':' in line): if ':' in line: answer = line.split(':', 1)[1].strip() else: answer = line.split(':', 1)[1].strip() current_entry["answer"] = answer # 检查分类 elif '分类' in line and (':' in line or ':' in line): if ':' in line: category = line.split(':', 1)[1].strip() else: category = line.split(':', 1)[1].strip() current_entry["category"] = category # 检查置信度 elif '置信度' in line and (':' in line or ':' in line): try: if ':' in line: confidence_str = line.split(':', 1)[1].strip() else: confidence_str = line.split(':', 1)[1].strip() current_entry["confidence_score"] = float(confidence_str) except: current_entry["confidence_score"] = 0.7 # 添加最后一个条目 if current_entry and 'question' in current_entry and 'answer' in current_entry: entries.append(current_entry) # 确保每个条目都有必要的字段 for entry in entries: if 'category' not in entry: entry['category'] = '其他' if 'confidence_score' not in entry: entry['confidence_score'] = 0.7 logger.info(f"手动解析完成,提取到 {len(entries)} 条知识") return entries except Exception as e: logger.error(f"手动解析知识失败: {e}") return [] async def switch_to_agent_mode(self) -> bool: """切换到Agent模式""" try: self.is_agent_mode = True logger.info("已切换到Agent模式") return True except Exception as e: logger.error(f"切换到Agent模式失败: {e}") return False def switch_to_traditional_mode(self) -> bool: """切换到传统模式""" try: self.is_agent_mode = False logger.info("已切换到传统模式") return True except Exception as e: logger.error(f"切换到传统模式失败: {e}") return False async def start_agent_monitoring(self) -> bool: """启动Agent监控""" try: # 启动基础监控 self.start_monitoring() # 启动Agent主动监控 asyncio.create_task(self._agent_monitoring_loop()) logger.info("Agent监控已启动") return True except Exception as e: logger.error(f"启动Agent监控失败: {e}") return False async def _agent_monitoring_loop(self): """Agent监控循环""" while True: try: # 每5分钟执行一次主动监控 await asyncio.sleep(300) proactive_result = await self.proactive_monitoring() if proactive_result.get("proactive_actions"): logger.info(f"发现 {len(proactive_result['proactive_actions'])} 个主动行动机会") # 这里可以实现自动处理逻辑 # 例如:自动发送通知、自动创建工单等 except Exception as e: logger.error(f"Agent监控循环错误: {e}") await asyncio.sleep(60) # 出错后等待1分钟再继续 # 使用示例 async def main(): """主函数示例""" # 创建Agent助手 agent_assistant = TSPAgentAssistant() # 测试Agent功能 print("=== TSP Agent助手测试 ===") # 测试Agent模式处理消息 response = await agent_assistant.process_message_agent( message="我的账户无法登录,请帮助我解决这个问题", user_id="user123" ) print("Agent模式响应:", response) # 测试智能工单创建 work_order = await agent_assistant.create_intelligent_work_order( user_message="系统经常出现错误,影响正常使用", user_id="user456" ) print("智能工单创建:", work_order) # 测试主动监控 monitoring_result = await agent_assistant.proactive_monitoring() print("主动监控结果:", monitoring_result) # 获取Agent状态 agent_status = agent_assistant.get_agent_status() print("Agent状态:", agent_status) if __name__ == "__main__": asyncio.run(main())