Files
assist/init_database.py

805 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
TSP助手数据库初始化脚本 - 重构版本
结合项目新特性,提供更高效的数据库初始化和管理功能
"""
import sys
import os
import logging
import json
from typing import Dict, List, Optional, Any
from sqlalchemy import text, inspect
from datetime import datetime, timedelta
from pathlib import Path
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from src.config.config import Config
from src.utils.helpers import setup_logging
from src.core.database import db_manager
from src.core.models import Base, WorkOrder, KnowledgeEntry, Conversation, Analytics, Alert, VehicleData
class DatabaseInitializer:
"""数据库初始化器 - 重构版本"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.db_url = str(db_manager.engine.url)
self.is_mysql = 'mysql' in self.db_url
self.is_sqlite = 'sqlite' in self.db_url
self.is_postgresql = 'postgresql' in self.db_url
# 数据库版本信息
self.db_version = self._get_database_version()
# 迁移历史记录
self.migration_history = []
def _get_database_version(self) -> str:
"""获取数据库版本信息"""
try:
with db_manager.get_session() as session:
if self.is_mysql:
result = session.execute(text("SELECT VERSION()")).fetchone()
return f"MySQL {result[0]}"
elif self.is_postgresql:
result = session.execute(text("SELECT version()")).fetchone()
return f"PostgreSQL {result[0].split()[1]}"
else: # SQLite
result = session.execute(text("SELECT sqlite_version()")).fetchone()
return f"SQLite {result[0]}"
except Exception as e:
self.logger.warning(f"无法获取数据库版本: {e}")
return "Unknown"
def initialize_database(self, force_reset: bool = False) -> bool:
"""初始化数据库 - 主入口函数"""
print("=" * 80)
print("🚀 TSP智能助手数据库初始化系统")
print("=" * 80)
print(f"📊 数据库类型: {self.db_version}")
print(f"🔗 连接地址: {self.db_url}")
print(f"⏰ 初始化时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
try:
# 设置日志
setup_logging(Config.LOG_LEVEL, Config.LOG_FILE)
# 测试数据库连接
if not self._test_connection():
return False
# 检查是否需要重置数据库
if force_reset:
if not self._reset_database():
return False
# 创建数据库表
if not self._create_tables():
return False
# 执行数据库迁移
if not self._run_migrations():
return False
# 插入初始数据
if not self._insert_initial_data():
return False
# 验证数据库完整性
if not self._verify_database_integrity():
return False
# 生成初始化报告
self._generate_init_report()
print("\n" + "=" * 80)
print("🎉 数据库初始化完成!")
print("=" * 80)
return True
except Exception as e:
print(f"\n❌ 数据库初始化失败: {e}")
self.logger.error(f"数据库初始化失败: {e}", exc_info=True)
return False
def _test_connection(self) -> bool:
"""测试数据库连接"""
print("\n🔌 测试数据库连接...")
try:
if db_manager.test_connection():
print("✅ 数据库连接成功")
return True
else:
print("❌ 数据库连接失败")
return False
except Exception as e:
print(f"❌ 数据库连接测试异常: {e}")
return False
def _reset_database(self) -> bool:
"""重置数据库(谨慎使用)"""
print("\n⚠️ 重置数据库...")
try:
# 删除所有表
Base.metadata.drop_all(bind=db_manager.engine)
print("✅ 数据库表删除成功")
# 重新创建所有表
Base.metadata.create_all(bind=db_manager.engine)
print("✅ 数据库表重新创建成功")
return True
except Exception as e:
print(f"❌ 数据库重置失败: {e}")
return False
def _create_tables(self) -> bool:
"""创建数据库表"""
print("\n📋 创建数据库表...")
try:
# 获取现有表信息
inspector = inspect(db_manager.engine)
existing_tables = inspector.get_table_names()
# 创建所有表
Base.metadata.create_all(bind=db_manager.engine)
# 检查新创建的表
new_tables = inspector.get_table_names()
created_tables = set(new_tables) - set(existing_tables)
if created_tables:
print(f"✅ 新创建表: {', '.join(created_tables)}")
else:
print("✅ 所有表已存在")
return True
except Exception as e:
print(f"❌ 创建数据库表失败: {e}")
return False
def _run_migrations(self) -> bool:
"""执行数据库迁移"""
print("\n🔄 执行数据库迁移...")
migrations = [
self._migrate_knowledge_verification_fields,
self._migrate_alert_severity_field,
self._migrate_vehicle_data_table,
self._migrate_conversation_enhancements,
self._migrate_workorder_enhancements,
self._migrate_workorder_suggestions_enhancements,
self._migrate_analytics_enhancements,
self._migrate_system_optimization_fields
]
success_count = 0
for migration in migrations:
try:
if migration():
success_count += 1
except Exception as e:
self.logger.error(f"迁移失败: {migration.__name__}: {e}")
print(f"⚠️ 迁移 {migration.__name__} 失败: {e}")
print(f"✅ 完成 {success_count}/{len(migrations)} 个迁移")
return success_count > 0
def _migrate_knowledge_verification_fields(self) -> bool:
"""迁移知识库验证字段"""
print(" 📝 检查知识库验证字段...")
fields_to_add = [
('is_verified', 'BOOLEAN DEFAULT FALSE'),
('verified_by', 'VARCHAR(100)'),
('verified_at', 'DATETIME')
]
return self._add_table_columns('knowledge_entries', fields_to_add)
def _migrate_alert_severity_field(self) -> bool:
"""迁移预警严重程度字段"""
print(" 🚨 检查预警严重程度字段...")
fields_to_add = [
('severity', 'VARCHAR(20) DEFAULT \'medium\'')
]
return self._add_table_columns('alerts', fields_to_add)
def _migrate_vehicle_data_table(self) -> bool:
"""迁移车辆数据表"""
print(" 🚗 检查车辆数据表...")
try:
with db_manager.get_session() as session:
# 检查表是否存在
inspector = inspect(db_manager.engine)
if 'vehicle_data' not in inspector.get_table_names():
print(" 创建vehicle_data表...")
VehicleData.__table__.create(session.bind, checkfirst=True)
print(" ✅ vehicle_data表创建成功")
else:
print(" ✅ vehicle_data表已存在")
session.commit()
return True
except Exception as e:
print(f" ❌ 车辆数据表迁移失败: {e}")
return False
def _migrate_conversation_enhancements(self) -> bool:
"""迁移对话增强字段"""
print(" 💬 检查对话增强字段...")
fields_to_add = [
('response_time', 'FLOAT'),
('user_satisfaction', 'INTEGER'),
('ai_confidence', 'FLOAT'),
('context_data', 'TEXT')
]
return self._add_table_columns('conversations', fields_to_add)
def _migrate_workorder_enhancements(self) -> bool:
"""迁移工单增强字段"""
print(" 📋 检查工单增强字段...")
fields_to_add = [
('ai_suggestion', 'TEXT'),
('human_resolution', 'TEXT'),
('ai_similarity', 'FLOAT'),
('ai_approved', 'BOOLEAN DEFAULT FALSE'),
('feishu_record_id', 'VARCHAR(100)'),
('sync_status', 'VARCHAR(20) DEFAULT \'pending\''),
# 飞书集成扩展字段
('source', 'VARCHAR(50)'),
('module', 'VARCHAR(100)'),
('created_by', 'VARCHAR(100)'),
('wilfulness', 'VARCHAR(100)'),
('date_of_close', 'DATETIME'),
('vehicle_type', 'VARCHAR(100)'),
('vin_sim', 'VARCHAR(50)'),
('app_remote_control_version', 'VARCHAR(100)'),
('hmi_sw', 'VARCHAR(100)'),
('parent_record', 'VARCHAR(100)'),
('has_updated_same_day', 'VARCHAR(50)'),
('operating_time', 'VARCHAR(100)')
]
return self._add_table_columns('work_orders', fields_to_add)
def _migrate_workorder_suggestions_enhancements(self) -> bool:
"""迁移工单建议表增强字段"""
print(" 💡 检查工单建议表增强字段...")
fields_to_add = [
('use_human_resolution', 'BOOLEAN DEFAULT FALSE') # 是否使用人工描述入库
]
return self._add_table_columns('work_order_suggestions', fields_to_add)
def _migrate_analytics_enhancements(self) -> bool:
"""迁移分析增强字段"""
print(" 📊 检查分析增强字段...")
fields_to_add = [
('performance_score', 'FLOAT'),
('quality_metrics', 'TEXT'),
('cost_analysis', 'TEXT'),
('optimization_suggestions', 'TEXT')
]
return self._add_table_columns('analytics', fields_to_add)
def _migrate_system_optimization_fields(self) -> bool:
"""迁移系统优化字段"""
print(" ⚙️ 检查系统优化字段...")
# 为各个表添加系统优化相关字段
tables_and_fields = {
'conversations': [
('processing_time', 'FLOAT'),
('memory_usage', 'FLOAT'),
('cpu_usage', 'FLOAT')
],
'work_orders': [
('processing_efficiency', 'FLOAT'),
('resource_usage', 'TEXT')
],
'knowledge_entries': [
('search_frequency', 'INTEGER DEFAULT 0'),
('last_accessed', 'DATETIME'),
('relevance_score', 'FLOAT')
]
}
success = True
for table_name, fields in tables_and_fields.items():
if not self._add_table_columns(table_name, fields):
success = False
return success
def _add_table_columns(self, table_name: str, fields: List[tuple]) -> bool:
"""为表添加字段"""
try:
added_count = 0
skipped_count = 0
for field_name, field_type in fields:
try:
if self._column_exists(table_name, field_name):
skipped_count += 1
continue
print(f" 添加字段 {table_name}.{field_name}...")
# 使用单独的会话添加每个字段,避免长时间锁定
with db_manager.get_session() as session:
alter_sql = f"ALTER TABLE {table_name} ADD COLUMN {field_name} {field_type}"
session.execute(text(alter_sql))
session.commit()
print(f" ✅ 字段 {field_name} 添加成功")
added_count += 1
except Exception as field_error:
print(f" ⚠️ 字段 {field_name} 添加失败: {field_error}")
# 继续处理其他字段,不中断整个过程
if added_count > 0:
print(f" 📊 成功添加 {added_count} 个字段,跳过 {skipped_count} 个已存在字段")
else:
print(f" 📊 所有字段都已存在,跳过 {skipped_count} 个字段")
return True
except Exception as e:
print(f" ❌ 添加字段过程失败: {e}")
return False
def _column_exists(self, table_name: str, column_name: str) -> bool:
"""检查字段是否存在"""
try:
with db_manager.get_session() as session:
if self.is_mysql:
result = session.execute(text("""
SELECT COUNT(*) as count
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = :table_name
AND COLUMN_NAME = :column_name
"""), {"table_name": table_name, "column_name": column_name}).fetchone()
elif self.is_postgresql:
result = session.execute(text("""
SELECT COUNT(*) as count
FROM information_schema.columns
WHERE table_name = :table_name
AND column_name = :column_name
"""), {"table_name": table_name, "column_name": column_name}).fetchone()
else: # SQLite
result = session.execute(text("""
SELECT COUNT(*) as count
FROM pragma_table_info(:table_name)
WHERE name = :column_name
"""), {"table_name": table_name, "column_name": column_name}).fetchone()
return result[0] > 0
except Exception:
return False
def _insert_initial_data(self) -> bool:
"""插入初始数据"""
print("\n📊 插入初始数据...")
try:
with db_manager.get_session() as session:
# 检查是否已有数据
existing_count = session.query(KnowledgeEntry).count()
if existing_count > 0:
print(f" ✅ 数据库中已有 {existing_count} 条知识库条目,跳过初始数据插入")
return True
# 插入初始知识库数据
initial_data = self._get_initial_knowledge_data()
for data in initial_data:
entry = KnowledgeEntry(**data)
session.add(entry)
session.commit()
print(f" ✅ 成功插入 {len(initial_data)} 条知识库条目")
# 添加示例车辆数据
self._add_sample_vehicle_data()
# 验证现有知识库条目
self._verify_existing_knowledge()
return True
except Exception as e:
print(f" ❌ 插入初始数据失败: {e}")
return False
def _get_initial_knowledge_data(self) -> List[Dict[str, Any]]:
"""获取初始知识库数据"""
return [
{
"question": "如何重置密码?",
"answer": "您可以通过以下步骤重置密码1. 点击登录页面的'忘记密码'链接 2. 输入您的邮箱地址 3. 检查邮箱并点击重置链接 4. 设置新密码",
"category": "账户问题",
"confidence_score": 0.9,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.9
},
{
"question": "账户被锁定了怎么办?",
"answer": "如果您的账户被锁定请尝试以下解决方案1. 等待15分钟后重试登录 2. 如果问题持续请联系客服并提供您的用户ID",
"category": "账户问题",
"confidence_score": 0.8,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.8
},
{
"question": "如何修改个人信息?",
"answer": "您可以在个人设置页面修改个人信息1. 登录后点击右上角的个人头像 2. 选择'个人设置' 3. 修改相关信息并保存",
"category": "账户问题",
"confidence_score": 0.7,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.7
},
{
"question": "支付失败怎么办?",
"answer": "如果支付失败请检查1. 银行卡余额是否充足 2. 银行卡是否支持在线支付 3. 网络连接是否正常 4. 如果问题持续,请联系支付客服",
"category": "支付问题",
"confidence_score": 0.8,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.8
},
{
"question": "如何申请退款?",
"answer": "申请退款流程1. 在订单详情页面点击'申请退款' 2. 选择退款原因 3. 填写退款说明 4. 提交申请后等待审核",
"category": "支付问题",
"confidence_score": 0.7,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.7
},
{
"question": "系统无法访问怎么办?",
"answer": "如果系统无法访问请尝试1. 检查网络连接 2. 清除浏览器缓存 3. 尝试使用其他浏览器 4. 如果问题持续,请联系技术支持",
"category": "技术问题",
"confidence_score": 0.8,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.8
},
{
"question": "如何联系客服?",
"answer": "您可以通过以下方式联系客服1. 在线客服:点击页面右下角的客服图标 2. 电话客服400-123-4567 3. 邮箱客服support@example.com",
"category": "服务问题",
"confidence_score": 0.9,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.9
},
{
"question": "如何远程启动车辆?",
"answer": "远程启动车辆需要满足以下条件1. 车辆处于P档 2. 手刹拉起 3. 车门已锁 4. 电池电量充足。操作步骤打开APP → 点击远程启动按钮 → 确认启动条件 → 等待启动完成",
"category": "远程控制",
"confidence_score": 0.9,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.9
},
{
"question": "APP显示车辆信息错误怎么办",
"answer": "如果APP显示车辆信息错误请尝试1. 重新登录APP 2. 重启车辆系统 3. 检查网络连接 4. 如果问题持续,请联系客服",
"category": "APP功能",
"confidence_score": 0.8,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.8
},
{
"question": "车辆无法远程启动的原因?",
"answer": "车辆无法远程启动的常见原因1. 车辆不在P档 2. 手刹未拉起 3. 车门未锁 4. 电池电量不足 5. 网络信号差 6. 车辆系统故障",
"category": "远程控制",
"confidence_score": 0.9,
"is_verified": True,
"verified_by": "system",
"verified_at": datetime.now(),
"search_frequency": 0,
"relevance_score": 0.9
}
]
def _add_sample_vehicle_data(self) -> bool:
"""添加示例车辆数据"""
try:
from src.vehicle.vehicle_data_manager import VehicleDataManager
vehicle_manager = VehicleDataManager()
success = vehicle_manager.add_sample_vehicle_data()
if success:
print(" ✅ 示例车辆数据添加成功")
else:
print(" ❌ 示例车辆数据添加失败")
return success
except Exception as e:
print(f" ❌ 添加示例车辆数据失败: {e}")
return False
def _verify_existing_knowledge(self) -> bool:
"""验证现有的知识库条目"""
try:
with db_manager.get_session() as session:
# 获取所有未验证的知识库条目
unverified_entries = session.query(KnowledgeEntry).filter(
KnowledgeEntry.is_verified == False
).all()
if unverified_entries:
print(f" 📝 发现 {len(unverified_entries)} 条未验证的知识库条目")
# 将现有的知识库条目标记为已验证
for entry in unverified_entries:
entry.is_verified = True
entry.verified_by = "system_init"
entry.verified_at = datetime.now()
if not hasattr(entry, 'search_frequency'):
entry.search_frequency = 0
if not hasattr(entry, 'relevance_score'):
entry.relevance_score = 0.7
session.commit()
print(f" ✅ 成功验证 {len(unverified_entries)} 条知识库条目")
else:
print(" ✅ 所有知识库条目已验证")
return True
except Exception as e:
print(f" ❌ 验证知识库条目失败: {e}")
return False
def _verify_database_integrity(self) -> bool:
"""验证数据库完整性"""
print("\n🔍 验证数据库完整性...")
try:
with db_manager.get_session() as session:
# 检查各表的记录数
tables_info = {
'work_orders': WorkOrder,
'conversations': Conversation,
'knowledge_entries': KnowledgeEntry,
'analytics': Analytics,
'alerts': Alert,
'vehicle_data': VehicleData
}
total_records = 0
for table_name, model_class in tables_info.items():
try:
count = session.query(model_class).count()
total_records += count
print(f" 📋 {table_name}: {count} 条记录")
except Exception as e:
print(f" ⚠️ {table_name}: 检查失败 - {e}")
print(f" 📊 总记录数: {total_records}")
# 检查关键字段
self._check_critical_fields()
print(" ✅ 数据库完整性验证通过")
return True
except Exception as e:
print(f" ❌ 数据库完整性验证失败: {e}")
return False
def _check_critical_fields(self):
"""检查关键字段"""
critical_checks = [
("knowledge_entries", "is_verified"),
("alerts", "severity"),
("vehicle_data", "vehicle_id"),
("conversations", "response_time"),
("work_orders", "ai_suggestion")
]
for table_name, field_name in critical_checks:
if self._column_exists(table_name, field_name):
print(f"{table_name}.{field_name} 字段存在")
else:
print(f" ⚠️ {table_name}.{field_name} 字段缺失")
def _generate_init_report(self):
"""生成初始化报告"""
print("\n📋 生成初始化报告...")
try:
report = {
"init_time": datetime.now().isoformat(),
"database_version": self.db_version,
"database_url": self.db_url,
"migrations_applied": len(self.migration_history),
"tables_created": self._get_table_count(),
"initial_data_inserted": True,
"verification_passed": True
}
# 保存报告到文件
report_path = Path("database_init_report.json")
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f" ✅ 初始化报告已保存到: {report_path}")
except Exception as e:
print(f" ⚠️ 生成初始化报告失败: {e}")
def _get_table_count(self) -> int:
"""获取表数量"""
try:
inspector = inspect(db_manager.engine)
return len(inspector.get_table_names())
except Exception:
return 0
def check_database_status(self) -> Dict[str, Any]:
"""检查数据库状态"""
print("\n" + "=" * 80)
print("📊 数据库状态检查")
print("=" * 80)
try:
with db_manager.get_session() as session:
# 检查各表的记录数
tables_info = {
'work_orders': WorkOrder,
'conversations': Conversation,
'knowledge_entries': KnowledgeEntry,
'analytics': Analytics,
'alerts': Alert,
'vehicle_data': VehicleData
}
status = {
"database_version": self.db_version,
"connection_status": "正常",
"tables": {},
"total_records": 0,
"last_check": datetime.now().isoformat()
}
for table_name, model_class in tables_info.items():
try:
count = session.query(model_class).count()
status["tables"][table_name] = count
status["total_records"] += count
print(f"📋 {table_name}: {count} 条记录")
except Exception as e:
status["tables"][table_name] = f"错误: {e}"
print(f"⚠️ {table_name}: 检查失败 - {e}")
# 检查车辆数据详情
if 'vehicle_data' in status["tables"] and isinstance(status["tables"]['vehicle_data'], int):
vehicle_count = status["tables"]['vehicle_data']
if vehicle_count > 0:
vehicle_ids = session.query(VehicleData.vehicle_id).distinct().all()
print(f" - 车辆数量: {len(vehicle_ids)}")
status["vehicle_count"] = len(vehicle_ids)
for vehicle_id in vehicle_ids[:3]: # 显示前3个车辆
vehicle_data_types = session.query(VehicleData.data_type).filter(
VehicleData.vehicle_id == vehicle_id[0]
).distinct().all()
print(f" - 车辆 {vehicle_id[0]}: {len(vehicle_data_types)} 种数据类型")
# 检查知识库验证状态
if 'knowledge_entries' in status["tables"] and isinstance(status["tables"]['knowledge_entries'], int):
verified_count = session.query(KnowledgeEntry).filter(KnowledgeEntry.is_verified == True).count()
unverified_count = session.query(KnowledgeEntry).filter(KnowledgeEntry.is_verified == False).count()
print(f" - 已验证: {verified_count}")
print(f" - 未验证: {unverified_count}")
status["knowledge_verification"] = {
"verified": verified_count,
"unverified": unverified_count
}
print(f"\n📊 总记录数: {status['total_records']}")
print("\n✅ 数据库状态检查完成")
return status
except Exception as e:
print(f"❌ 数据库状态检查失败: {e}")
return {"error": str(e)}
def main():
"""主函数"""
print("🚀 TSP智能助手数据库初始化工具 - 重构版本")
print("=" * 80)
# 创建初始化器
initializer = DatabaseInitializer()
# 检查命令行参数
force_reset = '--reset' in sys.argv or '--force' in sys.argv
if force_reset:
print("⚠️ 警告:将重置数据库,所有数据将被删除!")
try:
confirm = input("确定要继续吗?(y/N): ")
if confirm.lower() != 'y':
print("操作已取消")
return
except Exception:
print("非交互环境,跳过确认")
# 初始化数据库
if initializer.initialize_database(force_reset=force_reset):
# 检查数据库状态
initializer.check_database_status()
print("\n" + "=" * 80)
print("🎉 数据库初始化成功!")
print("=" * 80)
print("✅ 已完成的操作:")
print(" - 创建所有数据库表")
print(" - 执行数据库迁移")
print(" - 添加知识库验证字段")
print(" - 创建车辆数据表")
print(" - 插入初始知识库数据")
print(" - 添加示例车辆数据")
print(" - 验证所有知识库条目")
print(" - 生成初始化报告")
print("\n🚀 现在您可以运行以下命令启动系统:")
print(" python start_dashboard.py")
print("\n🧪 或运行功能测试:")
print(" python test_new_features.py")
print("\n📋 新功能包括:")
print(" - 知识库分页显示")
print(" - 知识库验证机制")
print(" - 车辆实时数据管理")
print(" - 文件上传生成知识库")
print(" - 智能对话结合车辆数据")
print(" - 飞书同步功能")
print(" - 系统性能优化")
else:
print("\n" + "=" * 80)
print("❌ 数据库初始化失败!")
if __name__ == "__main__":
main()