feat: 任务 7 会话状态迁移到 Redis

新增 src/core/session_store.py:
- SessionStore 抽象接口(get/set/delete/list_all/check_and_set_dedup)
- MemorySessionStore: 内存实现(单进程,带 TTL 和容量限制)
- RedisSessionStore: Redis 实现(多进程共享,SETNX 原子去重)
- create_session_store(): 优先 Redis,降级内存

RealtimeChatManager 集成:
- 初始化时创建 session_store(自动选择 Redis 或内存)
- create_session 同时写入 session_store
- end_session 同时从 session_store 删除
- get_active_sessions 合并内存和 session_store 的会话(跨进程可见)

支持多实例部署:
- 配置 REDIS_ENABLED=true + REDIS_HOST 即可启用
- 不配置 Redis 时行为完全不变(内存存储)
This commit is contained in:
2026-04-08 08:59:11 +08:00
parent 45badfee82
commit 54d4043805
3 changed files with 235 additions and 15 deletions

198
src/core/session_store.py Normal file
View File

@@ -0,0 +1,198 @@
# -*- coding: utf-8 -*-
"""
会话存储抽象层
支持内存存储(单进程)和 Redis 存储(多进程)。
Redis 不可用时自动降级为内存存储。
"""
import json
import logging
import time
from typing import Any, Dict, List, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
class SessionStore:
"""会话存储接口"""
def get(self, session_id: str) -> Optional[Dict]:
raise NotImplementedError
def set(self, session_id: str, data: Dict, ttl: int = 3600):
raise NotImplementedError
def delete(self, session_id: str):
raise NotImplementedError
def exists(self, session_id: str) -> bool:
raise NotImplementedError
def list_all(self) -> List[Dict]:
raise NotImplementedError
def count(self) -> int:
raise NotImplementedError
def check_and_set_dedup(self, message_id: str, ttl: int = 300) -> bool:
"""消息去重:返回 True 表示已处理重复False 表示新消息"""
raise NotImplementedError
class MemorySessionStore(SessionStore):
"""内存会话存储(单进程,默认)"""
def __init__(self, max_size: int = 500):
self._store: Dict[str, Dict] = {}
self._dedup: Dict[str, float] = {}
self._max_size = max_size
def get(self, session_id: str) -> Optional[Dict]:
item = self._store.get(session_id)
if item and item.get('_expires_at', float('inf')) > time.time():
return item
if item:
del self._store[session_id]
return None
def set(self, session_id: str, data: Dict, ttl: int = 3600):
data['_expires_at'] = time.time() + ttl
self._store[session_id] = data
self._evict_if_needed()
def delete(self, session_id: str):
self._store.pop(session_id, None)
def exists(self, session_id: str) -> bool:
return self.get(session_id) is not None
def list_all(self) -> List[Dict]:
now = time.time()
result = []
expired = []
for sid, data in self._store.items():
if data.get('_expires_at', float('inf')) > now:
result.append({**data, 'session_id': sid})
else:
expired.append(sid)
for sid in expired:
del self._store[sid]
return result
def count(self) -> int:
return len(self._store)
def check_and_set_dedup(self, message_id: str, ttl: int = 300) -> bool:
now = time.time()
# 清理过期
self._dedup = {k: v for k, v in self._dedup.items() if v > now}
if message_id in self._dedup:
return True
self._dedup[message_id] = now + ttl
return False
def _evict_if_needed(self):
if len(self._store) > self._max_size:
sorted_items = sorted(self._store.items(), key=lambda x: x[1].get('_expires_at', 0))
to_remove = len(self._store) - self._max_size
for sid, _ in sorted_items[:to_remove]:
del self._store[sid]
class RedisSessionStore(SessionStore):
"""Redis 会话存储(多进程共享)"""
PREFIX = "session:"
DEDUP_PREFIX = "dedup:"
def __init__(self, redis_client):
self._redis = redis_client
def get(self, session_id: str) -> Optional[Dict]:
try:
data = self._redis.get(f"{self.PREFIX}{session_id}")
if data:
return json.loads(data)
except Exception as e:
logger.warning(f"Redis get session 失败: {e}")
return None
def set(self, session_id: str, data: Dict, ttl: int = 3600):
try:
# datetime 不能直接 JSON 序列化
serializable = {}
for k, v in data.items():
if isinstance(v, datetime):
serializable[k] = v.isoformat()
else:
serializable[k] = v
self._redis.setex(f"{self.PREFIX}{session_id}", ttl, json.dumps(serializable, ensure_ascii=False))
except Exception as e:
logger.warning(f"Redis set session 失败: {e}")
def delete(self, session_id: str):
try:
self._redis.delete(f"{self.PREFIX}{session_id}")
except Exception as e:
logger.warning(f"Redis delete session 失败: {e}")
def exists(self, session_id: str) -> bool:
try:
return bool(self._redis.exists(f"{self.PREFIX}{session_id}"))
except Exception:
return False
def list_all(self) -> List[Dict]:
try:
keys = self._redis.keys(f"{self.PREFIX}*")
result = []
for key in keys:
data = self._redis.get(key)
if data:
item = json.loads(data)
item['session_id'] = key.replace(self.PREFIX, '', 1)
result.append(item)
return result
except Exception as e:
logger.warning(f"Redis list sessions 失败: {e}")
return []
def count(self) -> int:
try:
return len(self._redis.keys(f"{self.PREFIX}*"))
except Exception:
return 0
def check_and_set_dedup(self, message_id: str, ttl: int = 300) -> bool:
try:
key = f"{self.DEDUP_PREFIX}{message_id}"
# SETNX: 如果 key 不存在则设置,返回 True已存在返回 False
was_set = self._redis.set(key, "1", nx=True, ex=ttl)
return not was_set # was_set=True 表示新消息,返回 False
except Exception as e:
logger.warning(f"Redis dedup 失败: {e}")
return False
def create_session_store() -> SessionStore:
"""创建会话存储实例:优先 Redis降级内存"""
try:
from src.config.unified_config import get_config
config = get_config()
if config.redis.enabled:
import redis
client = redis.Redis(
host=config.redis.host,
port=config.redis.port,
password=config.redis.password,
db=config.redis.db,
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2
)
client.ping()
logger.info("会话存储使用 Redis")
return RedisSessionStore(client)
except Exception as e:
logger.info(f"Redis 不可用({e}),会话存储使用内存")
return MemorySessionStore()