Files
tsp-assistant/comprehensive_frontend_test.py

506 lines
20 KiB
Python
Raw Permalink Normal View History

2025-09-06 21:06:18 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TSP智能助手 - 全面前端功能测试脚本
测试所有前端页面的功能
"""
import requests
import json
import time
import os
import tempfile
from datetime import datetime
class TSPFrontendTester:
"""TSP前端功能测试器"""
def __init__(self, base_url="http://localhost:5000"):
self.base_url = base_url
self.session = requests.Session()
self.test_results = {
"total_tests": 0,
"passed": 0,
"failed": 0,
"errors": []
}
def log_test(self, test_name, success, message=""):
"""记录测试结果"""
self.test_results["total_tests"] += 1
if success:
self.test_results["passed"] += 1
print(f"{test_name}: {message}")
else:
self.test_results["failed"] += 1
self.test_results["errors"].append(f"{test_name}: {message}")
print(f"{test_name}: {message}")
def test_server_connection(self):
"""测试服务器连接"""
print("\n" + "="*60)
print("🔗 测试服务器连接")
print("="*60)
try:
response = self.session.get(f"{self.base_url}/")
if response.status_code == 200:
self.log_test("服务器连接", True, "服务器响应正常")
return True
else:
self.log_test("服务器连接", False, f"HTTP {response.status_code}")
return False
except requests.exceptions.ConnectionError:
self.log_test("服务器连接", False, "无法连接到服务器")
return False
except Exception as e:
self.log_test("服务器连接", False, f"连接错误: {e}")
return False
def test_health_endpoint(self):
"""测试健康检查端点"""
print("\n" + "="*60)
print("🏥 测试健康检查")
print("="*60)
try:
response = self.session.get(f"{self.base_url}/api/health")
if response.status_code == 200:
data = response.json()
self.log_test("健康检查", True, f"状态: {data.get('status', 'unknown')}")
return True
else:
self.log_test("健康检查", False, f"HTTP {response.status_code}")
return False
except Exception as e:
self.log_test("健康检查", False, f"请求错误: {e}")
return False
def test_agent_functionality(self):
"""测试Agent功能"""
print("\n" + "="*60)
print("🤖 测试Agent功能")
print("="*60)
# 1. 获取Agent状态
try:
response = self.session.get(f"{self.base_url}/api/agent/status")
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.log_test("获取Agent状态", True, f"状态: {data.get('status', 'unknown')}")
else:
self.log_test("获取Agent状态", False, "返回失败状态")
else:
self.log_test("获取Agent状态", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("获取Agent状态", False, f"请求错误: {e}")
# 2. 切换Agent模式
try:
response = self.session.post(f"{self.base_url}/api/agent/toggle",
json={"enabled": True})
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.log_test("切换Agent模式", True, data.get("message", "切换成功"))
else:
self.log_test("切换Agent模式", False, "切换失败")
else:
self.log_test("切换Agent模式", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("切换Agent模式", False, f"请求错误: {e}")
# 3. 启动Agent监控
try:
response = self.session.post(f"{self.base_url}/api/agent/monitoring/start")
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.log_test("启动Agent监控", True, data.get("message", "启动成功"))
else:
self.log_test("启动Agent监控", False, "启动失败")
else:
self.log_test("启动Agent监控", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("启动Agent监控", False, f"请求错误: {e}")
# 4. 运行主动监控
try:
response = self.session.post(f"{self.base_url}/api/agent/proactive-monitoring")
if response.status_code == 200:
data = response.json()
if data.get("success"):
actions = data.get("proactive_actions", [])
self.log_test("运行主动监控", True, f"发现 {len(actions)} 个行动机会")
else:
self.log_test("运行主动监控", False, "监控失败")
else:
self.log_test("运行主动监控", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("运行主动监控", False, f"请求错误: {e}")
# 5. 运行智能分析
try:
response = self.session.post(f"{self.base_url}/api/agent/intelligent-analysis")
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.log_test("运行智能分析", True, "分析完成")
else:
self.log_test("运行智能分析", False, "分析失败")
else:
self.log_test("运行智能分析", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("运行智能分析", False, f"请求错误: {e}")
def test_knowledge_management(self):
"""测试知识库管理功能"""
print("\n" + "="*60)
print("📚 测试知识库管理")
print("="*60)
# 1. 获取知识库列表
try:
response = self.session.get(f"{self.base_url}/api/knowledge")
if response.status_code == 200:
knowledge = response.json()
self.log_test("获取知识库列表", True, f"{len(knowledge)} 条知识")
else:
self.log_test("获取知识库列表", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("获取知识库列表", False, f"请求错误: {e}")
# 2. 添加知识库条目
test_knowledge = {
"question": f"测试问题 - {datetime.now().strftime('%H:%M:%S')}",
"answer": "这是一个测试答案,用于验证知识库添加功能是否正常工作。",
"category": "技术问题",
"confidence_score": 0.9
}
try:
response = self.session.post(f"{self.base_url}/api/knowledge",
json=test_knowledge)
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.log_test("添加知识库条目", True, data.get("message", "添加成功"))
else:
self.log_test("添加知识库条目", False, "添加失败")
else:
self.log_test("添加知识库条目", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("添加知识库条目", False, f"请求错误: {e}")
# 3. 搜索知识库
try:
response = self.session.get(f"{self.base_url}/api/knowledge/search?q=测试")
if response.status_code == 200:
results = response.json()
self.log_test("搜索知识库", True, f"找到 {len(results)} 条结果")
else:
self.log_test("搜索知识库", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("搜索知识库", False, f"请求错误: {e}")
# 4. 获取知识库统计
try:
response = self.session.get(f"{self.base_url}/api/knowledge/stats")
if response.status_code == 200:
stats = response.json()
self.log_test("获取知识库统计", True, f"总条目: {stats.get('total_entries', 0)}")
else:
self.log_test("获取知识库统计", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("获取知识库统计", False, f"请求错误: {e}")
def test_file_upload(self):
"""测试文件上传功能"""
print("\n" + "="*60)
print("📁 测试文件上传功能")
print("="*60)
# 创建测试文件
test_content = """
TSP智能助手使用指南
1. 系统启动
- 运行 python start_dashboard.py
- 访问 http://localhost:5000
2. 主要功能
- Agent管理智能助手模式切换
- 知识库管理添加搜索上传文件
- 工单管理创建跟踪工单
- 预警管理系统监控和预警
3. 常见问题
Q: 如何启动Agent模式
A: 在Agent管理页面点击开关即可启动
Q: 如何添加知识库
A: 可以手动添加或上传文件自动生成
Q: 系统支持哪些文件格式
A: 支持TXTPDFDOCDOCXMD格式
"""
# 创建临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8') as f:
f.write(test_content)
temp_file_path = f.name
try:
# 上传文件
with open(temp_file_path, 'rb') as f:
files = {'file': ('test_guide.txt', f, 'text/plain')}
data = {
'process_method': 'auto',
'category': '技术问题',
'confidence_score': 0.8
}
response = self.session.post(f"{self.base_url}/api/knowledge/upload",
files=files, data=data)
if response.status_code == 200:
result = response.json()
if result.get("success"):
self.log_test("文件上传", True,
f"成功生成 {result.get('knowledge_count', 0)} 条知识")
else:
self.log_test("文件上传", False, result.get("error", "上传失败"))
else:
self.log_test("文件上传", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("文件上传", False, f"请求错误: {e}")
finally:
# 清理临时文件
try:
os.unlink(temp_file_path)
except:
pass
def test_work_order_management(self):
"""测试工单管理功能"""
print("\n" + "="*60)
print("📋 测试工单管理")
print("="*60)
# 1. 获取工单列表
try:
response = self.session.get(f"{self.base_url}/api/workorders")
if response.status_code == 200:
workorders = response.json()
self.log_test("获取工单列表", True, f"{len(workorders)} 个工单")
else:
self.log_test("获取工单列表", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("获取工单列表", False, f"请求错误: {e}")
# 2. 创建工单
test_workorder = {
"title": f"测试工单 - {datetime.now().strftime('%H:%M:%S')}",
"description": "这是一个测试工单,用于验证工单创建功能。",
"priority": "medium",
"category": "技术问题"
}
try:
response = self.session.post(f"{self.base_url}/api/workorders",
json=test_workorder)
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.log_test("创建工单", True, data.get("message", "创建成功"))
else:
self.log_test("创建工单", False, "创建失败")
else:
self.log_test("创建工单", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("创建工单", False, f"请求错误: {e}")
def test_analytics(self):
"""测试数据分析功能"""
print("\n" + "="*60)
print("📊 测试数据分析")
print("="*60)
try:
response = self.session.get(f"{self.base_url}/api/analytics")
if response.status_code == 200:
analytics = response.json()
self.log_test("获取分析数据", True, "数据获取成功")
else:
self.log_test("获取分析数据", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("获取分析数据", False, f"请求错误: {e}")
def test_system_settings(self):
"""测试系统设置功能"""
print("\n" + "="*60)
print("⚙️ 测试系统设置")
print("="*60)
# 1. 获取系统设置
try:
response = self.session.get(f"{self.base_url}/api/settings")
if response.status_code == 200:
settings = response.json()
self.log_test("获取系统设置", True, "设置获取成功")
else:
self.log_test("获取系统设置", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("获取系统设置", False, f"请求错误: {e}")
# 2. 获取系统信息
try:
response = self.session.get(f"{self.base_url}/api/system/info")
if response.status_code == 200:
info = response.json()
self.log_test("获取系统信息", True, f"版本: {info.get('version', 'unknown')}")
else:
self.log_test("获取系统信息", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("获取系统信息", False, f"请求错误: {e}")
def test_chat_functionality(self):
"""测试聊天功能"""
print("\n" + "="*60)
print("💬 测试聊天功能")
print("="*60)
# 1. 创建聊天会话
try:
response = self.session.post(f"{self.base_url}/api/chat/session")
if response.status_code == 200:
data = response.json()
if data.get("success"):
session_id = data.get("session_id")
self.log_test("创建聊天会话", True, f"会话ID: {session_id}")
# 2. 发送消息
test_message = {
"message": "你好,这是一个测试消息",
"session_id": session_id
}
response = self.session.post(f"{self.base_url}/api/chat/message",
json=test_message)
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.log_test("发送聊天消息", True, "消息发送成功")
else:
self.log_test("发送聊天消息", False, "消息发送失败")
else:
self.log_test("发送聊天消息", False, f"HTTP {response.status_code}")
else:
self.log_test("创建聊天会话", False, "会话创建失败")
else:
self.log_test("创建聊天会话", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("创建聊天会话", False, f"请求错误: {e}")
def test_alert_management(self):
"""测试预警管理功能"""
print("\n" + "="*60)
print("🚨 测试预警管理")
print("="*60)
# 1. 获取预警列表
try:
response = self.session.get(f"{self.base_url}/api/alerts")
if response.status_code == 200:
alerts = response.json()
self.log_test("获取预警列表", True, f"{len(alerts)} 个预警")
else:
self.log_test("获取预警列表", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("获取预警列表", False, f"请求错误: {e}")
# 2. 获取预警规则
try:
response = self.session.get(f"{self.base_url}/api/alerts/rules")
if response.status_code == 200:
rules = response.json()
self.log_test("获取预警规则", True, f"{len(rules)} 条规则")
else:
self.log_test("获取预警规则", False, f"HTTP {response.status_code}")
except Exception as e:
self.log_test("获取预警规则", False, f"请求错误: {e}")
def run_all_tests(self):
"""运行所有测试"""
print("🚀 TSP智能助手 - 全面前端功能测试")
print("="*60)
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"测试目标: {self.base_url}")
# 基础连接测试
if not self.test_server_connection():
print("\n❌ 服务器连接失败,请确保服务已启动")
return
# 运行所有功能测试
self.test_health_endpoint()
self.test_agent_functionality()
self.test_knowledge_management()
self.test_file_upload()
self.test_work_order_management()
self.test_analytics()
self.test_system_settings()
self.test_chat_functionality()
self.test_alert_management()
# 输出测试结果
self.print_test_summary()
def print_test_summary(self):
"""打印测试总结"""
print("\n" + "="*60)
print("📊 测试结果总结")
print("="*60)
total = self.test_results["total_tests"]
passed = self.test_results["passed"]
failed = self.test_results["failed"]
print(f"总测试数: {total}")
print(f"通过: {passed}")
print(f"失败: {failed}")
print(f"成功率: {(passed/total*100):.1f}%" if total > 0 else "成功率: 0%")
if failed > 0:
print("\n❌ 失败的测试:")
for error in self.test_results["errors"]:
print(f" - {error}")
print("\n" + "="*60)
if failed == 0:
print("🎉 所有测试通过!系统功能正常")
else:
print("⚠️ 部分测试失败,请检查相关功能")
print("="*60)
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='TSP智能助手前端功能测试')
parser.add_argument('--url', default='http://localhost:5000',
help='服务器地址 (默认: http://localhost:5000)')
parser.add_argument('--verbose', action='store_true',
help='详细输出')
args = parser.parse_args()
tester = TSPFrontendTester(args.url)
tester.run_all_tests()
if __name__ == "__main__":
main()