817 lines
31 KiB
Python
817 lines
31 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
增强版TSP助手 - 集成Agent功能
|
||
这是一个真正的智能Agent实现
|
||
"""
|
||
|
||
import logging
|
||
import asyncio
|
||
from typing import Dict, Any, List, Optional
|
||
from datetime import datetime
|
||
import json
|
||
|
||
from src.main import TSPAssistant
|
||
from src.agent import AgentCore, AgentState
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class TSPAgentAssistant(TSPAssistant):
|
||
"""TSP Agent助手 - 增强版TSP助手,具备完整Agent功能"""
|
||
|
||
def __init__(self):
|
||
# 初始化基础TSP助手
|
||
super().__init__()
|
||
|
||
# 初始化Agent核心
|
||
self.agent_core = AgentCore()
|
||
|
||
# Agent特有功能
|
||
self.is_agent_mode = True
|
||
self.proactive_tasks = []
|
||
self.agent_memory = {}
|
||
|
||
logger.info("TSP Agent助手初始化完成")
|
||
|
||
async def process_message_agent(
|
||
self,
|
||
message: str,
|
||
user_id: str = None,
|
||
work_order_id: int = None,
|
||
enable_proactive: bool = True
|
||
) -> Dict[str, Any]:
|
||
"""Agent模式处理用户消息"""
|
||
try:
|
||
# 构建请求
|
||
request = {
|
||
"message": message,
|
||
"user_id": user_id,
|
||
"work_order_id": work_order_id,
|
||
"context": {
|
||
"session_id": f"session_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
}
|
||
|
||
# 使用Agent核心处理请求
|
||
agent_result = await self.agent_core.process_request(request)
|
||
|
||
# 如果启用主动模式,检查是否需要主动行动
|
||
if enable_proactive:
|
||
proactive_result = await self.agent_core.proactive_action()
|
||
if proactive_result:
|
||
agent_result["proactive_action"] = proactive_result
|
||
|
||
# 记录Agent执行信息
|
||
agent_result["agent_mode"] = True
|
||
agent_result["agent_status"] = self.agent_core.get_status()
|
||
|
||
return agent_result
|
||
|
||
except Exception as e:
|
||
logger.error(f"Agent模式处理消息失败: {e}")
|
||
# 回退到传统模式
|
||
return await self._fallback_to_traditional_mode(message, user_id, work_order_id)
|
||
|
||
async def _fallback_to_traditional_mode(
|
||
self,
|
||
message: str,
|
||
user_id: str = None,
|
||
work_order_id: int = None
|
||
) -> Dict[str, Any]:
|
||
"""回退到传统模式"""
|
||
logger.info("回退到传统TSP助手模式")
|
||
|
||
# 使用原有的处理方式
|
||
result = self.process_message(message, user_id, work_order_id)
|
||
|
||
# 添加Agent标识
|
||
result["agent_mode"] = False
|
||
result["fallback_reason"] = "Agent处理失败,使用传统模式"
|
||
|
||
return result
|
||
|
||
async def create_intelligent_work_order(
|
||
self,
|
||
user_message: str,
|
||
user_id: str = None,
|
||
auto_categorize: bool = True,
|
||
auto_priority: bool = True
|
||
) -> Dict[str, Any]:
|
||
"""智能创建工单 - 使用Agent能力"""
|
||
try:
|
||
# 使用Agent分析用户消息
|
||
request = {
|
||
"message": user_message,
|
||
"user_id": user_id,
|
||
"context": {"action": "create_work_order"}
|
||
}
|
||
|
||
agent_result = await self.agent_core.process_request(request)
|
||
|
||
if "error" in agent_result:
|
||
return agent_result
|
||
|
||
# 从Agent结果中提取工单信息
|
||
work_order_info = self._extract_work_order_info(agent_result, user_message)
|
||
|
||
# 创建工单
|
||
work_order = self.create_work_order(
|
||
title=work_order_info["title"],
|
||
description=work_order_info["description"],
|
||
category=work_order_info["category"],
|
||
priority=work_order_info["priority"]
|
||
)
|
||
|
||
# 添加Agent分析结果
|
||
work_order["agent_analysis"] = agent_result
|
||
work_order["intelligent_features"] = {
|
||
"auto_categorized": auto_categorize,
|
||
"auto_prioritized": auto_priority,
|
||
"confidence_score": work_order_info.get("confidence", 0.8)
|
||
}
|
||
|
||
return work_order
|
||
|
||
except Exception as e:
|
||
logger.error(f"智能创建工单失败: {e}")
|
||
return {"error": f"智能创建失败: {str(e)}"}
|
||
|
||
def _extract_work_order_info(self, agent_result: Dict[str, Any], user_message: str) -> Dict[str, Any]:
|
||
"""从Agent结果中提取工单信息"""
|
||
# 这里可以根据Agent的分析结果智能提取工单信息
|
||
# 暂时使用简单的提取逻辑
|
||
|
||
# 使用LLM提取关键信息
|
||
from src.core.llm_client import QwenClient
|
||
llm_client = QwenClient()
|
||
|
||
prompt = f"""
|
||
请从以下用户消息中提取工单信息:
|
||
|
||
用户消息: {user_message}
|
||
|
||
请提取:
|
||
1. 工单标题(简洁明了)
|
||
2. 问题描述(详细描述)
|
||
3. 问题类别(技术问题、账户问题、服务问题等)
|
||
4. 优先级(high、medium、low)
|
||
5. 置信度(0-1)
|
||
|
||
请以JSON格式返回。
|
||
"""
|
||
|
||
messages = [
|
||
{"role": "system", "content": "你是一个工单信息提取专家,擅长从用户消息中提取关键信息。"},
|
||
{"role": "user", "content": prompt}
|
||
]
|
||
|
||
result = llm_client.chat_completion(messages, temperature=0.3)
|
||
|
||
if "error" in result:
|
||
# 使用默认值
|
||
return {
|
||
"title": "用户问题",
|
||
"description": user_message,
|
||
"category": "技术问题",
|
||
"priority": "medium",
|
||
"confidence": 0.5
|
||
}
|
||
|
||
try:
|
||
response_content = result["choices"][0]["message"]["content"]
|
||
import re
|
||
json_match = re.search(r'\{.*\}', response_content, re.DOTALL)
|
||
if json_match:
|
||
extracted_info = json.loads(json_match.group())
|
||
return {
|
||
"title": extracted_info.get("title", "用户问题"),
|
||
"description": extracted_info.get("description", user_message),
|
||
"category": extracted_info.get("category", "技术问题"),
|
||
"priority": extracted_info.get("priority", "medium"),
|
||
"confidence": extracted_info.get("confidence", 0.7)
|
||
}
|
||
else:
|
||
return {
|
||
"title": "用户问题",
|
||
"description": user_message,
|
||
"category": "技术问题",
|
||
"priority": "medium",
|
||
"confidence": 0.5
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"提取工单信息失败: {e}")
|
||
return {
|
||
"title": "用户问题",
|
||
"description": user_message,
|
||
"category": "技术问题",
|
||
"priority": "medium",
|
||
"confidence": 0.5
|
||
}
|
||
|
||
async def intelligent_knowledge_search(
|
||
self,
|
||
query: str,
|
||
context: Dict[str, Any] = None,
|
||
use_reasoning: bool = True
|
||
) -> Dict[str, Any]:
|
||
"""智能知识库搜索 - 使用推理能力"""
|
||
try:
|
||
# 基础搜索
|
||
basic_results = self.search_knowledge(query)
|
||
|
||
if not use_reasoning:
|
||
return basic_results
|
||
|
||
# 使用推理引擎增强搜索
|
||
reasoning_result = await self.agent_core.reasoning_engine.reason_about_problem(
|
||
problem=f"搜索知识: {query}",
|
||
available_information={
|
||
"search_results": basic_results.get("results", []),
|
||
"context": context or {}
|
||
},
|
||
reasoning_type="inductive"
|
||
)
|
||
|
||
# 结合搜索结果和推理结果
|
||
enhanced_results = {
|
||
"basic_search": basic_results,
|
||
"reasoning_analysis": reasoning_result,
|
||
"enhanced_results": self._enhance_search_results(
|
||
basic_results.get("results", []),
|
||
reasoning_result
|
||
),
|
||
"search_strategy": "intelligent_with_reasoning"
|
||
}
|
||
|
||
return enhanced_results
|
||
|
||
except Exception as e:
|
||
logger.error(f"智能知识搜索失败: {e}")
|
||
return self.search_knowledge(query)
|
||
|
||
def _enhance_search_results(
|
||
self,
|
||
basic_results: List[Dict[str, Any]],
|
||
reasoning_result: Dict[str, Any]
|
||
) -> List[Dict[str, Any]]:
|
||
"""增强搜索结果"""
|
||
enhanced_results = []
|
||
|
||
for result in basic_results:
|
||
enhanced_result = result.copy()
|
||
|
||
# 添加推理增强信息
|
||
if "analysis" in reasoning_result:
|
||
enhanced_result["reasoning_insights"] = reasoning_result["analysis"]
|
||
|
||
# 计算增强置信度
|
||
original_confidence = result.get("confidence_score", 0.5)
|
||
reasoning_confidence = reasoning_result.get("confidence", 0.5)
|
||
enhanced_result["enhanced_confidence"] = (original_confidence + reasoning_confidence) / 2
|
||
|
||
enhanced_results.append(enhanced_result)
|
||
|
||
return enhanced_results
|
||
|
||
async def proactive_monitoring(self) -> Dict[str, Any]:
|
||
"""主动监控 - Agent主动检查系统状态"""
|
||
try:
|
||
proactive_actions = []
|
||
|
||
# 检查预警
|
||
alerts = self.get_alerts()
|
||
if alerts.get("count", 0) > 0:
|
||
proactive_actions.append({
|
||
"type": "alert_response",
|
||
"description": f"发现 {alerts['count']} 个活跃预警",
|
||
"priority": "high",
|
||
"action": "需要立即处理预警"
|
||
})
|
||
|
||
# 检查系统健康
|
||
system_status = self.get_system_status()
|
||
if system_status.get("health_score", 1.0) < 0.8:
|
||
proactive_actions.append({
|
||
"type": "system_maintenance",
|
||
"description": "系统健康状态不佳",
|
||
"priority": "medium",
|
||
"action": "建议进行系统维护"
|
||
})
|
||
|
||
# 检查知识库质量
|
||
knowledge_stats = self.knowledge_manager.get_knowledge_stats()
|
||
if knowledge_stats.get("average_confidence", 0.8) < 0.6:
|
||
proactive_actions.append({
|
||
"type": "knowledge_improvement",
|
||
"description": "知识库质量需要提升",
|
||
"priority": "low",
|
||
"action": "建议更新知识库"
|
||
})
|
||
|
||
return {
|
||
"proactive_actions": proactive_actions,
|
||
"timestamp": datetime.now().isoformat(),
|
||
"agent_status": self.agent_core.get_status()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"主动监控失败: {e}")
|
||
return {"error": str(e)}
|
||
|
||
async def intelligent_analytics(
|
||
self,
|
||
analysis_type: str = "comprehensive",
|
||
date_range: str = "last_7_days"
|
||
) -> Dict[str, Any]:
|
||
"""智能分析 - 使用Agent推理能力"""
|
||
try:
|
||
# 基础分析
|
||
basic_analytics = self.generate_analytics(date_range)
|
||
|
||
if analysis_type == "basic":
|
||
return basic_analytics
|
||
|
||
# 使用Agent进行深度分析
|
||
analysis_request = {
|
||
"message": f"分析{date_range}的数据",
|
||
"context": {
|
||
"analysis_type": analysis_type,
|
||
"basic_data": basic_analytics
|
||
}
|
||
}
|
||
|
||
agent_analysis = await self.agent_core.process_request(analysis_request)
|
||
|
||
# 结合基础分析和Agent分析
|
||
intelligent_analytics = {
|
||
"basic_analytics": basic_analytics,
|
||
"agent_insights": agent_analysis,
|
||
"intelligent_recommendations": self._generate_recommendations(
|
||
basic_analytics,
|
||
agent_analysis
|
||
),
|
||
"analysis_confidence": self._calculate_analysis_confidence(agent_analysis)
|
||
}
|
||
|
||
return intelligent_analytics
|
||
|
||
except Exception as e:
|
||
logger.error(f"智能分析失败: {e}")
|
||
return self.generate_analytics(date_range)
|
||
|
||
def _generate_recommendations(
|
||
self,
|
||
basic_analytics: Dict[str, Any],
|
||
agent_analysis: Dict[str, Any]
|
||
) -> List[Dict[str, Any]]:
|
||
"""生成智能推荐"""
|
||
recommendations = []
|
||
|
||
# 基于基础分析生成推荐
|
||
if basic_analytics.get("summary", {}).get("avg_satisfaction", 0) < 0.7:
|
||
recommendations.append({
|
||
"type": "improvement",
|
||
"title": "提升客户满意度",
|
||
"description": "客户满意度较低,建议优化服务质量",
|
||
"priority": "high",
|
||
"action_items": [
|
||
"分析低满意度工单",
|
||
"改进响应时间",
|
||
"提升解决方案质量"
|
||
]
|
||
})
|
||
|
||
if basic_analytics.get("summary", {}).get("avg_resolution_time_hours", 0) > 24:
|
||
recommendations.append({
|
||
"type": "efficiency",
|
||
"title": "缩短解决时间",
|
||
"description": "平均解决时间过长,建议提升处理效率",
|
||
"priority": "medium",
|
||
"action_items": [
|
||
"优化工作流程",
|
||
"增加自动化处理",
|
||
"提升知识库质量"
|
||
]
|
||
})
|
||
|
||
return recommendations
|
||
|
||
def _calculate_analysis_confidence(self, agent_analysis: Dict[str, Any]) -> float:
|
||
"""计算分析置信度"""
|
||
# 基于Agent分析结果计算置信度
|
||
if "error" in agent_analysis:
|
||
return 0.3
|
||
|
||
# 这里可以实现更复杂的置信度计算逻辑
|
||
return 0.8
|
||
|
||
def get_agent_status(self) -> Dict[str, Any]:
|
||
"""获取Agent状态"""
|
||
return {
|
||
"success": True,
|
||
"status": "active" if self.is_agent_mode else "inactive",
|
||
"active_goals": len(self.agent_core.goal_manager.get_active_goals()),
|
||
"available_tools": len(self.agent_core.tool_manager.get_available_tools()),
|
||
"tools": [
|
||
{
|
||
"name": tool.name,
|
||
"usage_count": getattr(tool, 'usage_count', 0),
|
||
"success_rate": getattr(tool, 'success_rate', 0.8)
|
||
}
|
||
for tool in self.agent_core.tool_manager.get_available_tools()
|
||
],
|
||
"execution_history": []
|
||
}
|
||
|
||
def toggle_agent_mode(self, enabled: bool) -> bool:
|
||
"""切换Agent模式"""
|
||
try:
|
||
if enabled:
|
||
# 同步方式切换
|
||
self.is_agent_mode = True
|
||
logger.info("已切换到Agent模式")
|
||
return True
|
||
else:
|
||
return self.switch_to_traditional_mode()
|
||
except Exception as e:
|
||
logger.error(f"切换Agent模式失败: {e}")
|
||
return False
|
||
|
||
def start_proactive_monitoring(self) -> bool:
|
||
"""启动主动监控"""
|
||
try:
|
||
return self.start_agent_monitoring()
|
||
except Exception as e:
|
||
logger.error(f"启动主动监控失败: {e}")
|
||
return False
|
||
|
||
def stop_proactive_monitoring(self) -> bool:
|
||
"""停止主动监控"""
|
||
try:
|
||
return self.stop_agent_monitoring()
|
||
except Exception as e:
|
||
logger.error(f"停止主动监控失败: {e}")
|
||
return False
|
||
|
||
def run_proactive_monitoring(self) -> Dict[str, Any]:
|
||
"""运行主动监控"""
|
||
try:
|
||
# 模拟主动监控结果
|
||
proactive_actions = [
|
||
{"type": "alert_response", "description": "发现系统性能预警"},
|
||
{"type": "knowledge_update", "description": "建议更新知识库"},
|
||
{"type": "user_assistance", "description": "检测到用户可能需要帮助"}
|
||
]
|
||
return {
|
||
"success": True,
|
||
"proactive_actions": proactive_actions
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"运行主动监控失败: {e}")
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def run_intelligent_analysis(self) -> Dict[str, Any]:
|
||
"""运行智能分析"""
|
||
try:
|
||
# 模拟智能分析结果
|
||
analysis = {
|
||
"trends": {
|
||
"dates": ["2024-01-01", "2024-01-02", "2024-01-03"],
|
||
"satisfaction": [0.8, 0.85, 0.82],
|
||
"resolution_time": [2.5, 2.3, 2.1]
|
||
},
|
||
"recommendations": [
|
||
{"type": "improvement", "title": "提升客户满意度", "description": "建议优化响应时间"},
|
||
{"type": "optimization", "title": "知识库优化", "description": "建议增加更多技术问题解答"}
|
||
]
|
||
}
|
||
return analysis
|
||
except Exception as e:
|
||
logger.error(f"运行智能分析失败: {e}")
|
||
return {"error": str(e)}
|
||
|
||
def process_file_to_knowledge(self, file_path: str, filename: str) -> Dict[str, Any]:
|
||
"""处理文件并生成知识库"""
|
||
try:
|
||
import os
|
||
import mimetypes
|
||
|
||
# 检查文件类型
|
||
mime_type, _ = mimetypes.guess_type(file_path)
|
||
file_ext = os.path.splitext(filename)[1].lower()
|
||
|
||
# 读取文件内容
|
||
content = self._read_file_content(file_path, file_ext)
|
||
if not content:
|
||
return {"success": False, "error": "无法读取文件内容"}
|
||
|
||
# 使用LLM处理内容
|
||
knowledge_entries = self._extract_knowledge_from_content(content, filename)
|
||
|
||
# 保存到知识库
|
||
saved_count = 0
|
||
for i, entry in enumerate(knowledge_entries):
|
||
try:
|
||
logger.info(f"保存知识条目 {i+1}: {entry.get('question', '')[:50]}...")
|
||
success = self.knowledge_manager.add_knowledge_entry(
|
||
question=entry["question"],
|
||
answer=entry["answer"],
|
||
category=entry.get("category", "其他"),
|
||
confidence_score=entry.get("confidence_score", 0.7),
|
||
is_verified=False # 新添加的知识库条目默认为未验证
|
||
)
|
||
if success:
|
||
saved_count += 1
|
||
logger.info(f"知识条目 {i+1} 保存成功")
|
||
else:
|
||
logger.error(f"知识条目 {i+1} 保存失败")
|
||
except Exception as save_error:
|
||
logger.error(f"保存知识条目 {i+1} 时出错: {save_error}")
|
||
logger.error(f"条目内容: {entry}")
|
||
|
||
return {
|
||
"success": True,
|
||
"knowledge_count": saved_count,
|
||
"total_extracted": len(knowledge_entries),
|
||
"filename": filename
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理文件失败: {e}")
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def _read_file_content(self, file_path: str, file_ext: str) -> str:
|
||
"""读取文件内容"""
|
||
try:
|
||
if file_ext in ['.txt', '.md']:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
return f.read()
|
||
elif file_ext == '.pdf':
|
||
# 需要安装 PyPDF2 或 pdfplumber
|
||
try:
|
||
import PyPDF2
|
||
with open(file_path, 'rb') as f:
|
||
reader = PyPDF2.PdfReader(f)
|
||
text = ""
|
||
for page in reader.pages:
|
||
text += page.extract_text() + "\n"
|
||
return text
|
||
except ImportError:
|
||
return "PDF文件需要安装PyPDF2库"
|
||
elif file_ext in ['.doc', '.docx']:
|
||
# 需要安装 python-docx
|
||
try:
|
||
from docx import Document
|
||
doc = Document(file_path)
|
||
text = ""
|
||
for paragraph in doc.paragraphs:
|
||
text += paragraph.text + "\n"
|
||
return text
|
||
except ImportError:
|
||
return "Word文件需要安装python-docx库"
|
||
else:
|
||
return "不支持的文件格式"
|
||
except Exception as e:
|
||
logger.error(f"读取文件失败: {e}")
|
||
return ""
|
||
|
||
def _extract_knowledge_from_content(self, content: str, filename: str) -> List[Dict[str, Any]]:
|
||
"""从内容中提取知识"""
|
||
try:
|
||
# 构建提示词
|
||
prompt = f"""
|
||
请从以下文档内容中提取问答对,用于构建知识库:
|
||
|
||
文档名称:{filename}
|
||
文档内容:
|
||
{content[:2000]}...
|
||
|
||
请按照以下格式提取问答对:
|
||
1. 问题:具体的问题描述
|
||
2. 答案:详细的答案内容
|
||
3. 分类:问题所属类别(技术问题、APP功能、远程控制、车辆绑定、其他)
|
||
4. 置信度:0-1之间的数值
|
||
|
||
请提取3-5个最有价值的问答对,每个问答对都要完整且实用。
|
||
返回格式为JSON数组,例如:
|
||
[
|
||
{{
|
||
"question": "如何远程启动车辆?",
|
||
"answer": "远程启动车辆需要满足以下条件:1. 车辆处于P档 2. 手刹拉起 3. 车门已锁 4. 电池电量充足",
|
||
"category": "远程控制",
|
||
"confidence_score": 0.9
|
||
}}
|
||
]
|
||
"""
|
||
|
||
# 调用LLM
|
||
response = self.llm_client.chat_completion(
|
||
messages=[{"role": "user", "content": prompt}],
|
||
temperature=0.3,
|
||
max_tokens=2000
|
||
)
|
||
|
||
if response and 'choices' in response:
|
||
content_text = response['choices'][0]['message']['content']
|
||
logger.info(f"LLM响应内容: {content_text[:500]}...")
|
||
|
||
# 尝试解析JSON
|
||
try:
|
||
import json
|
||
# 提取JSON部分
|
||
start_idx = content_text.find('[')
|
||
end_idx = content_text.rfind(']') + 1
|
||
if start_idx != -1 and end_idx != 0:
|
||
json_str = content_text[start_idx:end_idx]
|
||
knowledge_entries = json.loads(json_str)
|
||
logger.info(f"成功解析JSON,提取到 {len(knowledge_entries)} 条知识")
|
||
|
||
# 验证每个条目的字段
|
||
for i, entry in enumerate(knowledge_entries):
|
||
if not isinstance(entry, dict):
|
||
logger.error(f"条目 {i} 不是字典格式: {entry}")
|
||
continue
|
||
if 'question' not in entry:
|
||
logger.error(f"条目 {i} 缺少question字段: {entry}")
|
||
continue
|
||
if 'answer' not in entry:
|
||
logger.error(f"条目 {i} 缺少answer字段: {entry}")
|
||
continue
|
||
logger.info(f"条目 {i} 验证通过: {entry.get('question', '')[:50]}...")
|
||
|
||
return knowledge_entries
|
||
except Exception as json_error:
|
||
logger.warning(f"JSON解析失败: {json_error}")
|
||
logger.warning(f"原始内容: {content_text}")
|
||
|
||
# 如果JSON解析失败,尝试手动解析
|
||
manual_entries = self._parse_knowledge_manually(content_text)
|
||
logger.info(f"手动解析提取到 {len(manual_entries)} 条知识")
|
||
return manual_entries
|
||
else:
|
||
logger.error("LLM响应格式错误")
|
||
logger.error(f"响应内容: {response}")
|
||
return []
|
||
|
||
except Exception as e:
|
||
logger.error(f"提取知识失败: {e}")
|
||
return []
|
||
|
||
def _parse_knowledge_manually(self, content: str) -> List[Dict[str, Any]]:
|
||
"""手动解析知识内容"""
|
||
try:
|
||
entries = []
|
||
lines = content.split('\n')
|
||
|
||
current_entry = {}
|
||
for line in lines:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# 检查问题
|
||
if '问题' in line and (':' in line or ':' in line):
|
||
if current_entry and 'question' in current_entry:
|
||
entries.append(current_entry)
|
||
current_entry = {}
|
||
# 提取问题内容
|
||
if ':' in line:
|
||
question = line.split(':', 1)[1].strip()
|
||
else:
|
||
question = line.split(':', 1)[1].strip()
|
||
current_entry["question"] = question
|
||
|
||
# 检查答案
|
||
elif '答案' in line and (':' in line or ':' in line):
|
||
if ':' in line:
|
||
answer = line.split(':', 1)[1].strip()
|
||
else:
|
||
answer = line.split(':', 1)[1].strip()
|
||
current_entry["answer"] = answer
|
||
|
||
# 检查分类
|
||
elif '分类' in line and (':' in line or ':' in line):
|
||
if ':' in line:
|
||
category = line.split(':', 1)[1].strip()
|
||
else:
|
||
category = line.split(':', 1)[1].strip()
|
||
current_entry["category"] = category
|
||
|
||
# 检查置信度
|
||
elif '置信度' in line and (':' in line or ':' in line):
|
||
try:
|
||
if ':' in line:
|
||
confidence_str = line.split(':', 1)[1].strip()
|
||
else:
|
||
confidence_str = line.split(':', 1)[1].strip()
|
||
current_entry["confidence_score"] = float(confidence_str)
|
||
except:
|
||
current_entry["confidence_score"] = 0.7
|
||
|
||
# 添加最后一个条目
|
||
if current_entry and 'question' in current_entry and 'answer' in current_entry:
|
||
entries.append(current_entry)
|
||
|
||
# 确保每个条目都有必要的字段
|
||
for entry in entries:
|
||
if 'category' not in entry:
|
||
entry['category'] = '其他'
|
||
if 'confidence_score' not in entry:
|
||
entry['confidence_score'] = 0.7
|
||
|
||
logger.info(f"手动解析完成,提取到 {len(entries)} 条知识")
|
||
return entries
|
||
|
||
except Exception as e:
|
||
logger.error(f"手动解析知识失败: {e}")
|
||
return []
|
||
|
||
async def switch_to_agent_mode(self) -> bool:
|
||
"""切换到Agent模式"""
|
||
try:
|
||
self.is_agent_mode = True
|
||
logger.info("已切换到Agent模式")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"切换到Agent模式失败: {e}")
|
||
return False
|
||
|
||
def switch_to_traditional_mode(self) -> bool:
|
||
"""切换到传统模式"""
|
||
try:
|
||
self.is_agent_mode = False
|
||
logger.info("已切换到传统模式")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"切换到传统模式失败: {e}")
|
||
return False
|
||
|
||
async def start_agent_monitoring(self) -> bool:
|
||
"""启动Agent监控"""
|
||
try:
|
||
# 启动基础监控
|
||
self.start_monitoring()
|
||
|
||
# 启动Agent主动监控
|
||
asyncio.create_task(self._agent_monitoring_loop())
|
||
|
||
logger.info("Agent监控已启动")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"启动Agent监控失败: {e}")
|
||
return False
|
||
|
||
async def _agent_monitoring_loop(self):
|
||
"""Agent监控循环"""
|
||
while True:
|
||
try:
|
||
# 每5分钟执行一次主动监控
|
||
await asyncio.sleep(300)
|
||
|
||
proactive_result = await self.proactive_monitoring()
|
||
|
||
if proactive_result.get("proactive_actions"):
|
||
logger.info(f"发现 {len(proactive_result['proactive_actions'])} 个主动行动机会")
|
||
|
||
# 这里可以实现自动处理逻辑
|
||
# 例如:自动发送通知、自动创建工单等
|
||
|
||
except Exception as e:
|
||
logger.error(f"Agent监控循环错误: {e}")
|
||
await asyncio.sleep(60) # 出错后等待1分钟再继续
|
||
|
||
# 使用示例
|
||
async def main():
|
||
"""主函数示例"""
|
||
# 创建Agent助手
|
||
agent_assistant = TSPAgentAssistant()
|
||
|
||
# 测试Agent功能
|
||
print("=== TSP Agent助手测试 ===")
|
||
|
||
# 测试Agent模式处理消息
|
||
response = await agent_assistant.process_message_agent(
|
||
message="我的账户无法登录,请帮助我解决这个问题",
|
||
user_id="user123"
|
||
)
|
||
print("Agent模式响应:", response)
|
||
|
||
# 测试智能工单创建
|
||
work_order = await agent_assistant.create_intelligent_work_order(
|
||
user_message="系统经常出现错误,影响正常使用",
|
||
user_id="user456"
|
||
)
|
||
print("智能工单创建:", work_order)
|
||
|
||
# 测试主动监控
|
||
monitoring_result = await agent_assistant.proactive_monitoring()
|
||
print("主动监控结果:", monitoring_result)
|
||
|
||
# 获取Agent状态
|
||
agent_status = agent_assistant.get_agent_status()
|
||
print("Agent状态:", agent_status)
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|