Files
tsp-assistant/deploy.py
2025-09-08 15:27:22 +08:00

439 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TSP智能助手部署管理脚本
支持自动化部署、升级和回滚
"""
import argparse
import json
import os
import shlex
import shutil
import subprocess
import sys
import tempfile
import time
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Union
class DeploymentManager:
    """Deploy, back up, and roll back the TSP assistant application.

    Configuration is read from a JSON file and layered over
    ``DEFAULT_CONFIG``, so a partial config file never causes a missing-key
    failure later on. Backups are stored as sub-directories of
    ``self.backup_dir`` whose names contain ``backup_``.
    """

    # Fallback value for every configuration key this class reads.
    DEFAULT_CONFIG = {
        "environment": "production",
        "app_name": "tsp_assistant",
        "deploy_path": "/opt/tsp_assistant",
        "backup_path": "./backups",
        "service_name": "tsp_assistant",
        "python_path": "python3",
        "pip_path": "pip3",
        "nginx_config": "/etc/nginx/sites-available/tsp_assistant",
        "systemd_service": "/etc/systemd/system/tsp_assistant.service",
        "database_backup": True,
        "auto_restart": True,
        "health_check_url": "http://localhost:5000/api/health",
        "health_check_retries": 5,
        "health_check_interval": 2,
    }

    def __init__(self, config_file: str = "deploy_config.json"):
        """Load the configuration and make sure the backup directory exists.

        Args:
            config_file: Path of the JSON configuration file. A missing or
                unreadable file falls back to ``DEFAULT_CONFIG``.
        """
        self.config_file = config_file
        self.config = self._load_config()
        # Honour the configured location; the default "./backups" normalises
        # to the previously hard-coded Path("backups").
        self.backup_dir = Path(self.config.get("backup_path") or "backups")
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def _load_config(self) -> Dict:
        """Return the deployment configuration.

        Values from the config file override ``DEFAULT_CONFIG``; keys the
        file omits keep their default so later ``self.config[...]`` lookups
        cannot raise ``KeyError``.
        """
        config = dict(self.DEFAULT_CONFIG)
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    raise ValueError("top-level JSON value must be an object")
                config.update(loaded)
            except Exception as e:
                print(f"加载配置失败: {e}")
        return config

    def _run_command(self, command: Union[str, Sequence[str]], check: bool = True,
                     cwd: Optional[Union[str, Path]] = None,
                     stdin=None) -> subprocess.CompletedProcess:
        """Run an external command and echo its output.

        Args:
            command: A shell command line (``str``, executed through the
                shell) or an argument vector (sequence, executed directly
                without a shell so no quoting is needed).
            check: Raise ``subprocess.CalledProcessError`` on a non-zero
                exit status.
            cwd: Working directory for the child process.
            stdin: Optional file object connected to the child's stdin.

        Returns:
            The completed process with captured text ``stdout``/``stderr``.

        Raises:
            subprocess.CalledProcessError: ``check`` is true and the command
                failed.
            OSError: An argument-vector command's executable was not found.
        """
        use_shell = isinstance(command, str)
        if use_shell:
            shown = command
        else:
            command = [str(part) for part in command]
            shown = shlex.join(command)
        print(f"执行命令: {shown}")
        try:
            result = subprocess.run(command, shell=use_shell, check=check,
                                    capture_output=True, text=True,
                                    cwd=cwd, stdin=stdin)
        except subprocess.CalledProcessError as e:
            print(f"命令执行失败: {e}")
            if e.stderr:
                print(f"错误: {e.stderr}")
            raise
        if result.stdout:
            print(f"输出: {result.stdout}")
        # With check=False a failure would otherwise be completely silent.
        if result.returncode != 0 and result.stderr:
            print(f"错误: {result.stderr}")
        return result

    def _list_backup_dirs(self) -> List[Path]:
        """Return every backup directory under ``self.backup_dir``."""
        return [p for p in self.backup_dir.glob("*backup_*") if p.is_dir()]

    def _ignore_backup_dir(self, directory: str, names: List[str]) -> List[str]:
        """``shutil.copytree`` ignore hook that skips the backup directory.

        Without it, copying a tree that contains ``self.backup_dir`` into a
        new backup would copy the backup into itself recursively.
        """
        backup_root = self.backup_dir.resolve()
        current = Path(directory).resolve()
        return [name for name in names if current / name == backup_root]

    def backup_current_deployment(self) -> str:
        """Snapshot the deployed files (and optionally databases).

        Returns:
            The name of the new backup directory inside ``self.backup_dir``.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"{self.config['app_name']}_backup_{timestamp}"
        backup_path = self.backup_dir / backup_name
        print(f"创建备份: {backup_path}")

        # Application files. On a first deployment there is nothing to copy,
        # but the directory must still exist for the metadata written below.
        deploy_path = Path(self.config['deploy_path'])
        if deploy_path.exists():
            shutil.copytree(deploy_path, backup_path,
                            ignore=self._ignore_backup_dir)
        else:
            backup_path.mkdir(parents=True, exist_ok=True)

        # Databases.
        if self.config.get('database_backup', True):
            self._backup_database(backup_path)

        # Metadata describing the backup.
        backup_info = {
            "backup_name": backup_name,
            "backup_path": str(backup_path),
            "timestamp": timestamp,
            "version": self._get_current_version(),
            "git_commit": self._get_git_commit()
        }
        with open(backup_path / "backup_info.json", 'w', encoding='utf-8') as f:
            json.dump(backup_info, f, indent=2)
        print(f"备份完成: {backup_name}")
        return backup_name

    @staticmethod
    def _option_value(value) -> str:
        """Quote *value* for a MySQL option file.

        NOTE(review): relies on the option-file rule that ``\\\\`` denotes a
        backslash inside a quoted value; a value containing both quote
        characters cannot be expressed and is rejected.
        """
        text = str(value).replace("\\", "\\\\")
        for quote in ('"', "'"):
            if quote not in text:
                return f"{quote}{text}{quote}"
        raise ValueError("MySQL option values containing both quote characters are not supported")

    @staticmethod
    def _write_mysql_option_file(mysql_config: Dict) -> str:
        """Write connection credentials to a private temporary option file.

        Passing credentials through ``--defaults-extra-file`` keeps the
        password out of the process list and out of the command echo. The
        caller is responsible for deleting the returned path.
        """
        lines = ["[client]"]
        if mysql_config.get("user"):
            lines.append(f"user={DeploymentManager._option_value(mysql_config['user'])}")
        if mysql_config.get("password"):
            lines.append(f"password={DeploymentManager._option_value(mysql_config['password'])}")
        if mysql_config.get("host"):
            lines.append(f"host={DeploymentManager._option_value(mysql_config['host'])}")
        if mysql_config.get("port"):
            lines.append(f"port={int(mysql_config['port'])}")
        # mkstemp creates the file readable and writable by the owner only.
        fd, path = tempfile.mkstemp(prefix="tsp_mysql_", suffix=".cnf")
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write("\n".join(lines) + "\n")
        return path

    def _backup_database(self, backup_path: Path):
        """Copy the SQLite file and dump MySQL into ``backup_path/database``.

        Best effort: any failure is reported and swallowed so that a
        database problem does not abort the file backup.
        """
        try:
            db_backup_dir = backup_path / "database"
            db_backup_dir.mkdir(parents=True, exist_ok=True)

            # SQLite database, looked up relative to the working directory.
            db_file = "tsp_assistant.db"
            if os.path.exists(db_file):
                shutil.copy2(db_file, db_backup_dir / db_file)
                print(f"已备份SQLite数据库: {db_file}")

            # MySQL database, when the application is configured for one.
            mysql_config = self._get_mysql_config()
            if mysql_config:
                dump_file = db_backup_dir / "mysql_dump.sql"
                option_file = self._write_mysql_option_file(mysql_config)
                try:
                    # --defaults-extra-file must be the first option.
                    self._run_command([
                        "mysqldump",
                        f"--defaults-extra-file={option_file}",
                        f"--result-file={dump_file}",
                        mysql_config['database'],
                    ])
                except Exception:
                    # Never leave a partial dump that a rollback would restore.
                    dump_file.unlink(missing_ok=True)
                    raise
                finally:
                    os.unlink(option_file)
                print(f"已备份MySQL数据库: {mysql_config['database']}")
        except Exception as e:
            print(f"数据库备份失败: {e}")

    @staticmethod
    def _parse_mysql_url(db_url) -> Optional[Dict]:
        """Parse ``mysql[+driver]://user:password@host[:port]/database``.

        Returns:
            A dict with ``user``, ``password``, ``host``, ``port`` (``None``
            when absent) and ``database``, or ``None`` when *db_url* is not a
            usable MySQL URL. User, password and database are
            percent-decoded.
        """
        if not isinstance(db_url, str) or not db_url.startswith("mysql"):
            return None
        try:
            parts = urllib.parse.urlsplit(db_url)
            port = parts.port  # raises ValueError for a malformed port
        except ValueError:
            return None
        database = parts.path.lstrip("/")
        if not parts.hostname or not database:
            return None
        return {
            "user": urllib.parse.unquote(parts.username or ""),
            "password": urllib.parse.unquote(parts.password or ""),
            "host": parts.hostname,
            "port": port,
            "database": urllib.parse.unquote(database),
        }

    def _get_mysql_config(self) -> Optional[Dict]:
        """Return MySQL connection settings from the project config, if any.

        Returns ``None`` when the project configuration cannot be imported
        or its ``DATABASE_URL`` is not a MySQL URL.
        """
        try:
            from src.config.config import Config
            db_url = Config.DATABASE_URL
        except ImportError:
            # Project config not importable from here: nothing to back up.
            return None
        except Exception as e:
            print(f"读取数据库配置失败: {e}")
            return None
        return self._parse_mysql_url(db_url)

    def _get_current_version(self) -> str:
        """Return the version reported by ``version.VersionManager`` or ``"unknown"``."""
        try:
            from version import VersionManager
            vm = VersionManager()
            return vm.get_version()
        except Exception:
            return "unknown"

    def _get_git_commit(self) -> str:
        """Return the first 8 characters of ``git rev-parse HEAD`` or ``"unknown"``."""
        try:
            result = subprocess.run(['git', 'rev-parse', 'HEAD'],
                                    capture_output=True, text=True)
            return result.stdout.strip()[:8] if result.returncode == 0 else "unknown"
        except Exception:
            return "unknown"

    def deploy(self, source_path: str = ".", force: bool = False) -> bool:
        """Deploy *source_path* to the configured deployment directory.

        Steps: confirm, back up, stop service, copy files, install
        dependencies, run migrations, start service, health check. If any
        step after the backup fails, the backup is restored so the service is
        not left stopped or half-updated.

        Args:
            source_path: Directory containing the new version.
            force: Skip the confirmation prompt when the target exists.

        Returns:
            ``True`` on success, ``False`` when cancelled, failed or rolled
            back.
        """
        backup_name = None
        try:
            print("开始部署...")
            deploy_path = Path(self.config['deploy_path'])
            if deploy_path.exists() and not force:
                response = input(f"目标路径 {deploy_path} 已存在,是否继续?(y/N): ")
                if response.strip().lower() != 'y':
                    print("部署取消")
                    return False

            backup_name = self.backup_current_deployment()

            if self.config.get('auto_restart', True):
                self.stop_service()

            self._deploy_files(source_path, deploy_path)
            self._install_dependencies(deploy_path)
            self._run_database_migrations(deploy_path)

            if self.config.get('auto_restart', True):
                self.start_service()

            if not self._health_check():
                print("健康检查失败,开始回滚...")
                self.rollback(backup_name)
                return False
            print("部署成功!")
            return True
        except Exception as e:
            print(f"部署失败: {e}")
            # A backup exists, so the previous state can be restored.
            if backup_name is not None:
                print("部署失败,开始回滚...")
                self.rollback(backup_name)
            return False

    def _deploy_files(self, source_path: str, deploy_path: Path):
        """Copy the contents of *source_path* into *deploy_path*.

        Hidden entries are skipped except ``.git`` and ``.env``. The backup
        directory is never copied, otherwise every deployment would embed
        all earlier backups.
        """
        print(f"部署文件从 {source_path} 到 {deploy_path}")
        deploy_path.mkdir(parents=True, exist_ok=True)
        source = Path(source_path)
        if source.resolve() == deploy_path.resolve():
            # Copying a file onto itself raises shutil.SameFileError.
            print("源路径与目标路径相同,跳过文件复制")
            return
        backup_root = self.backup_dir.resolve()
        for item in source.iterdir():
            if item.name.startswith('.') and item.name not in ('.git', '.env'):
                continue
            if item.resolve() == backup_root:
                continue
            target = deploy_path / item.name
            if item.is_file():
                shutil.copy2(item, target)
            elif item.is_dir():
                shutil.copytree(item, target, dirs_exist_ok=True)
        print("文件部署完成")

    def _install_dependencies(self, deploy_path: Path):
        """Run ``pip install -r requirements.txt`` inside *deploy_path*."""
        print("安装依赖包...")
        requirements_file = deploy_path / "requirements.txt"
        if requirements_file.exists():
            # shlex.split lets pip_path be a multi-word command such as
            # "python3 -m pip"; cwd avoids building a "cd ... &&" shell line.
            cmd = [*shlex.split(self.config['pip_path']),
                   "install", "-r", "requirements.txt"]
            self._run_command(cmd, cwd=deploy_path)
        else:
            print("未找到requirements.txt文件")

    def _run_database_migrations(self, deploy_path: Path):
        """Run ``init_database.py`` from *deploy_path* if it exists.

        Best effort: a failure is reported but does not abort the caller.
        """
        print("运行数据库迁移...")
        try:
            init_script = deploy_path / "init_database.py"
            if init_script.exists():
                cmd = [*shlex.split(self.config['python_path']), "init_database.py"]
                self._run_command(cmd, cwd=deploy_path)
        except Exception as e:
            print(f"数据库迁移失败: {e}")

    def start_service(self):
        """Start the service via systemctl, falling back to a direct launch."""
        print("启动服务...")
        if self.config.get('service_name'):
            try:
                self._run_command(["systemctl", "start", self.config['service_name']])
                print("服务启动成功")
            except Exception:
                print("使用systemctl启动失败尝试直接启动...")
                self._start_directly()
        else:
            self._start_directly()

    def stop_service(self):
        """Stop the service and kill directly-launched application processes.

        Every step is best effort; failures (including missing tools) are
        ignored so that a stop never blocks a deployment or rollback.
        """
        print("停止服务...")
        if self.config.get('service_name'):
            try:
                self._run_command(["systemctl", "stop", self.config['service_name']],
                                  check=False)
            except Exception:
                pass
        # Argument vectors avoid an intermediate shell whose own command line
        # would match the pkill pattern.
        try:
            self._run_command(["pkill", "-f", "python.*start_dashboard.py"], check=False)
            self._run_command(["pkill", "-f", "python.*app.py"], check=False)
        except Exception:
            pass

    def _start_directly(self):
        """Launch ``start_dashboard.py`` in the background with nohup."""
        deploy_path = Path(self.config['deploy_path'])
        start_script = deploy_path / "start_dashboard.py"
        if not start_script.exists():
            print(f"未找到启动脚本: {start_script}")
            return
        # The shell redirection below fails if logs/ is missing, and because
        # the command is backgrounded that failure would go unnoticed.
        (deploy_path / "logs").mkdir(parents=True, exist_ok=True)
        cmd = (f"cd {shlex.quote(str(deploy_path))} && "
               f"nohup {self.config['python_path']} start_dashboard.py > logs/deploy.log 2>&1 &")
        self._run_command(cmd)
        print("应用已启动")

    def _health_check(self) -> bool:
        """Poll the health-check URL until it answers HTTP 200.

        The service needs time to come up after a restart, so the request is
        retried ``health_check_retries`` times, ``health_check_interval``
        seconds apart.

        Returns:
            ``True`` as soon as one request returns status 200, else
            ``False``.
        """
        print("执行健康检查...")
        url = self.config['health_check_url']
        attempts = max(1, int(self.config.get('health_check_retries', 5)))
        interval = max(0.0, float(self.config.get('health_check_interval', 2)))
        for attempt in range(1, attempts + 1):
            try:
                with urllib.request.urlopen(url, timeout=10) as response:
                    if response.status == 200:
                        print("健康检查通过")
                        return True
                    print(f"健康检查失败: HTTP {response.status}")
            except Exception as e:
                print(f"健康检查失败: {e}")
            if attempt < attempts:
                time.sleep(interval)
        return False

    def _clear_deploy_path(self, deploy_path: Path) -> None:
        """Remove the deployed files without destroying the backups.

        When the backup directory lives inside *deploy_path*, a plain
        ``rmtree`` would delete the backup that is about to be restored; in
        that case only the siblings of the branch leading to the backups are
        removed.
        """
        if not deploy_path.exists():
            return
        backup_root = self.backup_dir.resolve()
        if deploy_path.resolve() not in backup_root.parents:
            shutil.rmtree(deploy_path)
            return
        for child in deploy_path.iterdir():
            resolved = child.resolve()
            if resolved == backup_root or resolved in backup_root.parents:
                continue
            if child.is_dir() and not child.is_symlink():
                shutil.rmtree(child)
            else:
                child.unlink()

    def rollback(self, backup_name: str = None) -> bool:
        """Restore a backup over the current deployment.

        Args:
            backup_name: Name of the backup directory; ``None`` selects the
                most recently created backup.

        Returns:
            ``True`` when the backup was restored, ``False`` otherwise.
        """
        try:
            if backup_name is None:
                backups = self._list_backup_dirs()
                if not backups:
                    print("没有找到备份")
                    return False
                backup_name = max(backups, key=os.path.getctime).name
            backup_path = self.backup_dir / backup_name
            if not backup_path.exists():
                print(f"备份不存在: {backup_name}")
                return False
            print(f"回滚到备份: {backup_name}")

            self.stop_service()

            # Restore files.
            deploy_path = Path(self.config['deploy_path'])
            self._clear_deploy_path(deploy_path)
            shutil.copytree(backup_path, deploy_path, dirs_exist_ok=True)

            self._restore_database(backup_path)
            self.start_service()
            print("回滚完成")
            return True
        except Exception as e:
            print(f"回滚失败: {e}")
            return False

    def _restore_database(self, backup_path: Path):
        """Restore the SQLite file and MySQL dump stored in a backup.

        Best effort: failures are reported and swallowed.
        """
        try:
            db_backup_dir = backup_path / "database"
            if not db_backup_dir.exists():
                return

            # SQLite database, restored relative to the working directory.
            db_file = db_backup_dir / "tsp_assistant.db"
            if db_file.exists():
                shutil.copy2(db_file, "tsp_assistant.db")
                print("已恢复SQLite数据库")

            # MySQL database.
            mysql_dump = db_backup_dir / "mysql_dump.sql"
            if mysql_dump.exists():
                mysql_config = self._get_mysql_config()
                if mysql_config:
                    option_file = self._write_mysql_option_file(mysql_config)
                    try:
                        with open(mysql_dump, "rb") as dump:
                            self._run_command([
                                "mysql",
                                f"--defaults-extra-file={option_file}",
                                mysql_config['database'],
                            ], stdin=dump)
                    finally:
                        os.unlink(option_file)
                    print("已恢复MySQL数据库")
        except Exception as e:
            print(f"数据库恢复失败: {e}")

    def list_backups(self):
        """Print all backups, newest first, with version info when available."""
        backups = self._list_backup_dirs()
        if not backups:
            print("没有找到备份")
            return
        print("可用备份:")
        for backup in sorted(backups, key=os.path.getctime, reverse=True):
            backup_info_file = backup / "backup_info.json"
            try:
                with open(backup_info_file, 'r', encoding='utf-8') as f:
                    info = json.load(f)
                print(f" {info['backup_name']} - 版本: {info['version']} - {info['timestamp']}")
            except (OSError, ValueError, KeyError, TypeError):
                # Missing, unreadable or incomplete metadata: show the name.
                print(f" {backup.name}")

    def cleanup_old_backups(self, keep_count: int = 5):
        """Delete all but the *keep_count* most recently created backups.

        Raises:
            ValueError: *keep_count* is negative (a negative slice would
                silently delete an unexpected subset of backups).
        """
        if keep_count < 0:
            raise ValueError("keep_count must be non-negative")
        backups = self._list_backup_dirs()
        if len(backups) <= keep_count:
            print(f"备份数量({len(backups)})不超过保留数量({keep_count})")
            return
        backups.sort(key=os.path.getctime, reverse=True)
        for backup in backups[keep_count:]:
            print(f"删除备份: {backup.name}")
            shutil.rmtree(backup)
def main():
    """Command-line entry point.

    Parses the requested action (``deploy``, ``rollback``, ``backup``,
    ``list-backups`` or ``cleanup``) and dispatches it to a
    ``DeploymentManager``. ``deploy`` and ``rollback`` exit with status 0 on
    success and 1 on failure; invalid arguments exit with status 2.
    """
    parser = argparse.ArgumentParser(description='TSP智能助手部署管理')
    parser.add_argument('action', choices=['deploy', 'rollback', 'backup', 'list-backups', 'cleanup'],
                        help='要执行的操作')
    parser.add_argument('--source', default='.', help='源代码路径')
    parser.add_argument('--backup', help='备份名称')
    parser.add_argument('--force', action='store_true', help='强制部署')
    parser.add_argument('--keep', type=int, default=5, help='保留备份数量')
    args = parser.parse_args()

    # A negative count would be used as a negative slice index during cleanup
    # and delete an unexpected subset of backups; reject it before doing any
    # work.
    if args.keep < 0:
        parser.error("--keep 不能为负数")

    dm = DeploymentManager()
    if args.action == 'deploy':
        success = dm.deploy(args.source, args.force)
        sys.exit(0 if success else 1)
    elif args.action == 'rollback':
        success = dm.rollback(args.backup)
        sys.exit(0 if success else 1)
    elif args.action == 'backup':
        backup_name = dm.backup_current_deployment()
        print(f"备份完成: {backup_name}")
    elif args.action == 'list-backups':
        dm.list_backups()
    elif args.action == 'cleanup':
        dm.cleanup_old_backups(args.keep)


if __name__ == "__main__":
    main()