#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ TSP助手预警管理Web应用 提供预警系统的Web界面和API接口 """ import sys import os import json from datetime import datetime, timedelta from flask import Flask, render_template, request, jsonify, redirect, url_for from flask_cors import CORS # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.main import TSPAssistant from src.agent_assistant import TSPAgentAssistant from src.analytics.alert_system import AlertRule, AlertLevel, AlertType from src.dialogue.realtime_chat import RealtimeChatManager from src.vehicle.vehicle_data_manager import VehicleDataManager app = Flask(__name__) CORS(app) # 初始化TSP助手和Agent助手 assistant = TSPAssistant() agent_assistant = TSPAgentAssistant() chat_manager = RealtimeChatManager() vehicle_manager = VehicleDataManager() @app.route('/') def index(): """主页 - 综合管理平台""" return render_template('dashboard.html') @app.route('/alerts') def alerts(): """预警管理页面""" return render_template('index.html') @app.route('/api/health') def get_health(): """获取系统健康状态""" try: health = assistant.get_system_health() return jsonify(health) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/alerts') def get_alerts(): """获取预警列表""" try: alerts = assistant.get_active_alerts() return jsonify(alerts) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/alerts', methods=['POST']) def create_alert(): """创建预警""" try: data = request.get_json() alert = assistant.create_alert( alert_type=data.get('alert_type', 'manual'), title=data.get('title', '手动预警'), description=data.get('description', ''), level=data.get('level', 'medium') ) return jsonify({"success": True, "alert": alert}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/alerts/statistics') def get_alert_statistics(): """获取预警统计""" try: stats = assistant.get_alert_statistics() return jsonify(stats) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/alerts//resolve', methods=['POST']) def resolve_alert(alert_id): """解决预警""" try: success = assistant.resolve_alert(alert_id) if success: return jsonify({"success": True, "message": "预警已解决"}) else: return jsonify({"success": False, "message": "解决预警失败"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/rules') def get_rules(): """获取预警规则列表""" try: rules = assistant.alert_system.rules rules_data = [] for name, rule in rules.items(): rules_data.append({ "name": rule.name, "description": rule.description, "alert_type": rule.alert_type.value, "level": rule.level.value, "threshold": rule.threshold, "condition": rule.condition, "enabled": rule.enabled, "check_interval": rule.check_interval, "cooldown": rule.cooldown }) return jsonify(rules_data) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/rules', methods=['POST']) def create_rule(): """创建预警规则""" try: data = request.get_json() rule = AlertRule( name=data['name'], description=data['description'], alert_type=AlertType(data['alert_type']), level=AlertLevel(data['level']), threshold=float(data['threshold']), condition=data['condition'], enabled=data.get('enabled', True), check_interval=int(data.get('check_interval', 300)), cooldown=int(data.get('cooldown', 3600)) ) success = assistant.alert_system.add_custom_rule(rule) if success: return jsonify({"success": True, "message": "规则创建成功"}) else: return jsonify({"success": False, "message": "规则创建失败"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/rules/', methods=['PUT']) def update_rule(rule_name): """更新预警规则""" try: data = request.get_json() success = assistant.alert_system.update_rule(rule_name, **data) if success: return jsonify({"success": True, "message": "规则更新成功"}) else: return jsonify({"success": False, "message": "规则更新失败"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/rules/', methods=['DELETE']) def delete_rule(rule_name): """删除预警规则""" try: success = assistant.alert_system.delete_rule(rule_name) if success: return jsonify({"success": True, "message": "规则删除成功"}) else: return jsonify({"success": False, "message": "规则删除失败"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/monitor/start', methods=['POST']) def start_monitoring(): """启动监控服务""" try: success = assistant.start_monitoring() if success: return jsonify({"success": True, "message": "监控服务已启动"}) else: return jsonify({"success": False, "message": "启动监控服务失败"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/monitor/stop', methods=['POST']) def stop_monitoring(): """停止监控服务""" try: success = assistant.stop_monitoring() if success: return jsonify({"success": True, "message": "监控服务已停止"}) else: return jsonify({"success": False, "message": "停止监控服务失败"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/monitor/status') def get_monitor_status(): """获取监控服务状态""" try: health = assistant.get_system_health() return jsonify({ "monitor_status": health.get("monitor_status", "unknown"), "health_score": health.get("health_score", 0), "active_alerts": health.get("active_alerts", 0) }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/check-alerts', methods=['POST']) def check_alerts(): """手动检查预警""" try: alerts = assistant.check_alerts() return jsonify({ "success": True, "alerts": alerts, "count": len(alerts) }) except Exception as e: return jsonify({"error": str(e)}), 500 # 实时对话相关路由 @app.route('/chat') def chat(): """实时对话页面 (WebSocket版本)""" return render_template('chat.html') @app.route('/chat-http') def chat_http(): """实时对话页面 (HTTP版本)""" return render_template('chat_http.html') @app.route('/api/chat/session', methods=['POST']) def create_chat_session(): """创建对话会话""" try: data = request.get_json() user_id = data.get('user_id', 'anonymous') work_order_id = data.get('work_order_id') session_id = chat_manager.create_session(user_id, work_order_id) return jsonify({ "success": True, "session_id": session_id, "message": "会话创建成功" }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/chat/message', methods=['POST']) def send_chat_message(): """发送聊天消息""" try: data = request.get_json() session_id = data.get('session_id') message = data.get('message') if not session_id or not message: return jsonify({"error": "缺少必要参数"}), 400 result = chat_manager.process_message(session_id, message) return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/chat/history/') def get_chat_history(session_id): """获取对话历史""" try: history = chat_manager.get_session_history(session_id) return jsonify({ "success": True, "history": history }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/chat/work-order', methods=['POST']) def create_work_order(): """创建工单""" try: data = request.get_json() session_id = data.get('session_id') title = data.get('title') description = data.get('description') category = data.get('category', '技术问题') priority = data.get('priority', 'medium') if not session_id or not title or not description: return jsonify({"error": "缺少必要参数"}), 400 result = chat_manager.create_work_order(session_id, title, description, category, priority) return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/chat/work-order/') def get_work_order_status(work_order_id): """获取工单状态""" try: result = chat_manager.get_work_order_status(work_order_id) return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/chat/session/', methods=['DELETE']) def end_chat_session(session_id): """结束对话会话""" try: success = chat_manager.end_session(session_id) return jsonify({ "success": success, "message": "会话已结束" if success else "结束会话失败" }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/chat/sessions') def get_active_sessions(): """获取活跃会话列表""" try: sessions = chat_manager.get_active_sessions() return jsonify({ "success": True, "sessions": sessions }) except Exception as e: return jsonify({"error": str(e)}), 500 # Agent相关API @app.route('/api/agent/status') def get_agent_status(): """获取Agent状态""" try: status = agent_assistant.get_agent_status() return jsonify(status) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/agent/toggle', methods=['POST']) def toggle_agent_mode(): """切换Agent模式""" try: data = request.get_json() enabled = data.get('enabled', True) success = agent_assistant.toggle_agent_mode(enabled) return jsonify({ "success": success, "message": f"Agent模式已{'启用' if enabled else '禁用'}" }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/agent/monitoring/start', methods=['POST']) def start_agent_monitoring(): """启动Agent监控""" try: success = agent_assistant.start_proactive_monitoring() return jsonify({ "success": success, "message": "Agent监控已启动" if success else "启动失败" }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/agent/monitoring/stop', methods=['POST']) def stop_agent_monitoring(): """停止Agent监控""" try: success = agent_assistant.stop_proactive_monitoring() return jsonify({ "success": success, "message": "Agent监控已停止" if success else "停止失败" }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/agent/proactive-monitoring', methods=['POST']) def proactive_monitoring(): """主动监控检查""" try: result = agent_assistant.run_proactive_monitoring() return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/agent/intelligent-analysis', methods=['POST']) def intelligent_analysis(): """智能分析""" try: analysis = agent_assistant.run_intelligent_analysis() return jsonify({"success": True, "analysis": analysis}) except Exception as e: return jsonify({"error": str(e)}), 500 # 知识库相关API @app.route('/api/knowledge') def get_knowledge(): """获取知识库列表""" try: # 获取分页参数 page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 10, type=int) # 从数据库获取知识库数据 knowledge_entries = assistant.knowledge_manager.get_knowledge_entries( page=page, per_page=per_page ) return jsonify(knowledge_entries) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/knowledge/search') def search_knowledge(): """搜索知识库""" try: query = request.args.get('q', '') # 这里应该调用知识库管理器的搜索方法 results = assistant.search_knowledge(query, top_k=5) return jsonify(results.get('results', [])) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/knowledge', methods=['POST']) def add_knowledge(): """添加知识库条目""" try: data = request.get_json() success = assistant.knowledge_manager.add_knowledge_entry( question=data['question'], answer=data['answer'], category=data['category'], confidence_score=data['confidence_score'] ) return jsonify({"success": success, "message": "知识添加成功" if success else "添加失败"}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/knowledge/stats') def get_knowledge_stats(): """获取知识库统计""" try: stats = assistant.knowledge_manager.get_knowledge_stats() return jsonify(stats) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/knowledge/upload', methods=['POST']) def upload_knowledge_file(): """上传文件并生成知识库""" try: if 'file' not in request.files: return jsonify({"error": "没有上传文件"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"error": "没有选择文件"}), 400 # 保存文件到临时目录 import tempfile import os import uuid # 创建唯一的临时文件名 temp_filename = f"upload_{uuid.uuid4()}{os.path.splitext(file.filename)[1]}" temp_path = os.path.join(tempfile.gettempdir(), temp_filename) try: # 保存文件 file.save(temp_path) # 使用Agent助手处理文件 result = agent_assistant.process_file_to_knowledge(temp_path, file.filename) return jsonify(result) finally: # 确保删除临时文件 try: if os.path.exists(temp_path): os.unlink(temp_path) except Exception as cleanup_error: logger.warning(f"清理临时文件失败: {cleanup_error}") except Exception as e: logger.error(f"文件上传处理失败: {e}") return jsonify({"error": str(e)}), 500 @app.route('/api/knowledge/delete/', methods=['DELETE']) def delete_knowledge(knowledge_id): """删除知识库条目""" try: success = assistant.knowledge_manager.delete_knowledge_entry(knowledge_id) return jsonify({"success": success, "message": "删除成功" if success else "删除失败"}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/knowledge/verify/', methods=['POST']) def verify_knowledge(knowledge_id): """验证知识库条目""" try: data = request.get_json() or {} verified_by = data.get('verified_by', 'admin') success = assistant.knowledge_manager.verify_knowledge_entry(knowledge_id, verified_by) return jsonify({"success": success, "message": "验证成功" if success else "验证失败"}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/knowledge/unverify/', methods=['POST']) def unverify_knowledge(knowledge_id): """取消验证知识库条目""" try: success = assistant.knowledge_manager.unverify_knowledge_entry(knowledge_id) return jsonify({"success": success, "message": "取消验证成功" if success else "取消验证失败"}) except Exception as e: return jsonify({"error": str(e)}), 500 # 工单相关API @app.route('/api/workorders') def get_workorders(): """获取工单列表""" try: status_filter = request.args.get('status') priority_filter = request.args.get('priority') # 这里应该调用工单管理器的获取方法 workorders = [ { "id": 1, "title": "车辆无法远程启动", "description": "用户反映APP中远程启动功能无法使用", "category": "远程控制", "priority": "high", "status": "open", "created_at": "2024-01-01T10:00:00Z" }, { "id": 2, "title": "APP显示异常", "description": "APP中车辆信息显示不正确", "category": "APP功能", "priority": "medium", "status": "in_progress", "created_at": "2024-01-01T11:00:00Z" } ] # 应用过滤 if status_filter and status_filter != 'all': workorders = [w for w in workorders if w['status'] == status_filter] if priority_filter and priority_filter != 'all': workorders = [w for w in workorders if w['priority'] == priority_filter] return jsonify(workorders) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/workorders', methods=['POST']) def create_workorder(): """创建工单""" try: data = request.get_json() result = assistant.create_work_order( title=data['title'], description=data['description'], category=data['category'], priority=data['priority'] ) return jsonify({"success": True, "workorder": result}) except Exception as e: return jsonify({"error": str(e)}), 500 # 分析相关API @app.route('/api/analytics') def get_analytics(): """获取分析数据""" try: analytics = assistant.generate_analytics("last_7_days") return jsonify(analytics) except Exception as e: return jsonify({"error": str(e)}), 500 # 系统设置相关API @app.route('/api/settings') def get_settings(): """获取系统设置""" try: settings = { "api_timeout": 30, "max_history": 10, "refresh_interval": 10, "auto_monitoring": True, "agent_mode": True } return jsonify(settings) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/settings', methods=['POST']) def save_settings(): """保存系统设置""" try: data = request.get_json() # 这里应该保存设置到配置文件 return jsonify({"success": True, "message": "设置保存成功"}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/system/info') def get_system_info(): """获取系统信息""" try: import sys import platform info = { "version": "1.0.0", "python_version": sys.version, "database": "SQLite", "uptime": "2天3小时", "memory_usage": 128 } return jsonify(info) except Exception as e: return jsonify({"error": str(e)}), 500 # 车辆数据相关API @app.route('/api/vehicle/data') def get_vehicle_data(): """获取车辆数据""" try: vehicle_id = request.args.get('vehicle_id') data_type = request.args.get('data_type') limit = request.args.get('limit', 10, type=int) if vehicle_id: data = vehicle_manager.get_vehicle_data(vehicle_id, data_type, limit) else: data = vehicle_manager.search_vehicle_data(limit=limit) return jsonify(data) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/vehicle/data//latest') def get_latest_vehicle_data(vehicle_id): """获取车辆最新数据""" try: data = vehicle_manager.get_latest_vehicle_data(vehicle_id) return jsonify(data) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/vehicle/data//summary') def get_vehicle_summary(vehicle_id): """获取车辆数据摘要""" try: summary = vehicle_manager.get_vehicle_summary(vehicle_id) return jsonify(summary) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/vehicle/data', methods=['POST']) def add_vehicle_data(): """添加车辆数据""" try: data = request.get_json() success = vehicle_manager.add_vehicle_data( vehicle_id=data['vehicle_id'], data_type=data['data_type'], data_value=data['data_value'], vehicle_vin=data.get('vehicle_vin') ) return jsonify({"success": success, "message": "数据添加成功" if success else "添加失败"}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/vehicle/init-sample-data', methods=['POST']) def init_sample_vehicle_data(): """初始化示例车辆数据""" try: success = vehicle_manager.add_sample_vehicle_data() return jsonify({"success": success, "message": "示例数据初始化成功" if success else "初始化失败"}) except Exception as e: return jsonify({"error": str(e)}), 500 if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=5000)