diff --git a/.env b/.env index 811f54c..e1ba196 100644 --- a/.env +++ b/.env @@ -17,10 +17,8 @@ WEBSOCKET_PORT=8765 # Set to "True" for development to enable debug mode and auto-reloading. # Set to "False" for production. DEBUG_MODE=False - # Logging level for the application. Options: DEBUG, INFO, WARNING, ERROR, CRITICAL LOG_LEVEL=INFO - # 租户标识 — 多项目共用同一套代码时,用不同的 TENANT_ID 隔离数据 TENANT_ID=default @@ -42,7 +40,7 @@ DATABASE_URL=sqlite:///./data/tsp_assistant.db # LARGE LANGUAGE MODEL (LLM) CONFIGURATION # ============================================================================ # The provider of the LLM. Supported: "qwen", "openai", "anthropic" -LLM_PROVIDER=qwen +LLM_PROVIDER=xiaomi # The API key for your chosen LLM provider. LLM_API_KEY=sk-Gce85QLROESeOWf3icd2mQnYHOrmMYojwVPQ0AubMjGQ5ZE2 diff --git a/src/core/session_store.py b/src/core/session_store.py new file mode 100644 index 0000000..4cd59db --- /dev/null +++ b/src/core/session_store.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +""" +会话存储抽象层 +支持内存存储(单进程)和 Redis 存储(多进程)。 +Redis 不可用时自动降级为内存存储。 +""" +import json +import logging +import time +from typing import Any, Dict, List, Optional +from datetime import datetime + +logger = logging.getLogger(__name__) + + +class SessionStore: + """会话存储接口""" + + def get(self, session_id: str) -> Optional[Dict]: + raise NotImplementedError + + def set(self, session_id: str, data: Dict, ttl: int = 3600): + raise NotImplementedError + + def delete(self, session_id: str): + raise NotImplementedError + + def exists(self, session_id: str) -> bool: + raise NotImplementedError + + def list_all(self) -> List[Dict]: + raise NotImplementedError + + def count(self) -> int: + raise NotImplementedError + + def check_and_set_dedup(self, message_id: str, ttl: int = 300) -> bool: + """消息去重:返回 True 表示已处理(重复),False 表示新消息""" + raise NotImplementedError + + +class MemorySessionStore(SessionStore): + """内存会话存储(单进程,默认)""" + + def __init__(self, max_size: int = 500): + self._store: Dict[str, Dict] = {} + self._dedup: Dict[str, float] = {} + self._max_size = max_size + + def get(self, session_id: str) -> Optional[Dict]: + item = self._store.get(session_id) + if item and item.get('_expires_at', float('inf')) > time.time(): + return item + if item: + del self._store[session_id] + return None + + def set(self, session_id: str, data: Dict, ttl: int = 3600): + data['_expires_at'] = time.time() + ttl + self._store[session_id] = data + self._evict_if_needed() + + def delete(self, session_id: str): + self._store.pop(session_id, None) + + def exists(self, session_id: str) -> bool: + return self.get(session_id) is not None + + def list_all(self) -> List[Dict]: + now = time.time() + result = [] + expired = [] + for sid, data in self._store.items(): + if data.get('_expires_at', float('inf')) > now: + result.append({**data, 'session_id': sid}) + else: + expired.append(sid) + for sid in expired: + del self._store[sid] + return result + + def count(self) -> int: + return len(self._store) + + def check_and_set_dedup(self, message_id: str, ttl: int = 300) -> bool: + now = time.time() + # 清理过期 + self._dedup = {k: v for k, v in self._dedup.items() if v > now} + if message_id in self._dedup: + return True + self._dedup[message_id] = now + ttl + return False + + def _evict_if_needed(self): + if len(self._store) > self._max_size: + sorted_items = sorted(self._store.items(), key=lambda x: x[1].get('_expires_at', 0)) + to_remove = len(self._store) - self._max_size + for sid, _ in sorted_items[:to_remove]: + del self._store[sid] + + +class RedisSessionStore(SessionStore): + """Redis 会话存储(多进程共享)""" + + PREFIX = "session:" + DEDUP_PREFIX = "dedup:" + + def __init__(self, redis_client): + self._redis = redis_client + + def get(self, session_id: str) -> Optional[Dict]: + try: + data = self._redis.get(f"{self.PREFIX}{session_id}") + if data: + return json.loads(data) + except Exception as e: + logger.warning(f"Redis get session 失败: {e}") + return None + + def set(self, session_id: str, data: Dict, ttl: int = 3600): + try: + # datetime 不能直接 JSON 序列化 + serializable = {} + for k, v in data.items(): + if isinstance(v, datetime): + serializable[k] = v.isoformat() + else: + serializable[k] = v + self._redis.setex(f"{self.PREFIX}{session_id}", ttl, json.dumps(serializable, ensure_ascii=False)) + except Exception as e: + logger.warning(f"Redis set session 失败: {e}") + + def delete(self, session_id: str): + try: + self._redis.delete(f"{self.PREFIX}{session_id}") + except Exception as e: + logger.warning(f"Redis delete session 失败: {e}") + + def exists(self, session_id: str) -> bool: + try: + return bool(self._redis.exists(f"{self.PREFIX}{session_id}")) + except Exception: + return False + + def list_all(self) -> List[Dict]: + try: + keys = self._redis.keys(f"{self.PREFIX}*") + result = [] + for key in keys: + data = self._redis.get(key) + if data: + item = json.loads(data) + item['session_id'] = key.replace(self.PREFIX, '', 1) + result.append(item) + return result + except Exception as e: + logger.warning(f"Redis list sessions 失败: {e}") + return [] + + def count(self) -> int: + try: + return len(self._redis.keys(f"{self.PREFIX}*")) + except Exception: + return 0 + + def check_and_set_dedup(self, message_id: str, ttl: int = 300) -> bool: + try: + key = f"{self.DEDUP_PREFIX}{message_id}" + # SETNX: 如果 key 不存在则设置,返回 True;已存在返回 False + was_set = self._redis.set(key, "1", nx=True, ex=ttl) + return not was_set # was_set=True 表示新消息,返回 False + except Exception as e: + logger.warning(f"Redis dedup 失败: {e}") + return False + + +def create_session_store() -> SessionStore: + """创建会话存储实例:优先 Redis,降级内存""" + try: + from src.config.unified_config import get_config + config = get_config() + if config.redis.enabled: + import redis + client = redis.Redis( + host=config.redis.host, + port=config.redis.port, + password=config.redis.password, + db=config.redis.db, + decode_responses=True, + socket_connect_timeout=2, + socket_timeout=2 + ) + client.ping() + logger.info("会话存储使用 Redis") + return RedisSessionStore(client) + except Exception as e: + logger.info(f"Redis 不可用({e}),会话存储使用内存") + return MemorySessionStore() diff --git a/src/dialogue/realtime_chat.py b/src/dialogue/realtime_chat.py index 481b8bd..8de2fef 100644 --- a/src/dialogue/realtime_chat.py +++ b/src/dialogue/realtime_chat.py @@ -32,15 +32,17 @@ class ChatMessage: class RealtimeChatManager: """实时对话管理器""" - # 会话超时时间(秒):超过此时间无活动的会话自动清理 - SESSION_TIMEOUT = 3600 # 1小时 - # 最大同时活跃会话数 + SESSION_TIMEOUT = 3600 MAX_ACTIVE_SESSIONS = 500 def __init__(self): self.llm_client = QwenClient() self.knowledge_manager = KnowledgeManager() self.vehicle_manager = VehicleDataManager() + # 会话存储:优先 Redis,降级内存 + from src.core.session_store import create_session_store + self.session_store = create_session_store() + # 内存中的会话数据和消息历史(用于快速访问上下文) self.active_sessions = {} self.message_history = {} self._cleanup_counter = 0 @@ -90,6 +92,12 @@ class RealtimeChatManager: self.active_sessions[session_id] = session_data self.message_history[session_id] = [] + # 写入 session store(Redis 或内存) + self.session_store.set(session_id, { + "user_id": user_id, "work_order_id": work_order_id, + "tenant_id": tenant_id, "created_at": datetime.now().isoformat() + }, ttl=self.SESSION_TIMEOUT) + # 持久化会话到数据库 try: from src.core.models import DEFAULT_TENANT @@ -655,12 +663,12 @@ class RealtimeChatManager: except Exception as e: logger.warning(f"更新会话状态失败: {e}") - # 清理内存 + # 清理内存和 session store if session_id in self.active_sessions: del self.active_sessions[session_id] - if session_id in self.message_history: del self.message_history[session_id] + self.session_store.delete(session_id) logger.info(f"结束会话: {session_id}") return True @@ -670,16 +678,32 @@ class RealtimeChatManager: return False def get_active_sessions(self) -> List[Dict[str, Any]]: - """获取活跃会话列表""" + """获取活跃会话列表(合并内存和 session store)""" sessions = [] + seen = set() + # 内存中的会话 for session_id, session_data in self.active_sessions.items(): + seen.add(session_id) sessions.append({ "session_id": session_id, - "user_id": session_data["user_id"], - "work_order_id": session_data["work_order_id"], - "created_at": session_data["created_at"].isoformat(), - "last_activity": session_data["last_activity"].isoformat(), - "message_count": session_data["message_count"] + "user_id": session_data.get("user_id"), + "work_order_id": session_data.get("work_order_id"), + "tenant_id": session_data.get("tenant_id"), + "created_at": session_data["created_at"].isoformat() if isinstance(session_data.get("created_at"), datetime) else session_data.get("created_at", ""), + "last_activity": session_data["last_activity"].isoformat() if isinstance(session_data.get("last_activity"), datetime) else session_data.get("last_activity", ""), + "message_count": session_data.get("message_count", 0) }) - + # session store 中的会话(可能来自其他进程) + for item in self.session_store.list_all(): + sid = item.get('session_id', '') + if sid and sid not in seen: + sessions.append({ + "session_id": sid, + "user_id": item.get("user_id"), + "work_order_id": item.get("work_order_id"), + "tenant_id": item.get("tenant_id"), + "created_at": item.get("created_at", ""), + "last_activity": item.get("last_activity", item.get("created_at", "")), + "message_count": item.get("message_count", 0) + }) return sessions