#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Deployment management script for the TSP assistant.

Supports automated deployment, upgrade and rollback.
"""

import argparse
import json
import os
import shlex
import shutil
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import IO, Dict, List, Optional, Sequence, Union
from urllib.parse import unquote, urlsplit


class DeploymentManager:
    """Back up, deploy, roll back and prune deployments of the application.

    Settings are read from a JSON file (see ``_load_config``); backups are
    stored in a ``backups`` directory relative to the working directory.
    """

    def __init__(self, config_file: str = "deploy_config.json"):
        """Load the configuration and make sure the backup directory exists.

        Args:
            config_file: Path of the JSON deployment configuration.
        """
        self.config_file = config_file
        self.config = self._load_config()
        self.backup_dir = Path("backups")
        self.backup_dir.mkdir(exist_ok=True)

    def _load_config(self) -> Dict:
        """Return the deployment configuration.

        The JSON file named by ``self.config_file`` is used when it exists
        and parses to an object; otherwise the built-in defaults are
        returned.
        """
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    raise ValueError("top-level JSON value must be an object")
                return loaded
            except (OSError, ValueError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                print(f"加载配置失败: {e}")

        # Default configuration.
        return {
            "environment": "production",
            "app_name": "tsp_assistant",
            "deploy_path": "/opt/tsp_assistant",
            "backup_path": "./backups",
            "service_name": "tsp_assistant",
            "python_path": "python3",
            "pip_path": "pip3",
            "nginx_config": "/etc/nginx/sites-available/tsp_assistant",
            "systemd_service": "/etc/systemd/system/tsp_assistant.service",
            "database_backup": True,
            "auto_restart": True,
            "health_check_url": "http://localhost:5000/api/health"
        }

    def _run_command(
        self,
        command: Union[str, Sequence[str]],
        check: bool = True,
        *,
        cwd: Optional[Union[str, Path]] = None,
        env: Optional[Dict[str, str]] = None,
        stdin: Optional[IO] = None,
        stdout: Optional[IO] = None,
    ) -> subprocess.CompletedProcess:
        """Run an external command and echo its output.

        Args:
            command: Argument list (preferred, executed without a shell) or
                a shell command string (executed with ``shell=True``).
            check: Raise ``CalledProcessError`` on a non-zero exit status.
            cwd: Working directory for the child process.
            env: Full environment for the child process.
            stdin: Open file to feed to the child's standard input.
            stdout: Open file receiving the child's standard output; when
                omitted the output is captured and printed.

        Returns:
            The completed process.

        Raises:
            subprocess.CalledProcessError: The command failed and ``check``
                is true.
            OSError: The executable could not be started (list form only).
        """
        use_shell = isinstance(command, str)
        printable = command if use_shell else shlex.join(command)
        print(f"执行命令: {printable}")
        try:
            result = subprocess.run(
                command,
                shell=use_shell,
                check=check,
                cwd=cwd,
                env=env,
                stdin=stdin,
                stdout=subprocess.PIPE if stdout is None else stdout,
                stderr=subprocess.PIPE,
                text=True,
            )
            if result.stdout:
                print(f"输出: {result.stdout}")
            return result
        except subprocess.CalledProcessError as e:
            print(f"命令执行失败: {e}")
            if e.stderr:
                print(f"错误: {e.stderr}")
            raise

    def _find_backups(self) -> List[Path]:
        """Return the backup directories, most recently changed first."""
        candidates = [p for p in self.backup_dir.glob("*backup_*") if p.is_dir()]
        return sorted(candidates, key=os.path.getctime, reverse=True)

    def _ignore_backup_dir(self, directory: str, names: List[str]) -> List[str]:
        """``shutil.copytree`` ignore hook that skips the backup directory.

        Without it, backing up a deployment that contains the backup
        directory would copy the backups into themselves.
        """
        backup_root = self.backup_dir.resolve()
        if Path(directory).resolve() != backup_root.parent:
            return []
        return [name for name in names if name == backup_root.name]

    def backup_current_deployment(self) -> str:
        """Back up the deployed files (and optionally the databases).

        Returns:
            The name of the backup directory created under
            ``self.backup_dir``.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"{self.config['app_name']}_backup_{timestamp}"
        backup_path = self.backup_dir / backup_name

        print(f"创建备份: {backup_path}")

        # Back up the application files.
        if os.path.exists(self.config['deploy_path']):
            shutil.copytree(
                self.config['deploy_path'],
                backup_path,
                ignore=self._ignore_backup_dir,
            )
        else:
            # First deployment: nothing to copy, but the directory must
            # exist so the metadata below can be written.
            backup_path.mkdir(parents=True, exist_ok=True)

        # Back up the databases.
        if self.config.get('database_backup', True):
            self._backup_database(backup_path)

        # Save the backup metadata.
        backup_info = {
            "backup_name": backup_name,
            "backup_path": str(backup_path),
            "timestamp": timestamp,
            "version": self._get_current_version(),
            "git_commit": self._get_git_commit()
        }

        with open(backup_path / "backup_info.json", 'w', encoding='utf-8') as f:
            json.dump(backup_info, f, indent=2)

        print(f"备份完成: {backup_name}")
        return backup_name

    @staticmethod
    def _mysql_command(program: str, mysql_config: Dict) -> List[str]:
        """Build the argument list for ``mysql`` / ``mysqldump``.

        The password is deliberately not part of the arguments; see
        ``_mysql_env``.
        """
        args = [
            program,
            f"--host={mysql_config['host']}",
            f"--user={mysql_config['user']}",
        ]
        if mysql_config.get('port'):
            args.append(f"--port={mysql_config['port']}")
        args.append(mysql_config['database'])
        return args

    @staticmethod
    def _mysql_env(mysql_config: Dict) -> Dict[str, str]:
        """Return a child environment carrying the MySQL password.

        Passing the password through ``MYSQL_PWD`` keeps it out of the
        process list and out of the command echoed by ``_run_command``.
        """
        env = dict(os.environ)
        if mysql_config.get('password'):
            env["MYSQL_PWD"] = mysql_config['password']
        return env

    def _backup_database(self, backup_path: Path):
        """Copy the SQLite file and dump the MySQL database, best effort.

        Failures are reported but never propagate, so a database problem
        does not abort the file backup.

        Args:
            backup_path: Backup directory; dumps go into its ``database``
                sub-directory.
        """
        try:
            # Create the database backup directory.
            db_backup_dir = backup_path / "database"
            db_backup_dir.mkdir(parents=True, exist_ok=True)

            # Back up the SQLite database.
            db_file = "tsp_assistant.db"
            if os.path.exists(db_file):
                shutil.copy2(db_file, db_backup_dir / db_file)
                print(f"已备份SQLite数据库: {db_file}")

            # Back up the MySQL database (if one is configured).
            mysql_config = self._get_mysql_config()
            if mysql_config:
                dump_file = db_backup_dir / "mysql_dump.sql"
                try:
                    with open(dump_file, 'wb') as out:
                        self._run_command(
                            self._mysql_command("mysqldump", mysql_config),
                            env=self._mysql_env(mysql_config),
                            stdout=out,
                        )
                except Exception:
                    # Never leave a partial dump that a rollback would import.
                    dump_file.unlink(missing_ok=True)
                    raise
                print(f"已备份MySQL数据库: {mysql_config['database']}")

        except Exception as e:
            print(f"数据库备份失败: {e}")

    @staticmethod
    def _parse_mysql_url(db_url: str) -> Optional[Dict]:
        """Parse a ``mysql[+driver]://user:password@host[:port]/database`` URL.

        Args:
            db_url: Database URL; user name, password and database name may
                be percent-encoded.

        Returns:
            A dict with ``user``, ``password``, ``host``, ``port`` (``None``
            when absent) and ``database``, or ``None`` when the URL is not a
            usable MySQL URL.
        """
        if not isinstance(db_url, str) or not db_url.startswith("mysql"):
            return None
        try:
            parts = urlsplit(db_url)
            port = parts.port
        except ValueError:
            return None

        database = parts.path.lstrip("/").split("/")[0]
        if not parts.hostname or not parts.username or not database:
            return None
        return {
            "user": unquote(parts.username),
            "password": unquote(parts.password or ""),
            "host": parts.hostname,
            "port": port,
            "database": unquote(database),
        }

    def _get_mysql_config(self) -> Optional[Dict]:
        """Return the MySQL connection settings of the application, if any.

        The URL is read from ``src.config.config.Config.DATABASE_URL``;
        ``None`` is returned when that module is unavailable or the URL is
        not a MySQL URL.
        """
        try:
            from src.config.config import Config
            return self._parse_mysql_url(Config.DATABASE_URL)
        except Exception:
            # Best effort: the project configuration may be missing or broken.
            return None

    def _get_current_version(self) -> str:
        """Return the application version, or ``"unknown"``."""
        try:
            from version import VersionManager
            vm = VersionManager()
            return vm.get_version()
        except Exception:
            return "unknown"

    def _get_git_commit(self) -> str:
        """Return the first 8 characters of the current Git commit hash.

        ``"unknown"`` is returned when Git is unavailable or the working
        directory is not a repository.
        """
        try:
            result = subprocess.run(
                ['git', 'rev-parse', 'HEAD'],
                capture_output=True,
                text=True,
            )
        except (OSError, subprocess.SubprocessError):
            return "unknown"
        return result.stdout.strip()[:8] if result.returncode == 0 else "unknown"

    def deploy(self, source_path: str = ".", force: bool = False) -> bool:
        """Deploy the application from ``source_path``.

        The current deployment is backed up first; if the health check
        fails after the new version is started, that backup is restored.

        Args:
            source_path: Directory containing the new version.
            force: Skip the confirmation prompt when the target exists.

        Returns:
            ``True`` on success, ``False`` when the deployment was
            cancelled, failed, or was rolled back.
        """
        try:
            print("开始部署...")

            # Check the target path.
            deploy_path = Path(self.config['deploy_path'])
            if deploy_path.exists() and not force:
                response = input(f"目标路径 {deploy_path} 已存在,是否继续?(y/N): ")
                if response.strip().lower() != 'y':
                    print("部署取消")
                    return False

            # Create a backup.
            backup_name = self.backup_current_deployment()

            # Stop the service.
            if self.config.get('auto_restart', True):
                self.stop_service()

            # Deploy the new version.
            self._deploy_files(source_path, deploy_path)

            # Install dependencies.
            self._install_dependencies(deploy_path)

            # Run database migrations.
            self._run_database_migrations(deploy_path)

            # Start the service.
            if self.config.get('auto_restart', True):
                self.start_service()

            # Health check.
            if not self._health_check():
                print("健康检查失败,开始回滚...")
                self.rollback(backup_name)
                return False

            print("部署成功!")
            return True

        except Exception as e:
            print(f"部署失败: {e}")
            return False

    def _deploy_files(self, source_path: str, deploy_path: Path):
        """Copy the top-level entries of ``source_path`` into ``deploy_path``.

        Hidden entries are skipped except ``.git`` and ``.env``. The backup
        directory is skipped as well, so backups are never deployed (and
        then re-backed-up) along with the application.
        """
        print(f"部署文件从 {source_path} 到 {deploy_path}")

        # Create the target directory.
        deploy_path.mkdir(parents=True, exist_ok=True)

        # Copy the files.
        source = Path(source_path)
        backup_root = self.backup_dir.resolve()
        for item in source.iterdir():
            if item.name.startswith('.') and item.name not in ('.git', '.env'):
                continue
            if item.resolve() == backup_root:
                continue

            if item.is_file():
                shutil.copy2(item, deploy_path / item.name)
            elif item.is_dir():
                shutil.copytree(item, deploy_path / item.name, dirs_exist_ok=True)

        print("文件部署完成")

    def _install_dependencies(self, deploy_path: Path):
        """Install ``requirements.txt`` from ``deploy_path`` with pip.

        Raises:
            subprocess.CalledProcessError: pip exited with an error.
            OSError: pip could not be started.
        """
        print("安装依赖包...")

        requirements_file = deploy_path / "requirements.txt"
        if requirements_file.exists():
            # shlex.split keeps multi-word settings such as "python3 -m pip"
            # working without going through a shell.
            command = [
                *shlex.split(self.config['pip_path']),
                "install", "-r", "requirements.txt",
            ]
            self._run_command(command, cwd=deploy_path)
        else:
            print("未找到requirements.txt文件")

    def _run_database_migrations(self, deploy_path: Path):
        """Run ``init_database.py`` from ``deploy_path`` if present (best effort)."""
        print("运行数据库迁移...")

        try:
            # Run the database initialisation script.
            init_script = deploy_path / "init_database.py"
            if init_script.exists():
                command = [
                    *shlex.split(self.config['python_path']),
                    "init_database.py",
                ]
                self._run_command(command, cwd=deploy_path)
        except Exception as e:
            print(f"数据库迁移失败: {e}")

    def start_service(self):
        """Start the service via systemctl, falling back to a direct start."""
        print("启动服务...")

        service_name = self.config.get('service_name')
        if service_name:
            try:
                self._run_command(["systemctl", "start", service_name])
                print("服务启动成功")
            except (subprocess.CalledProcessError, OSError):
                print("使用systemctl启动失败,尝试直接启动...")
                self._start_directly()
        else:
            self._start_directly()

    def stop_service(self):
        """Stop the service and kill leftover application processes.

        Every step is best effort: a missing ``systemctl``/``pkill`` or a
        service that is not running must not abort a deployment.
        """
        print("停止服务...")

        service_name = self.config.get('service_name')
        if service_name:
            try:
                self._run_command(["systemctl", "stop", service_name], check=False)
            except OSError:
                pass  # systemctl is not available on this host

        # Kill related processes.
        for pattern in ("python.*start_dashboard.py", "python.*app.py"):
            try:
                self._run_command(["pkill", "-f", pattern], check=False)
            except OSError:
                pass  # pkill is not available on this host

    def _start_directly(self):
        """Start ``start_dashboard.py`` as a detached background process.

        Output goes to ``logs/deploy.log`` inside the deployment directory.
        """
        deploy_path = Path(self.config['deploy_path'])
        start_script = deploy_path / "start_dashboard.py"

        if start_script.exists():
            log_dir = deploy_path / "logs"
            log_dir.mkdir(parents=True, exist_ok=True)

            command = [
                *shlex.split(self.config['python_path']),
                "start_dashboard.py",
            ]
            print(f"执行命令: {shlex.join(command)}")
            with open(log_dir / "deploy.log", 'wb') as log_file:
                # A new session detaches the child from this terminal, so it
                # keeps running after the deploy script exits.
                subprocess.Popen(
                    command,
                    cwd=deploy_path,
                    stdin=subprocess.DEVNULL,
                    stdout=log_file,
                    stderr=subprocess.STDOUT,
                    start_new_session=True,
                )
            print("应用已启动")

    def _health_check(self, retries: int = 5, delay: float = 2.0) -> bool:
        """Poll the health-check URL until it answers with HTTP 200.

        A freshly started service usually needs a moment before it accepts
        connections, so the probe is repeated instead of failing (and
        triggering a rollback) on the first refused connection.

        Args:
            retries: Maximum number of probes (at least one is made).
            delay: Seconds to wait between probes.

        Returns:
            ``True`` as soon as one probe succeeds, ``False`` otherwise.
        """
        print("执行健康检查...")

        try:
            import requests
            url = self.config['health_check_url']
        except (ImportError, KeyError) as e:
            # Retrying cannot fix a missing dependency or setting.
            print(f"健康检查失败: {e}")
            return False

        attempts = max(1, retries)
        for attempt in range(1, attempts + 1):
            try:
                response = requests.get(url, timeout=10)
                if response.status_code == 200:
                    print("健康检查通过")
                    return True
                print(f"健康检查失败: HTTP {response.status_code}")
            except Exception as e:
                print(f"健康检查失败: {e}")

            if attempt < attempts:
                time.sleep(delay)

        return False

    def rollback(self, backup_name: Optional[str] = None) -> bool:
        """Restore the deployment from a backup and restart the service.

        Args:
            backup_name: Name of the backup directory; the most recent
                backup is used when omitted.

        Returns:
            ``True`` when the rollback completed, ``False`` otherwise.
        """
        try:
            if backup_name is None:
                # Pick the latest backup.
                backups = self._find_backups()
                if not backups:
                    print("没有找到备份")
                    return False
                backup_name = backups[0].name

            backup_path = self.backup_dir / backup_name
            if not backup_path.exists():
                print(f"备份不存在: {backup_name}")
                return False

            print(f"回滚到备份: {backup_name}")

            # Stop the service.
            self.stop_service()

            # Restore the files.
            deploy_path = Path(self.config['deploy_path'])
            if deploy_path.exists():
                shutil.rmtree(deploy_path)
            shutil.copytree(backup_path, deploy_path)

            # Restore the databases.
            self._restore_database(backup_path)

            # Start the service.
            self.start_service()

            print("回滚完成")
            return True

        except Exception as e:
            print(f"回滚失败: {e}")
            return False

    def _restore_database(self, backup_path: Path):
        """Restore the SQLite file and MySQL dump from a backup (best effort)."""
        try:
            db_backup_dir = backup_path / "database"

            if db_backup_dir.exists():
                # Restore the SQLite database.
                db_file = db_backup_dir / "tsp_assistant.db"
                if db_file.exists():
                    shutil.copy2(db_file, "tsp_assistant.db")
                    print("已恢复SQLite数据库")

                # Restore the MySQL database.
                mysql_dump = db_backup_dir / "mysql_dump.sql"
                if mysql_dump.exists():
                    mysql_config = self._get_mysql_config()
                    if mysql_config:
                        with open(mysql_dump, 'rb') as dump:
                            self._run_command(
                                self._mysql_command("mysql", mysql_config),
                                env=self._mysql_env(mysql_config),
                                stdin=dump,
                            )
                        print("已恢复MySQL数据库")

        except Exception as e:
            print(f"数据库恢复失败: {e}")

    def list_backups(self):
        """Print the available backups, newest first."""
        backups = self._find_backups()

        if not backups:
            print("没有找到备份")
            return

        print("可用备份:")
        for backup in backups:
            backup_info_file = backup / "backup_info.json"
            if backup_info_file.exists():
                try:
                    with open(backup_info_file, 'r', encoding='utf-8') as f:
                        info = json.load(f)
                    print(f"  {info['backup_name']} - 版本: {info['version']} - {info['timestamp']}")
                except (OSError, ValueError, KeyError, TypeError):
                    # Unreadable or incomplete metadata: show the name only.
                    print(f"  {backup.name}")
            else:
                print(f"  {backup.name}")

    def cleanup_old_backups(self, keep_count: int = 5):
        """Delete all but the ``keep_count`` most recent backups.

        Args:
            keep_count: Number of backups to keep; must not be negative.

        Raises:
            ValueError: ``keep_count`` is negative.
        """
        if keep_count < 0:
            raise ValueError("keep_count must not be negative")

        backups = self._find_backups()

        if len(backups) <= keep_count:
            print(f"备份数量({len(backups)})不超过保留数量({keep_count})")
            return

        # Backups are sorted newest first; drop everything past the cut-off.
        for backup in backups[keep_count:]:
            print(f"删除备份: {backup.name}")
            shutil.rmtree(backup)


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser(description='TSP智能助手部署管理')
    parser.add_argument('action', choices=['deploy', 'rollback', 'backup', 'list-backups', 'cleanup'],
                        help='要执行的操作')
    parser.add_argument('--source', default='.', help='源代码路径')
    parser.add_argument('--backup', help='备份名称')
    parser.add_argument('--force', action='store_true', help='强制部署')
    parser.add_argument('--keep', type=int, default=5, help='保留备份数量')

    args = parser.parse_args()
    if args.keep < 0:
        parser.error("--keep must not be negative")

    dm = DeploymentManager()

    if args.action == 'deploy':
        success = dm.deploy(args.source, args.force)
        sys.exit(0 if success else 1)

    elif args.action == 'rollback':
        success = dm.rollback(args.backup)
        sys.exit(0 if success else 1)

    elif args.action == 'backup':
        backup_name = dm.backup_current_deployment()
        print(f"备份完成: {backup_name}")

    elif args.action == 'list-backups':
        dm.list_backups()

    elif args.action == 'cleanup':
        dm.cleanup_old_backups(args.keep)


if __name__ == "__main__":
    main()