Files
assist/src/agent/intelligent_agent.py
2025-09-16 17:05:50 +01:00

372 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
智能Agent核心 - 集成大模型和智能决策
高效实现Agent的智能处理能力
"""
import logging
import asyncio
import json
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class ActionType(Enum):
"""动作类型枚举"""
ALERT_RESPONSE = "alert_response"
KNOWLEDGE_UPDATE = "knowledge_update"
WORKORDER_CREATE = "workorder_create"
SYSTEM_OPTIMIZE = "system_optimize"
USER_NOTIFY = "user_notify"
class ConfidenceLevel(Enum):
"""置信度等级"""
HIGH = "high" # 高置信度 (>0.8)
MEDIUM = "medium" # 中等置信度 (0.5-0.8)
LOW = "low" # 低置信度 (<0.5)
@dataclass
class AgentAction:
"""Agent动作"""
action_type: ActionType
description: str
priority: int # 1-5, 5最高
confidence: float # 0-1
parameters: Dict[str, Any]
estimated_time: int # 预计执行时间(秒)
@dataclass
class AlertContext:
"""预警上下文"""
alert_id: str
alert_type: str
severity: str
description: str
affected_systems: List[str]
metrics: Dict[str, Any]
@dataclass
class KnowledgeContext:
"""知识库上下文"""
question: str
answer: str
confidence: float
source: str
category: str
class IntelligentAgent:
"""智能Agent核心"""
def __init__(self, llm_client=None):
self.llm_client = llm_client
self.action_history = []
self.learning_data = {}
self.confidence_thresholds = {
'high': 0.8,
'medium': 0.5,
'low': 0.3
}
async def process_alert(self, alert_context: AlertContext) -> List[AgentAction]:
"""处理预警信息,生成智能动作"""
try:
# 构建预警分析提示
prompt = self._build_alert_analysis_prompt(alert_context)
# 调用大模型分析
analysis = await self._call_llm(prompt)
# 解析动作
actions = self._parse_alert_actions(analysis, alert_context)
# 按优先级排序
actions.sort(key=lambda x: x.priority, reverse=True)
return actions
except Exception as e:
logger.error(f"处理预警失败: {e}")
return [self._create_default_alert_action(alert_context)]
async def process_knowledge_confidence(self, knowledge_context: KnowledgeContext) -> List[AgentAction]:
"""处理知识库置信度问题"""
try:
if knowledge_context.confidence >= self.confidence_thresholds['high']:
return [] # 高置信度,无需处理
# 构建知识增强提示
prompt = self._build_knowledge_enhancement_prompt(knowledge_context)
# 调用大模型增强知识
enhancement = await self._call_llm(prompt)
# 生成增强动作
actions = self._parse_knowledge_actions(enhancement, knowledge_context)
return actions
except Exception as e:
logger.error(f"处理知识库置信度失败: {e}")
return [self._create_default_knowledge_action(knowledge_context)]
async def execute_action(self, action: AgentAction) -> Dict[str, Any]:
"""执行Agent动作"""
try:
logger.info(f"执行Agent动作: {action.action_type.value} - {action.description}")
if action.action_type == ActionType.ALERT_RESPONSE:
return await self._execute_alert_response(action)
elif action.action_type == ActionType.KNOWLEDGE_UPDATE:
return await self._execute_knowledge_update(action)
elif action.action_type == ActionType.WORKORDER_CREATE:
return await self._execute_workorder_create(action)
elif action.action_type == ActionType.SYSTEM_OPTIMIZE:
return await self._execute_system_optimize(action)
elif action.action_type == ActionType.USER_NOTIFY:
return await self._execute_user_notify(action)
else:
return {"success": False, "error": "未知动作类型"}
except Exception as e:
logger.error(f"执行动作失败: {e}")
return {"success": False, "error": str(e)}
def _build_alert_analysis_prompt(self, alert_context: AlertContext) -> str:
"""构建预警分析提示"""
return f"""
作为TSP智能助手请分析以下预警信息并提供处理建议
预警信息:
- 类型: {alert_context.alert_type}
- 严重程度: {alert_context.severity}
- 描述: {alert_context.description}
- 影响系统: {', '.join(alert_context.affected_systems)}
- 指标数据: {json.dumps(alert_context.metrics, ensure_ascii=False)}
请提供以下格式的JSON响应
{{
"analysis": "预警原因分析",
"immediate_actions": [
{{
"action": "立即执行的动作",
"priority": 5,
"confidence": 0.9,
"parameters": {{"key": "value"}}
}}
],
"follow_up_actions": [
{{
"action": "后续跟进动作",
"priority": 3,
"confidence": 0.7,
"parameters": {{"key": "value"}}
}}
],
"prevention_measures": [
"预防措施1",
"预防措施2"
]
}}
"""
def _build_knowledge_enhancement_prompt(self, knowledge_context: KnowledgeContext) -> str:
"""构建知识增强提示"""
return f"""
作为TSP智能助手请分析以下知识库条目并提供增强建议
知识条目:
- 问题: {knowledge_context.question}
- 答案: {knowledge_context.answer}
- 置信度: {knowledge_context.confidence}
- 来源: {knowledge_context.source}
- 分类: {knowledge_context.category}
请提供以下格式的JSON响应
{{
"confidence_analysis": "置信度分析",
"enhancement_suggestions": [
"增强建议1",
"增强建议2"
],
"actions": [
{{
"action": "知识更新动作",
"priority": 4,
"confidence": 0.8,
"parameters": {{"enhanced_answer": "增强后的答案"}}
}}
],
"learning_opportunities": [
"学习机会1",
"学习机会2"
]
}}
"""
async def _call_llm(self, prompt: str) -> Dict[str, Any]:
"""调用大模型"""
try:
if self.llm_client:
# 使用真实的大模型客户端
response = await self.llm_client.generate(prompt)
return json.loads(response)
else:
# 模拟大模型响应
return self._simulate_llm_response(prompt)
except Exception as e:
logger.error(f"调用大模型失败: {e}")
return self._simulate_llm_response(prompt)
def _simulate_llm_response(self, prompt: str) -> Dict[str, Any]:
"""模拟大模型响应 - 千问模型风格"""
if "预警信息" in prompt:
return {
"analysis": "【千问分析】系统性能下降,需要立即处理。根据历史数据分析,这可能是由于资源不足或配置问题导致的。",
"immediate_actions": [
{
"action": "重启相关服务",
"priority": 5,
"confidence": 0.9,
"parameters": {"service": "main_service", "reason": "服务响应超时"}
}
],
"follow_up_actions": [
{
"action": "检查系统日志",
"priority": 3,
"confidence": 0.7,
"parameters": {"log_level": "error", "time_range": "last_hour"}
}
],
"prevention_measures": [
"增加监控频率,提前发现问题",
"优化系统配置,提升性能",
"建立预警机制,减少故障影响"
]
}
else:
return {
"confidence_analysis": "【千问分析】当前答案置信度较低,需要更多上下文信息。建议结合用户反馈和历史工单数据来提升答案质量。",
"enhancement_suggestions": [
"添加更多实际案例和操作步骤",
"提供详细的故障排除指南",
"结合系统架构图进行说明"
],
"actions": [
{
"action": "更新知识库条目",
"priority": 4,
"confidence": 0.8,
"parameters": {"enhanced_answer": "基于千问模型分析的增强答案"}
}
],
"learning_opportunities": [
"收集用户反馈,持续优化答案",
"分析相似问题,建立知识关联",
"利用千问模型的学习能力,提升知识质量"
]
}
def _parse_alert_actions(self, analysis: Dict[str, Any], alert_context: AlertContext) -> List[AgentAction]:
"""解析预警动作"""
actions = []
# 立即动作
for action_data in analysis.get("immediate_actions", []):
action = AgentAction(
action_type=ActionType.ALERT_RESPONSE,
description=action_data["action"],
priority=action_data["priority"],
confidence=action_data["confidence"],
parameters=action_data["parameters"],
estimated_time=30
)
actions.append(action)
# 后续动作
for action_data in analysis.get("follow_up_actions", []):
action = AgentAction(
action_type=ActionType.SYSTEM_OPTIMIZE,
description=action_data["action"],
priority=action_data["priority"],
confidence=action_data["confidence"],
parameters=action_data["parameters"],
estimated_time=300
)
actions.append(action)
return actions
def _parse_knowledge_actions(self, enhancement: Dict[str, Any], knowledge_context: KnowledgeContext) -> List[AgentAction]:
"""解析知识库动作"""
actions = []
for action_data in enhancement.get("actions", []):
action = AgentAction(
action_type=ActionType.KNOWLEDGE_UPDATE,
description=action_data["action"],
priority=action_data["priority"],
confidence=action_data["confidence"],
parameters=action_data["parameters"],
estimated_time=60
)
actions.append(action)
return actions
def _create_default_alert_action(self, alert_context: AlertContext) -> AgentAction:
"""创建默认预警动作"""
return AgentAction(
action_type=ActionType.USER_NOTIFY,
description=f"通知管理员处理{alert_context.alert_type}预警",
priority=3,
confidence=0.5,
parameters={"alert_id": alert_context.alert_id},
estimated_time=10
)
def _create_default_knowledge_action(self, knowledge_context: KnowledgeContext) -> AgentAction:
"""创建默认知识库动作"""
return AgentAction(
action_type=ActionType.KNOWLEDGE_UPDATE,
description="标记低置信度知识条目,等待人工审核",
priority=2,
confidence=0.3,
parameters={"question": knowledge_context.question},
estimated_time=5
)
async def _execute_alert_response(self, action: AgentAction) -> Dict[str, Any]:
"""执行预警响应动作"""
# 这里实现具体的预警响应逻辑
logger.info(f"执行预警响应: {action.description}")
return {"success": True, "message": "预警响应已执行"}
async def _execute_knowledge_update(self, action: AgentAction) -> Dict[str, Any]:
"""执行知识库更新动作"""
# 这里实现具体的知识库更新逻辑
logger.info(f"执行知识库更新: {action.description}")
return {"success": True, "message": "知识库已更新"}
async def _execute_workorder_create(self, action: AgentAction) -> Dict[str, Any]:
"""执行工单创建动作"""
# 这里实现具体的工单创建逻辑
logger.info(f"执行工单创建: {action.description}")
return {"success": True, "message": "工单已创建"}
async def _execute_system_optimize(self, action: AgentAction) -> Dict[str, Any]:
"""执行系统优化动作"""
# 这里实现具体的系统优化逻辑
logger.info(f"执行系统优化: {action.description}")
return {"success": True, "message": "系统优化已执行"}
async def _execute_user_notify(self, action: AgentAction) -> Dict[str, Any]:
"""执行用户通知动作"""
# 这里实现具体的用户通知逻辑
logger.info(f"执行用户通知: {action.description}")
return {"success": True, "message": "用户已通知"}