#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 智能Agent核心 - 集成大模型和智能决策 高效实现Agent的智能处理能力 """ import logging import asyncio import json from typing import Dict, Any, List, Optional, Tuple from datetime import datetime from dataclasses import dataclass from enum import Enum logger = logging.getLogger(__name__) class ActionType(Enum): """动作类型枚举""" ALERT_RESPONSE = "alert_response" KNOWLEDGE_UPDATE = "knowledge_update" WORKORDER_CREATE = "workorder_create" SYSTEM_OPTIMIZE = "system_optimize" USER_NOTIFY = "user_notify" class ConfidenceLevel(Enum): """置信度等级""" HIGH = "high" # 高置信度 (>0.8) MEDIUM = "medium" # 中等置信度 (0.5-0.8) LOW = "low" # 低置信度 (<0.5) @dataclass class AgentAction: """Agent动作""" action_type: ActionType description: str priority: int # 1-5, 5最高 confidence: float # 0-1 parameters: Dict[str, Any] estimated_time: int # 预计执行时间(秒) @dataclass class AlertContext: """预警上下文""" alert_id: str alert_type: str severity: str description: str affected_systems: List[str] metrics: Dict[str, Any] @dataclass class KnowledgeContext: """知识库上下文""" question: str answer: str confidence: float source: str category: str class IntelligentAgent: """智能Agent核心""" def __init__(self, llm_client=None): self.llm_client = llm_client self.action_history = [] self.learning_data = {} self.confidence_thresholds = { 'high': 0.8, 'medium': 0.5, 'low': 0.3 } async def process_alert(self, alert_context: AlertContext) -> List[AgentAction]: """处理预警信息,生成智能动作""" try: # 构建预警分析提示 prompt = self._build_alert_analysis_prompt(alert_context) # 调用大模型分析 analysis = await self._call_llm(prompt) # 解析动作 actions = self._parse_alert_actions(analysis, alert_context) # 按优先级排序 actions.sort(key=lambda x: x.priority, reverse=True) return actions except Exception as e: logger.error(f"处理预警失败: {e}") return [self._create_default_alert_action(alert_context)] async def process_knowledge_confidence(self, knowledge_context: KnowledgeContext) -> List[AgentAction]: """处理知识库置信度问题""" try: if knowledge_context.confidence >= self.confidence_thresholds['high']: return [] # 高置信度,无需处理 # 构建知识增强提示 prompt = self._build_knowledge_enhancement_prompt(knowledge_context) # 调用大模型增强知识 enhancement = await self._call_llm(prompt) # 生成增强动作 actions = self._parse_knowledge_actions(enhancement, knowledge_context) return actions except Exception as e: logger.error(f"处理知识库置信度失败: {e}") return [self._create_default_knowledge_action(knowledge_context)] async def execute_action(self, action: AgentAction) -> Dict[str, Any]: """执行Agent动作""" try: logger.info(f"执行Agent动作: {action.action_type.value} - {action.description}") if action.action_type == ActionType.ALERT_RESPONSE: return await self._execute_alert_response(action) elif action.action_type == ActionType.KNOWLEDGE_UPDATE: return await self._execute_knowledge_update(action) elif action.action_type == ActionType.WORKORDER_CREATE: return await self._execute_workorder_create(action) elif action.action_type == ActionType.SYSTEM_OPTIMIZE: return await self._execute_system_optimize(action) elif action.action_type == ActionType.USER_NOTIFY: return await self._execute_user_notify(action) else: return {"success": False, "error": "未知动作类型"} except Exception as e: logger.error(f"执行动作失败: {e}") return {"success": False, "error": str(e)} def _build_alert_analysis_prompt(self, alert_context: AlertContext) -> str: """构建预警分析提示""" return f""" 作为TSP智能助手,请分析以下预警信息并提供处理建议: 预警信息: - 类型: {alert_context.alert_type} - 严重程度: {alert_context.severity} - 描述: {alert_context.description} - 影响系统: {', '.join(alert_context.affected_systems)} - 指标数据: {json.dumps(alert_context.metrics, ensure_ascii=False)} 请提供以下格式的JSON响应: {{ "analysis": "预警原因分析", "immediate_actions": [ {{ "action": "立即执行的动作", "priority": 5, "confidence": 0.9, "parameters": {{"key": "value"}} }} ], "follow_up_actions": [ {{ "action": "后续跟进动作", "priority": 3, "confidence": 0.7, "parameters": {{"key": "value"}} }} ], "prevention_measures": [ "预防措施1", "预防措施2" ] }} """ def _build_knowledge_enhancement_prompt(self, knowledge_context: KnowledgeContext) -> str: """构建知识增强提示""" return f""" 作为TSP智能助手,请分析以下知识库条目并提供增强建议: 知识条目: - 问题: {knowledge_context.question} - 答案: {knowledge_context.answer} - 置信度: {knowledge_context.confidence} - 来源: {knowledge_context.source} - 分类: {knowledge_context.category} 请提供以下格式的JSON响应: {{ "confidence_analysis": "置信度分析", "enhancement_suggestions": [ "增强建议1", "增强建议2" ], "actions": [ {{ "action": "知识更新动作", "priority": 4, "confidence": 0.8, "parameters": {{"enhanced_answer": "增强后的答案"}} }} ], "learning_opportunities": [ "学习机会1", "学习机会2" ] }} """ async def _call_llm(self, prompt: str) -> Dict[str, Any]: """调用大模型""" try: if self.llm_client: # 使用真实的大模型客户端 response = await self.llm_client.generate(prompt) return json.loads(response) else: # 模拟大模型响应 return self._simulate_llm_response(prompt) except Exception as e: logger.error(f"调用大模型失败: {e}") return self._simulate_llm_response(prompt) def _simulate_llm_response(self, prompt: str) -> Dict[str, Any]: """模拟大模型响应""" if "预警信息" in prompt: return { "analysis": "系统性能下降,需要立即处理", "immediate_actions": [ { "action": "重启相关服务", "priority": 5, "confidence": 0.9, "parameters": {"service": "main_service"} } ], "follow_up_actions": [ { "action": "检查系统日志", "priority": 3, "confidence": 0.7, "parameters": {"log_level": "error"} } ], "prevention_measures": [ "增加监控频率", "优化系统配置" ] } else: return { "confidence_analysis": "当前答案置信度较低,需要更多上下文信息", "enhancement_suggestions": [ "添加更多示例", "提供详细步骤" ], "actions": [ { "action": "更新知识库条目", "priority": 4, "confidence": 0.8, "parameters": {"enhanced_answer": "增强后的答案"} } ], "learning_opportunities": [ "收集用户反馈", "分析相似问题" ] } def _parse_alert_actions(self, analysis: Dict[str, Any], alert_context: AlertContext) -> List[AgentAction]: """解析预警动作""" actions = [] # 立即动作 for action_data in analysis.get("immediate_actions", []): action = AgentAction( action_type=ActionType.ALERT_RESPONSE, description=action_data["action"], priority=action_data["priority"], confidence=action_data["confidence"], parameters=action_data["parameters"], estimated_time=30 ) actions.append(action) # 后续动作 for action_data in analysis.get("follow_up_actions", []): action = AgentAction( action_type=ActionType.SYSTEM_OPTIMIZE, description=action_data["action"], priority=action_data["priority"], confidence=action_data["confidence"], parameters=action_data["parameters"], estimated_time=300 ) actions.append(action) return actions def _parse_knowledge_actions(self, enhancement: Dict[str, Any], knowledge_context: KnowledgeContext) -> List[AgentAction]: """解析知识库动作""" actions = [] for action_data in enhancement.get("actions", []): action = AgentAction( action_type=ActionType.KNOWLEDGE_UPDATE, description=action_data["action"], priority=action_data["priority"], confidence=action_data["confidence"], parameters=action_data["parameters"], estimated_time=60 ) actions.append(action) return actions def _create_default_alert_action(self, alert_context: AlertContext) -> AgentAction: """创建默认预警动作""" return AgentAction( action_type=ActionType.USER_NOTIFY, description=f"通知管理员处理{alert_context.alert_type}预警", priority=3, confidence=0.5, parameters={"alert_id": alert_context.alert_id}, estimated_time=10 ) def _create_default_knowledge_action(self, knowledge_context: KnowledgeContext) -> AgentAction: """创建默认知识库动作""" return AgentAction( action_type=ActionType.KNOWLEDGE_UPDATE, description="标记低置信度知识条目,等待人工审核", priority=2, confidence=0.3, parameters={"question": knowledge_context.question}, estimated_time=5 ) async def _execute_alert_response(self, action: AgentAction) -> Dict[str, Any]: """执行预警响应动作""" # 这里实现具体的预警响应逻辑 logger.info(f"执行预警响应: {action.description}") return {"success": True, "message": "预警响应已执行"} async def _execute_knowledge_update(self, action: AgentAction) -> Dict[str, Any]: """执行知识库更新动作""" # 这里实现具体的知识库更新逻辑 logger.info(f"执行知识库更新: {action.description}") return {"success": True, "message": "知识库已更新"} async def _execute_workorder_create(self, action: AgentAction) -> Dict[str, Any]: """执行工单创建动作""" # 这里实现具体的工单创建逻辑 logger.info(f"执行工单创建: {action.description}") return {"success": True, "message": "工单已创建"} async def _execute_system_optimize(self, action: AgentAction) -> Dict[str, Any]: """执行系统优化动作""" # 这里实现具体的系统优化逻辑 logger.info(f"执行系统优化: {action.description}") return {"success": True, "message": "系统优化已执行"} async def _execute_user_notify(self, action: AgentAction) -> Dict[str, Any]: """执行用户通知动作""" # 这里实现具体的用户通知逻辑 logger.info(f"执行用户通知: {action.description}") return {"success": True, "message": "用户已通知"}