# -*- coding: utf-8 -*-
"""
Redis connection manager.

Manages every Redis connection in one place so callers share a single client
instead of opening duplicate connections.
"""
import logging
import os
import threading
from typing import Optional

import redis

logger = logging.getLogger(__name__)

# Fallback connection settings, used when the matching REDIS_* environment
# variable is not set.
# NOTE(review): a real host and password are committed to source here. They are
# kept only so existing deployments keep working unchanged; move them to the
# environment (REDIS_HOST / REDIS_PORT / REDIS_PASSWORD) and rotate the secret.
_DEFAULT_HOST = '43.134.68.207'
_DEFAULT_PORT = 6379
_DEFAULT_PASSWORD = '123456'


def _port_from_env() -> int:
    """Return the port from REDIS_PORT, or the default if unset or invalid."""
    raw_port = os.environ.get('REDIS_PORT')
    if not raw_port:
        return _DEFAULT_PORT
    try:
        return int(raw_port)
    except ValueError:
        # A malformed override must not break module import; fall back instead.
        logger.warning(
            "Ignoring invalid REDIS_PORT %r; using %d", raw_port, _DEFAULT_PORT
        )
        return _DEFAULT_PORT


class RedisManager:
    """Process-wide Redis connection manager (singleton).

    ``RedisManager()`` always returns the same instance. The client itself is
    created lazily, on the first call to :meth:`get_connection`.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Check, lock, check again: the lock is only taken while no instance
        # exists yet, so the common path stays lock-free.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        # __init__ runs on every RedisManager() call; only the first one may
        # set the instance up.
        if self._initialized:
            return
        # _lock is released by __new__ before __init__ runs, so taking it here
        # cannot deadlock. It prevents two threads from initialising at once.
        with self._lock:
            if self._initialized:
                return

            self.redis_client = None
            self.connected = False
            self.connection_lock = threading.Lock()

            # Redis configuration (environment overrides, original defaults).
            self.config = {
                'host': os.environ.get('REDIS_HOST', _DEFAULT_HOST),
                'port': _port_from_env(),
                'password': os.environ.get('REDIS_PASSWORD', _DEFAULT_PASSWORD),
                'decode_responses': True,
                'socket_connect_timeout': 2,
                'socket_timeout': 2,
                'retry_on_timeout': True
            }

            # Set last, so a thread that sees the flag as True also sees every
            # attribute assigned above.
            self._initialized = True

    def get_connection(self) -> Optional[redis.Redis]:
        """Return the shared Redis client, connecting on first use.

        Returns:
            The connected client, or ``None`` when the connection attempt
            failed. A failed attempt is retried on the next call.
        """
        with self.connection_lock:
            if not self.connected:
                try:
                    client = redis.Redis(**self.config)
                    # Constructing the client does not prove the server is
                    # reachable; ping() does.
                    client.ping()
                except Exception as e:
                    # Best effort by design: callers get None, not an exception.
                    logger.debug("Redis连接失败: %s", e)
                    self.redis_client = None
                    self.connected = False
                else:
                    # Publish the client only after it has been verified.
                    self.redis_client = client
                    self.connected = True
                    logger.info("Redis连接成功")

            return self.redis_client

    def is_connected(self) -> bool:
        """Return True if a client has been created and verified.

        This reports the state recorded by the last connect/close call; it does
        not contact the server.
        """
        return self.connected and self.redis_client is not None

    def close_connection(self):
        """Close the Redis client, if any, and reset the connection state."""
        with self.connection_lock:
            if self.redis_client:
                try:
                    self.redis_client.close()
                except Exception as e:
                    logger.debug("关闭Redis连接失败: %s", e)
                finally:
                    # Reset even if close() raised, so the next
                    # get_connection() reconnects from scratch.
                    self.redis_client = None
                    self.connected = False


# Global Redis manager instance
redis_manager = RedisManager()