#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ TSP智能助手 - 全面前端功能测试脚本 测试所有前端页面的功能 """ import requests import json import time import os import tempfile from datetime import datetime class TSPFrontendTester: """TSP前端功能测试器""" def __init__(self, base_url="http://localhost:5000"): self.base_url = base_url self.session = requests.Session() self.test_results = { "total_tests": 0, "passed": 0, "failed": 0, "errors": [] } def log_test(self, test_name, success, message=""): """记录测试结果""" self.test_results["total_tests"] += 1 if success: self.test_results["passed"] += 1 print(f"✅ {test_name}: {message}") else: self.test_results["failed"] += 1 self.test_results["errors"].append(f"{test_name}: {message}") print(f"❌ {test_name}: {message}") def test_server_connection(self): """测试服务器连接""" print("\n" + "="*60) print("🔗 测试服务器连接") print("="*60) try: response = self.session.get(f"{self.base_url}/") if response.status_code == 200: self.log_test("服务器连接", True, "服务器响应正常") return True else: self.log_test("服务器连接", False, f"HTTP {response.status_code}") return False except requests.exceptions.ConnectionError: self.log_test("服务器连接", False, "无法连接到服务器") return False except Exception as e: self.log_test("服务器连接", False, f"连接错误: {e}") return False def test_health_endpoint(self): """测试健康检查端点""" print("\n" + "="*60) print("🏥 测试健康检查") print("="*60) try: response = self.session.get(f"{self.base_url}/api/health") if response.status_code == 200: data = response.json() self.log_test("健康检查", True, f"状态: {data.get('status', 'unknown')}") return True else: self.log_test("健康检查", False, f"HTTP {response.status_code}") return False except Exception as e: self.log_test("健康检查", False, f"请求错误: {e}") return False def test_agent_functionality(self): """测试Agent功能""" print("\n" + "="*60) print("🤖 测试Agent功能") print("="*60) # 1. 获取Agent状态 try: response = self.session.get(f"{self.base_url}/api/agent/status") if response.status_code == 200: data = response.json() if data.get("success"): self.log_test("获取Agent状态", True, f"状态: {data.get('status', 'unknown')}") else: self.log_test("获取Agent状态", False, "返回失败状态") else: self.log_test("获取Agent状态", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("获取Agent状态", False, f"请求错误: {e}") # 2. 切换Agent模式 try: response = self.session.post(f"{self.base_url}/api/agent/toggle", json={"enabled": True}) if response.status_code == 200: data = response.json() if data.get("success"): self.log_test("切换Agent模式", True, data.get("message", "切换成功")) else: self.log_test("切换Agent模式", False, "切换失败") else: self.log_test("切换Agent模式", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("切换Agent模式", False, f"请求错误: {e}") # 3. 启动Agent监控 try: response = self.session.post(f"{self.base_url}/api/agent/monitoring/start") if response.status_code == 200: data = response.json() if data.get("success"): self.log_test("启动Agent监控", True, data.get("message", "启动成功")) else: self.log_test("启动Agent监控", False, "启动失败") else: self.log_test("启动Agent监控", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("启动Agent监控", False, f"请求错误: {e}") # 4. 运行主动监控 try: response = self.session.post(f"{self.base_url}/api/agent/proactive-monitoring") if response.status_code == 200: data = response.json() if data.get("success"): actions = data.get("proactive_actions", []) self.log_test("运行主动监控", True, f"发现 {len(actions)} 个行动机会") else: self.log_test("运行主动监控", False, "监控失败") else: self.log_test("运行主动监控", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("运行主动监控", False, f"请求错误: {e}") # 5. 运行智能分析 try: response = self.session.post(f"{self.base_url}/api/agent/intelligent-analysis") if response.status_code == 200: data = response.json() if data.get("success"): self.log_test("运行智能分析", True, "分析完成") else: self.log_test("运行智能分析", False, "分析失败") else: self.log_test("运行智能分析", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("运行智能分析", False, f"请求错误: {e}") def test_knowledge_management(self): """测试知识库管理功能""" print("\n" + "="*60) print("📚 测试知识库管理") print("="*60) # 1. 获取知识库列表 try: response = self.session.get(f"{self.base_url}/api/knowledge") if response.status_code == 200: knowledge = response.json() self.log_test("获取知识库列表", True, f"共 {len(knowledge)} 条知识") else: self.log_test("获取知识库列表", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("获取知识库列表", False, f"请求错误: {e}") # 2. 添加知识库条目 test_knowledge = { "question": f"测试问题 - {datetime.now().strftime('%H:%M:%S')}", "answer": "这是一个测试答案,用于验证知识库添加功能是否正常工作。", "category": "技术问题", "confidence_score": 0.9 } try: response = self.session.post(f"{self.base_url}/api/knowledge", json=test_knowledge) if response.status_code == 200: data = response.json() if data.get("success"): self.log_test("添加知识库条目", True, data.get("message", "添加成功")) else: self.log_test("添加知识库条目", False, "添加失败") else: self.log_test("添加知识库条目", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("添加知识库条目", False, f"请求错误: {e}") # 3. 搜索知识库 try: response = self.session.get(f"{self.base_url}/api/knowledge/search?q=测试") if response.status_code == 200: results = response.json() self.log_test("搜索知识库", True, f"找到 {len(results)} 条结果") else: self.log_test("搜索知识库", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("搜索知识库", False, f"请求错误: {e}") # 4. 获取知识库统计 try: response = self.session.get(f"{self.base_url}/api/knowledge/stats") if response.status_code == 200: stats = response.json() self.log_test("获取知识库统计", True, f"总条目: {stats.get('total_entries', 0)}") else: self.log_test("获取知识库统计", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("获取知识库统计", False, f"请求错误: {e}") def test_file_upload(self): """测试文件上传功能""" print("\n" + "="*60) print("📁 测试文件上传功能") print("="*60) # 创建测试文件 test_content = """ TSP智能助手使用指南 1. 系统启动 - 运行 python start_dashboard.py - 访问 http://localhost:5000 2. 主要功能 - Agent管理:智能助手模式切换 - 知识库管理:添加、搜索、上传文件 - 工单管理:创建、跟踪工单 - 预警管理:系统监控和预警 3. 常见问题 Q: 如何启动Agent模式? A: 在Agent管理页面点击开关即可启动 Q: 如何添加知识库? A: 可以手动添加或上传文件自动生成 Q: 系统支持哪些文件格式? A: 支持TXT、PDF、DOC、DOCX、MD格式 """ # 创建临时文件 with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8') as f: f.write(test_content) temp_file_path = f.name try: # 上传文件 with open(temp_file_path, 'rb') as f: files = {'file': ('test_guide.txt', f, 'text/plain')} data = { 'process_method': 'auto', 'category': '技术问题', 'confidence_score': 0.8 } response = self.session.post(f"{self.base_url}/api/knowledge/upload", files=files, data=data) if response.status_code == 200: result = response.json() if result.get("success"): self.log_test("文件上传", True, f"成功生成 {result.get('knowledge_count', 0)} 条知识") else: self.log_test("文件上传", False, result.get("error", "上传失败")) else: self.log_test("文件上传", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("文件上传", False, f"请求错误: {e}") finally: # 清理临时文件 try: os.unlink(temp_file_path) except: pass def test_work_order_management(self): """测试工单管理功能""" print("\n" + "="*60) print("📋 测试工单管理") print("="*60) # 1. 获取工单列表 try: response = self.session.get(f"{self.base_url}/api/workorders") if response.status_code == 200: workorders = response.json() self.log_test("获取工单列表", True, f"共 {len(workorders)} 个工单") else: self.log_test("获取工单列表", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("获取工单列表", False, f"请求错误: {e}") # 2. 创建工单 test_workorder = { "title": f"测试工单 - {datetime.now().strftime('%H:%M:%S')}", "description": "这是一个测试工单,用于验证工单创建功能。", "priority": "medium", "category": "技术问题" } try: response = self.session.post(f"{self.base_url}/api/workorders", json=test_workorder) if response.status_code == 200: data = response.json() if data.get("success"): self.log_test("创建工单", True, data.get("message", "创建成功")) else: self.log_test("创建工单", False, "创建失败") else: self.log_test("创建工单", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("创建工单", False, f"请求错误: {e}") def test_analytics(self): """测试数据分析功能""" print("\n" + "="*60) print("📊 测试数据分析") print("="*60) try: response = self.session.get(f"{self.base_url}/api/analytics") if response.status_code == 200: analytics = response.json() self.log_test("获取分析数据", True, "数据获取成功") else: self.log_test("获取分析数据", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("获取分析数据", False, f"请求错误: {e}") def test_system_settings(self): """测试系统设置功能""" print("\n" + "="*60) print("⚙️ 测试系统设置") print("="*60) # 1. 获取系统设置 try: response = self.session.get(f"{self.base_url}/api/settings") if response.status_code == 200: settings = response.json() self.log_test("获取系统设置", True, "设置获取成功") else: self.log_test("获取系统设置", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("获取系统设置", False, f"请求错误: {e}") # 2. 获取系统信息 try: response = self.session.get(f"{self.base_url}/api/system/info") if response.status_code == 200: info = response.json() self.log_test("获取系统信息", True, f"版本: {info.get('version', 'unknown')}") else: self.log_test("获取系统信息", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("获取系统信息", False, f"请求错误: {e}") def test_chat_functionality(self): """测试聊天功能""" print("\n" + "="*60) print("💬 测试聊天功能") print("="*60) # 1. 创建聊天会话 try: response = self.session.post(f"{self.base_url}/api/chat/session") if response.status_code == 200: data = response.json() if data.get("success"): session_id = data.get("session_id") self.log_test("创建聊天会话", True, f"会话ID: {session_id}") # 2. 发送消息 test_message = { "message": "你好,这是一个测试消息", "session_id": session_id } response = self.session.post(f"{self.base_url}/api/chat/message", json=test_message) if response.status_code == 200: data = response.json() if data.get("success"): self.log_test("发送聊天消息", True, "消息发送成功") else: self.log_test("发送聊天消息", False, "消息发送失败") else: self.log_test("发送聊天消息", False, f"HTTP {response.status_code}") else: self.log_test("创建聊天会话", False, "会话创建失败") else: self.log_test("创建聊天会话", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("创建聊天会话", False, f"请求错误: {e}") def test_alert_management(self): """测试预警管理功能""" print("\n" + "="*60) print("🚨 测试预警管理") print("="*60) # 1. 获取预警列表 try: response = self.session.get(f"{self.base_url}/api/alerts") if response.status_code == 200: alerts = response.json() self.log_test("获取预警列表", True, f"共 {len(alerts)} 个预警") else: self.log_test("获取预警列表", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("获取预警列表", False, f"请求错误: {e}") # 2. 获取预警规则 try: response = self.session.get(f"{self.base_url}/api/alerts/rules") if response.status_code == 200: rules = response.json() self.log_test("获取预警规则", True, f"共 {len(rules)} 条规则") else: self.log_test("获取预警规则", False, f"HTTP {response.status_code}") except Exception as e: self.log_test("获取预警规则", False, f"请求错误: {e}") def run_all_tests(self): """运行所有测试""" print("🚀 TSP智能助手 - 全面前端功能测试") print("="*60) print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"测试目标: {self.base_url}") # 基础连接测试 if not self.test_server_connection(): print("\n❌ 服务器连接失败,请确保服务已启动") return # 运行所有功能测试 self.test_health_endpoint() self.test_agent_functionality() self.test_knowledge_management() self.test_file_upload() self.test_work_order_management() self.test_analytics() self.test_system_settings() self.test_chat_functionality() self.test_alert_management() # 输出测试结果 self.print_test_summary() def print_test_summary(self): """打印测试总结""" print("\n" + "="*60) print("📊 测试结果总结") print("="*60) total = self.test_results["total_tests"] passed = self.test_results["passed"] failed = self.test_results["failed"] print(f"总测试数: {total}") print(f"通过: {passed} ✅") print(f"失败: {failed} ❌") print(f"成功率: {(passed/total*100):.1f}%" if total > 0 else "成功率: 0%") if failed > 0: print("\n❌ 失败的测试:") for error in self.test_results["errors"]: print(f" - {error}") print("\n" + "="*60) if failed == 0: print("🎉 所有测试通过!系统功能正常") else: print("⚠️ 部分测试失败,请检查相关功能") print("="*60) def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description='TSP智能助手前端功能测试') parser.add_argument('--url', default='http://localhost:5000', help='服务器地址 (默认: http://localhost:5000)') parser.add_argument('--verbose', action='store_true', help='详细输出') args = parser.parse_args() tester = TSPFrontendTester(args.url) tester.run_all_tests() if __name__ == "__main__": main()