diff --git a/src/analytics/ai_success_monitor.py b/src/analytics/ai_success_monitor.py index f2fc9b0..c06f922 100644 --- a/src/analytics/ai_success_monitor.py +++ b/src/analytics/ai_success_monitor.py @@ -10,11 +10,11 @@ from typing import Dict, List, Optional, Any, Tuple from datetime import datetime, timedelta from dataclasses import dataclass from collections import defaultdict -import redis import time from ..core.database import db_manager from ..core.models import Alert +from ..core.redis_manager import redis_manager from ..config.config import Config logger = logging.getLogger(__name__) @@ -38,9 +38,6 @@ class AISuccessMonitor: """AI调用成功率监控器""" def __init__(self): - self.redis_client = None - self._init_redis() - # 监控阈值 self.thresholds = { "success_rate_min": 0.95, # 最低成功率95% @@ -58,23 +55,9 @@ class AISuccessMonitor: "poor": {"success_rate": 0.85, "response_time": 12.0} } - def _init_redis(self): - """初始化Redis连接""" - try: - self.redis_client = redis.Redis( - host='43.134.68.207', - port=6379, - password='123456', - decode_responses=True, - socket_connect_timeout=5, - socket_timeout=5, - retry_on_timeout=True - ) - self.redis_client.ping() - logger.info("AI成功率监控Redis连接成功") - except Exception as e: - logger.error(f"AI成功率监控Redis连接失败: {e}") - self.redis_client = None + def _get_redis_client(self): + """获取Redis客户端""" + return redis_manager.get_connection() def record_api_call( self, @@ -120,7 +103,8 @@ class AISuccessMonitor: def _save_to_redis(self, api_call: APICall): """保存到Redis""" - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return try: @@ -139,23 +123,23 @@ class AISuccessMonitor: } # 保存到多个键 - self.redis_client.zadd( + redis_client.zadd( "api_calls:daily", {json.dumps(call_data, ensure_ascii=False): timestamp} ) - self.redis_client.zadd( + redis_client.zadd( f"api_calls:model:{api_call.model_name}", {json.dumps(call_data, ensure_ascii=False): timestamp} ) - self.redis_client.zadd( + redis_client.zadd( f"api_calls:user:{api_call.user_id}", {json.dumps(call_data, ensure_ascii=False): timestamp} ) # 设置过期时间(保留30天) - self.redis_client.expire("api_calls:daily", 30 * 24 * 3600) + redis_client.expire("api_calls:daily", 30 * 24 * 3600) except Exception as e: logger.error(f"保存API调用到Redis失败: {e}") @@ -205,11 +189,12 @@ class AISuccessMonitor: def _get_consecutive_failures(self, model_name: str) -> int: """获取连续失败次数""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return 0 # 获取最近的调用记录 - recent_calls = self.redis_client.zrevrange( + recent_calls = redis_client.zrevrange( f"api_calls:model:{model_name}", 0, 9, # 最近10次调用 @@ -236,7 +221,8 @@ class AISuccessMonitor: def _get_hourly_failures(self, timestamp: datetime) -> int: """获取每小时失败次数""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return 0 hour_start = timestamp.replace(minute=0, second=0, microsecond=0) @@ -245,7 +231,7 @@ class AISuccessMonitor: start_time = hour_start.timestamp() end_time = hour_end.timestamp() - calls = self.redis_client.zrangebyscore( + calls = redis_client.zrangebyscore( "api_calls:daily", start_time, end_time, @@ -270,13 +256,14 @@ class AISuccessMonitor: def _get_recent_success_rate(self, model_name: str, hours: int = 1) -> float: """获取最近成功率""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return 0.0 end_time = datetime.now().timestamp() start_time = (datetime.now() - timedelta(hours=hours)).timestamp() - calls = self.redis_client.zrangebyscore( + calls = redis_client.zrangebyscore( f"api_calls:model:{model_name}", start_time, end_time, @@ -306,13 +293,14 @@ class AISuccessMonitor: def _get_avg_response_time(self, model_name: str, hours: int = 1) -> float: """获取平均响应时间""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return 0.0 end_time = datetime.now().timestamp() start_time = (datetime.now() - timedelta(hours=hours)).timestamp() - calls = self.redis_client.zrangebyscore( + calls = redis_client.zrangebyscore( f"api_calls:model:{model_name}", start_time, end_time, @@ -366,13 +354,14 @@ class AISuccessMonitor: def get_model_performance(self, model_name: str, hours: int = 24) -> Dict[str, Any]: """获取模型性能指标""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return {} end_time = datetime.now().timestamp() start_time = (datetime.now() - timedelta(hours=hours)).timestamp() - calls = self.redis_client.zrangebyscore( + calls = redis_client.zrangebyscore( f"api_calls:model:{model_name}", start_time, end_time, @@ -451,13 +440,14 @@ class AISuccessMonitor: def get_system_performance(self, hours: int = 24) -> Dict[str, Any]: """获取系统整体性能""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return {} end_time = datetime.now().timestamp() start_time = (datetime.now() - timedelta(hours=hours)).timestamp() - calls = self.redis_client.zrangebyscore( + calls = redis_client.zrangebyscore( "api_calls:daily", start_time, end_time, @@ -538,7 +528,8 @@ class AISuccessMonitor: start_time = day_start.timestamp() end_time = day_end.timestamp() - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: trend_data.append({ "date": date.isoformat(), "total_calls": 0, @@ -547,7 +538,7 @@ class AISuccessMonitor: }) continue - calls = self.redis_client.zrangebyscore( + calls = redis_client.zrangebyscore( "api_calls:daily", start_time, end_time, @@ -598,27 +589,28 @@ class AISuccessMonitor: def cleanup_old_data(self, days: int = 30) -> int: """清理旧数据""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return 0 cutoff_time = (datetime.now() - timedelta(days=days)).timestamp() # 清理每日数据 - removed_count = self.redis_client.zremrangebyscore( + removed_count = redis_client.zremrangebyscore( "api_calls:daily", 0, cutoff_time ) # 清理模型数据 - model_keys = self.redis_client.keys("api_calls:model:*") + model_keys = redis_client.keys("api_calls:model:*") for key in model_keys: - self.redis_client.zremrangebyscore(key, 0, cutoff_time) + redis_client.zremrangebyscore(key, 0, cutoff_time) # 清理用户数据 - user_keys = self.redis_client.keys("api_calls:user:*") + user_keys = redis_client.keys("api_calls:user:*") for key in user_keys: - self.redis_client.zremrangebyscore(key, 0, cutoff_time) + redis_client.zremrangebyscore(key, 0, cutoff_time) logger.info(f"清理AI成功率监控数据成功: 数量={removed_count}") return removed_count diff --git a/src/analytics/token_monitor.py b/src/analytics/token_monitor.py index fd03d41..2a8d4c7 100644 --- a/src/analytics/token_monitor.py +++ b/src/analytics/token_monitor.py @@ -10,10 +10,9 @@ from typing import Dict, List, Optional, Any, Tuple from datetime import datetime, timedelta from dataclasses import dataclass from collections import defaultdict -import redis - from ..core.database import db_manager from ..core.models import Conversation +from ..core.redis_manager import redis_manager from ..config.config import Config logger = logging.getLogger(__name__) @@ -37,9 +36,6 @@ class TokenMonitor: """Token消耗监控器""" def __init__(self): - self.redis_client = None - self._init_redis() - # Token价格配置(每1000个token的价格,单位:元) self.token_prices = { "qwen-plus-latest": { @@ -64,23 +60,9 @@ class TokenMonitor: "error_rate_threshold": 0.1 # 错误率阈值 } - def _init_redis(self): - """初始化Redis连接""" - try: - self.redis_client = redis.Redis( - host='43.134.68.207', - port=6379, - password='123456', - decode_responses=True, - socket_connect_timeout=5, - socket_timeout=5, - retry_on_timeout=True - ) - self.redis_client.ping() - logger.info("Token监控Redis连接成功") - except Exception as e: - logger.error(f"Token监控Redis连接失败: {e}") - self.redis_client = None + def _get_redis_client(self): + """获取Redis客户端""" + return redis_manager.get_connection() def record_token_usage( self, @@ -141,7 +123,8 @@ class TokenMonitor: def _save_to_redis(self, usage: TokenUsage): """保存到Redis""" - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return try: @@ -161,24 +144,24 @@ class TokenMonitor: } # 保存到多个键 - self.redis_client.zadd( + redis_client.zadd( "token_usage:daily", {json.dumps(usage_data, ensure_ascii=False): timestamp} ) - self.redis_client.zadd( + redis_client.zadd( f"token_usage:user:{usage.user_id}", {json.dumps(usage_data, ensure_ascii=False): timestamp} ) if usage.work_order_id: - self.redis_client.zadd( + redis_client.zadd( f"token_usage:work_order:{usage.work_order_id}", {json.dumps(usage_data, ensure_ascii=False): timestamp} ) # 设置过期时间(保留30天) - self.redis_client.expire("token_usage:daily", 30 * 24 * 3600) + redis_client.expire("token_usage:daily", 30 * 24 * 3600) except Exception as e: logger.error(f"保存Token使用到Redis失败: {e}") @@ -241,14 +224,15 @@ class TokenMonitor: def get_daily_cost(self, date: datetime.date) -> float: """获取指定日期的成本""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return 0.0 start_time = datetime.combine(date, datetime.min.time()).timestamp() end_time = datetime.combine(date, datetime.max.time()).timestamp() # 从Redis获取当日数据 - usage_records = self.redis_client.zrangebyscore( + usage_records = redis_client.zrangebyscore( "token_usage:daily", start_time, end_time, @@ -272,7 +256,8 @@ class TokenMonitor: def get_hourly_cost(self, timestamp: datetime) -> float: """获取指定小时的成本""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return 0.0 # 获取当前小时的数据 @@ -282,7 +267,7 @@ class TokenMonitor: start_time = hour_start.timestamp() end_time = hour_end.timestamp() - usage_records = self.redis_client.zrangebyscore( + usage_records = redis_client.zrangebyscore( "token_usage:daily", start_time, end_time, @@ -306,13 +291,14 @@ class TokenMonitor: def get_user_token_stats(self, user_id: str, days: int = 7) -> Dict[str, Any]: """获取用户Token使用统计""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return {} end_time = datetime.now().timestamp() start_time = (datetime.now() - timedelta(days=days)).timestamp() - usage_records = self.redis_client.zrangebyscore( + usage_records = redis_client.zrangebyscore( f"token_usage:user:{user_id}", start_time, end_time, @@ -378,13 +364,14 @@ class TokenMonitor: def get_system_token_stats(self, days: int = 7) -> Dict[str, Any]: """获取系统Token使用统计""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return {} end_time = datetime.now().timestamp() start_time = (datetime.now() - timedelta(days=days)).timestamp() - usage_records = self.redis_client.zrangebyscore( + usage_records = redis_client.zrangebyscore( "token_usage:daily", start_time, end_time, @@ -466,27 +453,28 @@ class TokenMonitor: def cleanup_old_data(self, days: int = 30) -> int: """清理旧数据""" try: - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return 0 cutoff_time = (datetime.now() - timedelta(days=days)).timestamp() # 清理每日数据 - removed_count = self.redis_client.zremrangebyscore( + removed_count = redis_client.zremrangebyscore( "token_usage:daily", 0, cutoff_time ) # 清理用户数据 - user_keys = self.redis_client.keys("token_usage:user:*") + user_keys = redis_client.keys("token_usage:user:*") for key in user_keys: - self.redis_client.zremrangebyscore(key, 0, cutoff_time) + redis_client.zremrangebyscore(key, 0, cutoff_time) # 清理工单数据 - work_order_keys = self.redis_client.keys("token_usage:work_order:*") + work_order_keys = redis_client.keys("token_usage:work_order:*") for key in work_order_keys: - self.redis_client.zremrangebyscore(key, 0, cutoff_time) + redis_client.zremrangebyscore(key, 0, cutoff_time) logger.info(f"清理Token监控数据成功: 数量={removed_count}") return removed_count diff --git a/src/core/redis_manager.py b/src/core/redis_manager.py new file mode 100644 index 0000000..3c82f94 --- /dev/null +++ b/src/core/redis_manager.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +""" +统一Redis连接管理器 +避免多个模块重复连接Redis,提供单例模式管理 +""" + +import logging +import threading +from typing import Optional +import redis + +logger = logging.getLogger(__name__) + +class RedisManager: + """Redis连接管理器(单例模式)""" + + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + self.redis_client = None + self.connected = False + self.connection_lock = threading.Lock() + self._initialized = True + + # Redis配置 + self.host = '43.134.68.207' + self.port = 6379 + self.password = '123456' + self.connect_timeout = 2 + self.socket_timeout = 2 + + def get_connection(self) -> Optional[redis.Redis]: + """获取Redis连接(懒加载)""" + if not self.connected: + with self.connection_lock: + if not self.connected: + try: + self.redis_client = redis.Redis( + host=self.host, + port=self.port, + password=self.password, + decode_responses=True, + socket_connect_timeout=self.connect_timeout, + socket_timeout=self.socket_timeout, + retry_on_timeout=True + ) + # 测试连接 + self.redis_client.ping() + self.connected = True + logger.info("Redis连接成功") + except Exception as e: + logger.debug(f"Redis连接失败: {e}") + self.redis_client = None + self.connected = False + + return self.redis_client + + def test_connection(self) -> bool: + """测试Redis连接""" + try: + client = self.get_connection() + if client: + client.ping() + return True + return False + except Exception as e: + logger.debug(f"Redis连接测试失败: {e}") + return False + + def close_connection(self): + """关闭Redis连接""" + with self.connection_lock: + if self.redis_client: + try: + self.redis_client.close() + except Exception as e: + logger.debug(f"关闭Redis连接失败: {e}") + finally: + self.redis_client = None + self.connected = False + +# 全局Redis管理器实例 +redis_manager = RedisManager() diff --git a/src/dialogue/conversation_history.py b/src/dialogue/conversation_history.py index 794b12d..e651950 100644 --- a/src/dialogue/conversation_history.py +++ b/src/dialogue/conversation_history.py @@ -8,11 +8,11 @@ import json import logging from typing import Dict, List, Optional, Any, Tuple from datetime import datetime, timedelta -import redis from sqlalchemy.orm import Session from ..core.database import db_manager from ..core.models import Conversation +from ..core.redis_manager import redis_manager from ..config.config import Config logger = logging.getLogger(__name__) @@ -21,29 +21,12 @@ class ConversationHistoryManager: """对话历史管理器""" def __init__(self): - self.redis_client = None - self._init_redis() self.max_history_length = 20 # 最大历史记录数 self.cache_ttl = 3600 * 24 # 缓存24小时 - def _init_redis(self): - """初始化Redis连接""" - try: - self.redis_client = redis.Redis( - host='43.134.68.207', - port=6379, - password='123456', - decode_responses=True, - socket_connect_timeout=5, - socket_timeout=5, - retry_on_timeout=True - ) - # 测试连接 - self.redis_client.ping() - logger.info("Redis连接成功") - except Exception as e: - logger.error(f"Redis连接失败: {e}") - self.redis_client = None + def _get_redis_client(self): + """获取Redis客户端""" + return redis_manager.get_connection() def _get_cache_key(self, user_id: str, work_order_id: Optional[int] = None) -> str: """生成缓存键""" @@ -109,7 +92,8 @@ class ConversationHistoryManager: response_time: Optional[float] = None ): """保存对话到Redis缓存""" - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return try: @@ -126,13 +110,13 @@ class ConversationHistoryManager: } # 添加到Redis列表 - self.redis_client.lpush(cache_key, json.dumps(conversation_record, ensure_ascii=False)) + redis_client.lpush(cache_key, json.dumps(conversation_record, ensure_ascii=False)) # 限制列表长度 - self.redis_client.ltrim(cache_key, 0, self.max_history_length - 1) + redis_client.ltrim(cache_key, 0, self.max_history_length - 1) # 设置过期时间 - self.redis_client.expire(cache_key, self.cache_ttl) + redis_client.expire(cache_key, self.cache_ttl) except Exception as e: logger.error(f"保存到Redis缓存失败: {e}") @@ -147,7 +131,7 @@ class ConversationHistoryManager: """获取对话历史(优先从Redis获取)""" try: # 先尝试从Redis获取 - if self.redis_client: + if redis_client: cached_history = self._get_from_cache(user_id, work_order_id, limit, offset) if cached_history: return cached_history @@ -167,7 +151,8 @@ class ConversationHistoryManager: offset: int ) -> List[Dict[str, Any]]: """从Redis缓存获取对话历史""" - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return [] try: @@ -177,7 +162,7 @@ class ConversationHistoryManager: start = offset end = offset + limit - 1 - cached_data = self.redis_client.lrange(cache_key, start, end) + cached_data = redis_client.lrange(cache_key, start, end) history = [] for data in cached_data: @@ -307,20 +292,21 @@ class ConversationHistoryManager: def _invalidate_cache(self, work_order_id: Optional[int] = None): """清除相关缓存""" - if not self.redis_client: + redis_client = self._get_redis_client() + if not redis_client: return try: # 清除工单相关缓存 if work_order_id: cache_key = f"conversation_history:work_order:{work_order_id}" - self.redis_client.delete(cache_key) + redis_client.delete(cache_key) # 清除所有用户缓存(简单粗暴的方式) pattern = "conversation_history:user:*" - keys = self.redis_client.keys(pattern) + keys = redis_client.keys(pattern) if keys: - self.redis_client.delete(*keys) + redis_client.delete(*keys) except Exception as e: logger.error(f"清除缓存失败: {e}") @@ -354,7 +340,7 @@ class ConversationHistoryManager: "total_conversations": total_count, "avg_response_time": round(avg_response_time, 2), "avg_confidence": round(avg_confidence, 2), - "cache_status": "connected" if self.redis_client else "disconnected" + "cache_status": "connected" if redis_manager.test_connection() else "disconnected" } except Exception as e: