Files
tsp-assistant/src/agent_assistant.py
2025-09-06 21:06:18 +08:00

817 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
增强版TSP助手 - 集成Agent功能
这是一个真正的智能Agent实现
"""
import logging
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
import json
from src.main import TSPAssistant
from src.agent import AgentCore, AgentState
logger = logging.getLogger(__name__)
class TSPAgentAssistant(TSPAssistant):
"""TSP Agent助手 - 增强版TSP助手具备完整Agent功能"""
def __init__(self):
# 初始化基础TSP助手
super().__init__()
# 初始化Agent核心
self.agent_core = AgentCore()
# Agent特有功能
self.is_agent_mode = True
self.proactive_tasks = []
self.agent_memory = {}
logger.info("TSP Agent助手初始化完成")
async def process_message_agent(
self,
message: str,
user_id: str = None,
work_order_id: int = None,
enable_proactive: bool = True
) -> Dict[str, Any]:
"""Agent模式处理用户消息"""
try:
# 构建请求
request = {
"message": message,
"user_id": user_id,
"work_order_id": work_order_id,
"context": {
"session_id": f"session_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"timestamp": datetime.now().isoformat()
}
}
# 使用Agent核心处理请求
agent_result = await self.agent_core.process_request(request)
# 如果启用主动模式,检查是否需要主动行动
if enable_proactive:
proactive_result = await self.agent_core.proactive_action()
if proactive_result:
agent_result["proactive_action"] = proactive_result
# 记录Agent执行信息
agent_result["agent_mode"] = True
agent_result["agent_status"] = self.agent_core.get_status()
return agent_result
except Exception as e:
logger.error(f"Agent模式处理消息失败: {e}")
# 回退到传统模式
return await self._fallback_to_traditional_mode(message, user_id, work_order_id)
async def _fallback_to_traditional_mode(
self,
message: str,
user_id: str = None,
work_order_id: int = None
) -> Dict[str, Any]:
"""回退到传统模式"""
logger.info("回退到传统TSP助手模式")
# 使用原有的处理方式
result = self.process_message(message, user_id, work_order_id)
# 添加Agent标识
result["agent_mode"] = False
result["fallback_reason"] = "Agent处理失败使用传统模式"
return result
async def create_intelligent_work_order(
self,
user_message: str,
user_id: str = None,
auto_categorize: bool = True,
auto_priority: bool = True
) -> Dict[str, Any]:
"""智能创建工单 - 使用Agent能力"""
try:
# 使用Agent分析用户消息
request = {
"message": user_message,
"user_id": user_id,
"context": {"action": "create_work_order"}
}
agent_result = await self.agent_core.process_request(request)
if "error" in agent_result:
return agent_result
# 从Agent结果中提取工单信息
work_order_info = self._extract_work_order_info(agent_result, user_message)
# 创建工单
work_order = self.create_work_order(
title=work_order_info["title"],
description=work_order_info["description"],
category=work_order_info["category"],
priority=work_order_info["priority"]
)
# 添加Agent分析结果
work_order["agent_analysis"] = agent_result
work_order["intelligent_features"] = {
"auto_categorized": auto_categorize,
"auto_prioritized": auto_priority,
"confidence_score": work_order_info.get("confidence", 0.8)
}
return work_order
except Exception as e:
logger.error(f"智能创建工单失败: {e}")
return {"error": f"智能创建失败: {str(e)}"}
def _extract_work_order_info(self, agent_result: Dict[str, Any], user_message: str) -> Dict[str, Any]:
"""从Agent结果中提取工单信息"""
# 这里可以根据Agent的分析结果智能提取工单信息
# 暂时使用简单的提取逻辑
# 使用LLM提取关键信息
from src.core.llm_client import QwenClient
llm_client = QwenClient()
prompt = f"""
请从以下用户消息中提取工单信息:
用户消息: {user_message}
请提取:
1. 工单标题(简洁明了)
2. 问题描述(详细描述)
3. 问题类别(技术问题、账户问题、服务问题等)
4. 优先级high、medium、low
5. 置信度0-1
请以JSON格式返回。
"""
messages = [
{"role": "system", "content": "你是一个工单信息提取专家,擅长从用户消息中提取关键信息。"},
{"role": "user", "content": prompt}
]
result = llm_client.chat_completion(messages, temperature=0.3)
if "error" in result:
# 使用默认值
return {
"title": "用户问题",
"description": user_message,
"category": "技术问题",
"priority": "medium",
"confidence": 0.5
}
try:
response_content = result["choices"][0]["message"]["content"]
import re
json_match = re.search(r'\{.*\}', response_content, re.DOTALL)
if json_match:
extracted_info = json.loads(json_match.group())
return {
"title": extracted_info.get("title", "用户问题"),
"description": extracted_info.get("description", user_message),
"category": extracted_info.get("category", "技术问题"),
"priority": extracted_info.get("priority", "medium"),
"confidence": extracted_info.get("confidence", 0.7)
}
else:
return {
"title": "用户问题",
"description": user_message,
"category": "技术问题",
"priority": "medium",
"confidence": 0.5
}
except Exception as e:
logger.error(f"提取工单信息失败: {e}")
return {
"title": "用户问题",
"description": user_message,
"category": "技术问题",
"priority": "medium",
"confidence": 0.5
}
async def intelligent_knowledge_search(
self,
query: str,
context: Dict[str, Any] = None,
use_reasoning: bool = True
) -> Dict[str, Any]:
"""智能知识库搜索 - 使用推理能力"""
try:
# 基础搜索
basic_results = self.search_knowledge(query)
if not use_reasoning:
return basic_results
# 使用推理引擎增强搜索
reasoning_result = await self.agent_core.reasoning_engine.reason_about_problem(
problem=f"搜索知识: {query}",
available_information={
"search_results": basic_results.get("results", []),
"context": context or {}
},
reasoning_type="inductive"
)
# 结合搜索结果和推理结果
enhanced_results = {
"basic_search": basic_results,
"reasoning_analysis": reasoning_result,
"enhanced_results": self._enhance_search_results(
basic_results.get("results", []),
reasoning_result
),
"search_strategy": "intelligent_with_reasoning"
}
return enhanced_results
except Exception as e:
logger.error(f"智能知识搜索失败: {e}")
return self.search_knowledge(query)
def _enhance_search_results(
self,
basic_results: List[Dict[str, Any]],
reasoning_result: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""增强搜索结果"""
enhanced_results = []
for result in basic_results:
enhanced_result = result.copy()
# 添加推理增强信息
if "analysis" in reasoning_result:
enhanced_result["reasoning_insights"] = reasoning_result["analysis"]
# 计算增强置信度
original_confidence = result.get("confidence_score", 0.5)
reasoning_confidence = reasoning_result.get("confidence", 0.5)
enhanced_result["enhanced_confidence"] = (original_confidence + reasoning_confidence) / 2
enhanced_results.append(enhanced_result)
return enhanced_results
async def proactive_monitoring(self) -> Dict[str, Any]:
"""主动监控 - Agent主动检查系统状态"""
try:
proactive_actions = []
# 检查预警
alerts = self.get_alerts()
if alerts.get("count", 0) > 0:
proactive_actions.append({
"type": "alert_response",
"description": f"发现 {alerts['count']} 个活跃预警",
"priority": "high",
"action": "需要立即处理预警"
})
# 检查系统健康
system_status = self.get_system_status()
if system_status.get("health_score", 1.0) < 0.8:
proactive_actions.append({
"type": "system_maintenance",
"description": "系统健康状态不佳",
"priority": "medium",
"action": "建议进行系统维护"
})
# 检查知识库质量
knowledge_stats = self.knowledge_manager.get_knowledge_stats()
if knowledge_stats.get("average_confidence", 0.8) < 0.6:
proactive_actions.append({
"type": "knowledge_improvement",
"description": "知识库质量需要提升",
"priority": "low",
"action": "建议更新知识库"
})
return {
"proactive_actions": proactive_actions,
"timestamp": datetime.now().isoformat(),
"agent_status": self.agent_core.get_status()
}
except Exception as e:
logger.error(f"主动监控失败: {e}")
return {"error": str(e)}
async def intelligent_analytics(
self,
analysis_type: str = "comprehensive",
date_range: str = "last_7_days"
) -> Dict[str, Any]:
"""智能分析 - 使用Agent推理能力"""
try:
# 基础分析
basic_analytics = self.generate_analytics(date_range)
if analysis_type == "basic":
return basic_analytics
# 使用Agent进行深度分析
analysis_request = {
"message": f"分析{date_range}的数据",
"context": {
"analysis_type": analysis_type,
"basic_data": basic_analytics
}
}
agent_analysis = await self.agent_core.process_request(analysis_request)
# 结合基础分析和Agent分析
intelligent_analytics = {
"basic_analytics": basic_analytics,
"agent_insights": agent_analysis,
"intelligent_recommendations": self._generate_recommendations(
basic_analytics,
agent_analysis
),
"analysis_confidence": self._calculate_analysis_confidence(agent_analysis)
}
return intelligent_analytics
except Exception as e:
logger.error(f"智能分析失败: {e}")
return self.generate_analytics(date_range)
def _generate_recommendations(
self,
basic_analytics: Dict[str, Any],
agent_analysis: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""生成智能推荐"""
recommendations = []
# 基于基础分析生成推荐
if basic_analytics.get("summary", {}).get("avg_satisfaction", 0) < 0.7:
recommendations.append({
"type": "improvement",
"title": "提升客户满意度",
"description": "客户满意度较低,建议优化服务质量",
"priority": "high",
"action_items": [
"分析低满意度工单",
"改进响应时间",
"提升解决方案质量"
]
})
if basic_analytics.get("summary", {}).get("avg_resolution_time_hours", 0) > 24:
recommendations.append({
"type": "efficiency",
"title": "缩短解决时间",
"description": "平均解决时间过长,建议提升处理效率",
"priority": "medium",
"action_items": [
"优化工作流程",
"增加自动化处理",
"提升知识库质量"
]
})
return recommendations
def _calculate_analysis_confidence(self, agent_analysis: Dict[str, Any]) -> float:
"""计算分析置信度"""
# 基于Agent分析结果计算置信度
if "error" in agent_analysis:
return 0.3
# 这里可以实现更复杂的置信度计算逻辑
return 0.8
def get_agent_status(self) -> Dict[str, Any]:
"""获取Agent状态"""
return {
"success": True,
"status": "active" if self.is_agent_mode else "inactive",
"active_goals": len(self.agent_core.goal_manager.get_active_goals()),
"available_tools": len(self.agent_core.tool_manager.get_available_tools()),
"tools": [
{
"name": tool.name,
"usage_count": getattr(tool, 'usage_count', 0),
"success_rate": getattr(tool, 'success_rate', 0.8)
}
for tool in self.agent_core.tool_manager.get_available_tools()
],
"execution_history": []
}
def toggle_agent_mode(self, enabled: bool) -> bool:
"""切换Agent模式"""
try:
if enabled:
# 同步方式切换
self.is_agent_mode = True
logger.info("已切换到Agent模式")
return True
else:
return self.switch_to_traditional_mode()
except Exception as e:
logger.error(f"切换Agent模式失败: {e}")
return False
def start_proactive_monitoring(self) -> bool:
"""启动主动监控"""
try:
return self.start_agent_monitoring()
except Exception as e:
logger.error(f"启动主动监控失败: {e}")
return False
def stop_proactive_monitoring(self) -> bool:
"""停止主动监控"""
try:
return self.stop_agent_monitoring()
except Exception as e:
logger.error(f"停止主动监控失败: {e}")
return False
def run_proactive_monitoring(self) -> Dict[str, Any]:
"""运行主动监控"""
try:
# 模拟主动监控结果
proactive_actions = [
{"type": "alert_response", "description": "发现系统性能预警"},
{"type": "knowledge_update", "description": "建议更新知识库"},
{"type": "user_assistance", "description": "检测到用户可能需要帮助"}
]
return {
"success": True,
"proactive_actions": proactive_actions
}
except Exception as e:
logger.error(f"运行主动监控失败: {e}")
return {"success": False, "error": str(e)}
def run_intelligent_analysis(self) -> Dict[str, Any]:
"""运行智能分析"""
try:
# 模拟智能分析结果
analysis = {
"trends": {
"dates": ["2024-01-01", "2024-01-02", "2024-01-03"],
"satisfaction": [0.8, 0.85, 0.82],
"resolution_time": [2.5, 2.3, 2.1]
},
"recommendations": [
{"type": "improvement", "title": "提升客户满意度", "description": "建议优化响应时间"},
{"type": "optimization", "title": "知识库优化", "description": "建议增加更多技术问题解答"}
]
}
return analysis
except Exception as e:
logger.error(f"运行智能分析失败: {e}")
return {"error": str(e)}
def process_file_to_knowledge(self, file_path: str, filename: str) -> Dict[str, Any]:
"""处理文件并生成知识库"""
try:
import os
import mimetypes
# 检查文件类型
mime_type, _ = mimetypes.guess_type(file_path)
file_ext = os.path.splitext(filename)[1].lower()
# 读取文件内容
content = self._read_file_content(file_path, file_ext)
if not content:
return {"success": False, "error": "无法读取文件内容"}
# 使用LLM处理内容
knowledge_entries = self._extract_knowledge_from_content(content, filename)
# 保存到知识库
saved_count = 0
for i, entry in enumerate(knowledge_entries):
try:
logger.info(f"保存知识条目 {i+1}: {entry.get('question', '')[:50]}...")
success = self.knowledge_manager.add_knowledge_entry(
question=entry["question"],
answer=entry["answer"],
category=entry.get("category", "其他"),
confidence_score=entry.get("confidence_score", 0.7),
is_verified=False # 新添加的知识库条目默认为未验证
)
if success:
saved_count += 1
logger.info(f"知识条目 {i+1} 保存成功")
else:
logger.error(f"知识条目 {i+1} 保存失败")
except Exception as save_error:
logger.error(f"保存知识条目 {i+1} 时出错: {save_error}")
logger.error(f"条目内容: {entry}")
return {
"success": True,
"knowledge_count": saved_count,
"total_extracted": len(knowledge_entries),
"filename": filename
}
except Exception as e:
logger.error(f"处理文件失败: {e}")
return {"success": False, "error": str(e)}
def _read_file_content(self, file_path: str, file_ext: str) -> str:
"""读取文件内容"""
try:
if file_ext in ['.txt', '.md']:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
elif file_ext == '.pdf':
# 需要安装 PyPDF2 或 pdfplumber
try:
import PyPDF2
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
return text
except ImportError:
return "PDF文件需要安装PyPDF2库"
elif file_ext in ['.doc', '.docx']:
# 需要安装 python-docx
try:
from docx import Document
doc = Document(file_path)
text = ""
for paragraph in doc.paragraphs:
text += paragraph.text + "\n"
return text
except ImportError:
return "Word文件需要安装python-docx库"
else:
return "不支持的文件格式"
except Exception as e:
logger.error(f"读取文件失败: {e}")
return ""
def _extract_knowledge_from_content(self, content: str, filename: str) -> List[Dict[str, Any]]:
"""从内容中提取知识"""
try:
# 构建提示词
prompt = f"""
请从以下文档内容中提取问答对,用于构建知识库:
文档名称:{filename}
文档内容:
{content[:2000]}...
请按照以下格式提取问答对:
1. 问题:具体的问题描述
2. 答案:详细的答案内容
3. 分类问题所属类别技术问题、APP功能、远程控制、车辆绑定、其他
4. 置信度0-1之间的数值
请提取3-5个最有价值的问答对每个问答对都要完整且实用。
返回格式为JSON数组例如
[
{{
"question": "如何远程启动车辆?",
"answer": "远程启动车辆需要满足以下条件1. 车辆处于P档 2. 手刹拉起 3. 车门已锁 4. 电池电量充足",
"category": "远程控制",
"confidence_score": 0.9
}}
]
"""
# 调用LLM
response = self.llm_client.chat_completion(
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=2000
)
if response and 'choices' in response:
content_text = response['choices'][0]['message']['content']
logger.info(f"LLM响应内容: {content_text[:500]}...")
# 尝试解析JSON
try:
import json
# 提取JSON部分
start_idx = content_text.find('[')
end_idx = content_text.rfind(']') + 1
if start_idx != -1 and end_idx != 0:
json_str = content_text[start_idx:end_idx]
knowledge_entries = json.loads(json_str)
logger.info(f"成功解析JSON提取到 {len(knowledge_entries)} 条知识")
# 验证每个条目的字段
for i, entry in enumerate(knowledge_entries):
if not isinstance(entry, dict):
logger.error(f"条目 {i} 不是字典格式: {entry}")
continue
if 'question' not in entry:
logger.error(f"条目 {i} 缺少question字段: {entry}")
continue
if 'answer' not in entry:
logger.error(f"条目 {i} 缺少answer字段: {entry}")
continue
logger.info(f"条目 {i} 验证通过: {entry.get('question', '')[:50]}...")
return knowledge_entries
except Exception as json_error:
logger.warning(f"JSON解析失败: {json_error}")
logger.warning(f"原始内容: {content_text}")
# 如果JSON解析失败尝试手动解析
manual_entries = self._parse_knowledge_manually(content_text)
logger.info(f"手动解析提取到 {len(manual_entries)} 条知识")
return manual_entries
else:
logger.error("LLM响应格式错误")
logger.error(f"响应内容: {response}")
return []
except Exception as e:
logger.error(f"提取知识失败: {e}")
return []
def _parse_knowledge_manually(self, content: str) -> List[Dict[str, Any]]:
"""手动解析知识内容"""
try:
entries = []
lines = content.split('\n')
current_entry = {}
for line in lines:
line = line.strip()
if not line:
continue
# 检查问题
if '问题' in line and ('' in line or ':' in line):
if current_entry and 'question' in current_entry:
entries.append(current_entry)
current_entry = {}
# 提取问题内容
if '' in line:
question = line.split('', 1)[1].strip()
else:
question = line.split(':', 1)[1].strip()
current_entry["question"] = question
# 检查答案
elif '答案' in line and ('' in line or ':' in line):
if '' in line:
answer = line.split('', 1)[1].strip()
else:
answer = line.split(':', 1)[1].strip()
current_entry["answer"] = answer
# 检查分类
elif '分类' in line and ('' in line or ':' in line):
if '' in line:
category = line.split('', 1)[1].strip()
else:
category = line.split(':', 1)[1].strip()
current_entry["category"] = category
# 检查置信度
elif '置信度' in line and ('' in line or ':' in line):
try:
if '' in line:
confidence_str = line.split('', 1)[1].strip()
else:
confidence_str = line.split(':', 1)[1].strip()
current_entry["confidence_score"] = float(confidence_str)
except:
current_entry["confidence_score"] = 0.7
# 添加最后一个条目
if current_entry and 'question' in current_entry and 'answer' in current_entry:
entries.append(current_entry)
# 确保每个条目都有必要的字段
for entry in entries:
if 'category' not in entry:
entry['category'] = '其他'
if 'confidence_score' not in entry:
entry['confidence_score'] = 0.7
logger.info(f"手动解析完成,提取到 {len(entries)} 条知识")
return entries
except Exception as e:
logger.error(f"手动解析知识失败: {e}")
return []
async def switch_to_agent_mode(self) -> bool:
"""切换到Agent模式"""
try:
self.is_agent_mode = True
logger.info("已切换到Agent模式")
return True
except Exception as e:
logger.error(f"切换到Agent模式失败: {e}")
return False
def switch_to_traditional_mode(self) -> bool:
"""切换到传统模式"""
try:
self.is_agent_mode = False
logger.info("已切换到传统模式")
return True
except Exception as e:
logger.error(f"切换到传统模式失败: {e}")
return False
async def start_agent_monitoring(self) -> bool:
"""启动Agent监控"""
try:
# 启动基础监控
self.start_monitoring()
# 启动Agent主动监控
asyncio.create_task(self._agent_monitoring_loop())
logger.info("Agent监控已启动")
return True
except Exception as e:
logger.error(f"启动Agent监控失败: {e}")
return False
async def _agent_monitoring_loop(self):
"""Agent监控循环"""
while True:
try:
# 每5分钟执行一次主动监控
await asyncio.sleep(300)
proactive_result = await self.proactive_monitoring()
if proactive_result.get("proactive_actions"):
logger.info(f"发现 {len(proactive_result['proactive_actions'])} 个主动行动机会")
# 这里可以实现自动处理逻辑
# 例如:自动发送通知、自动创建工单等
except Exception as e:
logger.error(f"Agent监控循环错误: {e}")
await asyncio.sleep(60) # 出错后等待1分钟再继续
# 使用示例
async def main():
"""主函数示例"""
# 创建Agent助手
agent_assistant = TSPAgentAssistant()
# 测试Agent功能
print("=== TSP Agent助手测试 ===")
# 测试Agent模式处理消息
response = await agent_assistant.process_message_agent(
message="我的账户无法登录,请帮助我解决这个问题",
user_id="user123"
)
print("Agent模式响应:", response)
# 测试智能工单创建
work_order = await agent_assistant.create_intelligent_work_order(
user_message="系统经常出现错误,影响正常使用",
user_id="user456"
)
print("智能工单创建:", work_order)
# 测试主动监控
monitoring_result = await agent_assistant.proactive_monitoring()
print("主动监控结果:", monitoring_result)
# 获取Agent状态
agent_status = agent_assistant.get_agent_status()
print("Agent状态:", agent_status)
if __name__ == "__main__":
asyncio.run(main())