From 7151070c992e634693977b4896d59d3c5dc2d6e9 Mon Sep 17 00:00:00 2001 From: Jeason <1710884619@qq.com> Date: Thu, 2 Apr 2026 22:37:44 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=AC=AC=E4=BA=8C=E8=BD=AE?= =?UTF-8?q?=E6=9E=B6=E6=9E=84=E7=BC=BA=E9=99=B7=E4=BF=AE=E5=A4=8D=20(1/2/3?= =?UTF-8?q?/4/9/10)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 内存泄漏修复:RealtimeChatManager 添加会话自动清理机制 - 每10次操作检查超时会话(1小时无活动自动清理) - 最大活跃会话数限制500,超限清理最旧会话 2. 数据库索引补全: - Conversation: session_id, work_order_id 添加索引 - WorkOrder: status 添加索引 - ChatSession: user_id 添加索引 - KnowledgeEntry: category, is_active, is_verified 添加索引 3. ServiceManager 线程安全: - 添加 threading.Lock 双重检查锁 - 防止多线程并发初始化同一服务 4. API 响应格式统一: - 新增 api_response() 标准响应函数 - 统一格式: {success, message, data} / {success, error} 9. asyncio 误用修复: - knowledge.py 文件上传改用安全的 asyncio 调用方式 - 兼容已有事件循环和无事件循环两种场景 10. 请求限流: - 新增 rate_limit 装饰器(按 IP 限流) - chat/message 限制 20次/分钟 - workorder/ai-suggestion 限制 5次/分钟 --- docs/FEISHU_LONGCONN.md | 6 ++-- src/core/models.py | 16 ++++----- src/dialogue/realtime_chat.py | 38 +++++++++++++++++++-- src/integrations/feishu_longconn_service.py | 16 ++++----- src/web/blueprints/chat.py | 2 ++ src/web/blueprints/feishu_bot.py | 2 +- src/web/blueprints/knowledge.py | 18 ++++++---- src/web/blueprints/workorders.py | 2 ++ src/web/decorators.py | 36 +++++++++++++++++++ src/web/error_handlers.py | 15 ++++++++ src/web/service_manager.py | 25 ++++++++------ 11 files changed, 138 insertions(+), 38 deletions(-) diff --git a/docs/FEISHU_LONGCONN.md b/docs/FEISHU_LONGCONN.md index d7fcc19..dec00f9 100644 --- a/docs/FEISHU_LONGCONN.md +++ b/docs/FEISHU_LONGCONN.md @@ -167,17 +167,17 @@ app.run(...) 服务运行时会实时输出日志: ``` -📨 [Feishu LongConn] 收到消息 + [Feishu LongConn] 收到消息 - 消息ID: om_xxxxxxxxxxxxx - 群聊ID: oc_xxxxxxxxxxxxx - 发送者: ou_xxxxxxxxxxxxx - 消息类型: text - 原始内容: @TSP助手 车辆无法连接网络 - 清理后内容: 车辆无法连接网络 -🔑 会话用户标识: feishu_oc_xxxxxxxxxxxxx_ou_xxxxxxxxxxxxx + 会话用户标识: feishu_oc_xxxxxxxxxxxxx_ou_xxxxxxxxxxxxx 为用户 ou_xxxxxxxxxxxxx 在群聊 oc_xxxxxxxxxxxxx 创建新会话: session_xxxxxxxxxxxxx 🤖 调用 TSP Assistant 处理消息... -📤 准备发送回复 (长度: 156) + 准备发送回复 (长度: 156) 成功回复消息: om_xxxxxxxxxxxxx ``` diff --git a/src/core/models.py b/src/core/models.py index f4c7589..e8fde89 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -49,7 +49,7 @@ class WorkOrder(Base): description = Column(Text, nullable=False) category = Column(String(100), nullable=False) priority = Column(String(20), nullable=False) - status = Column(String(20), nullable=False) + status = Column(String(20), nullable=False, index=True) created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) resolution = Column(Text) @@ -97,8 +97,8 @@ class ChatSession(Base): id = Column(Integer, primary_key=True) tenant_id = Column(String(50), nullable=False, default=DEFAULT_TENANT, index=True) - session_id = Column(String(100), unique=True, nullable=False) # 唯一会话标识 - user_id = Column(String(100), nullable=True) # 用户标识 + session_id = Column(String(100), unique=True, nullable=False) + user_id = Column(String(100), nullable=True, index=True) work_order_id = Column(Integer, ForeignKey("work_orders.id"), nullable=True) title = Column(String(200), nullable=True) # 会话标题(取首条消息摘要) status = Column(String(20), default="active") # active, ended @@ -135,8 +135,8 @@ class Conversation(Base): id = Column(Integer, primary_key=True) tenant_id = Column(String(50), nullable=False, default=DEFAULT_TENANT, index=True) - session_id = Column(String(100), ForeignKey("chat_sessions.session_id"), nullable=True) # 关联会话 - work_order_id = Column(Integer, ForeignKey("work_orders.id")) + session_id = Column(String(100), ForeignKey("chat_sessions.session_id"), nullable=True, index=True) + work_order_id = Column(Integer, ForeignKey("work_orders.id"), index=True) user_message = Column(Text, nullable=False) assistant_response = Column(Text, nullable=False) timestamp = Column(DateTime, default=datetime.now) @@ -162,13 +162,13 @@ class KnowledgeEntry(Base): tenant_id = Column(String(50), nullable=False, default=DEFAULT_TENANT, index=True) question = Column(Text, nullable=False) answer = Column(Text, nullable=False) - category = Column(String(100), nullable=False) + category = Column(String(100), nullable=False, index=True) confidence_score = Column(Float, default=0.0) usage_count = Column(Integer, default=0) created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) - is_active = Column(Boolean, default=True) - is_verified = Column(Boolean, default=False) # 是否已验证 + is_active = Column(Boolean, default=True, index=True) + is_verified = Column(Boolean, default=False, index=True) verified_by = Column(String(100)) # 验证人 verified_at = Column(DateTime) # 验证时间 vector_embedding = Column(Text) # 向量嵌入的JSON字符串 diff --git a/src/dialogue/realtime_chat.py b/src/dialogue/realtime_chat.py index e517f61..724bc05 100644 --- a/src/dialogue/realtime_chat.py +++ b/src/dialogue/realtime_chat.py @@ -32,15 +32,49 @@ class ChatMessage: class RealtimeChatManager: """实时对话管理器""" + # 会话超时时间(秒):超过此时间无活动的会话自动清理 + SESSION_TIMEOUT = 3600 # 1小时 + # 最大同时活跃会话数 + MAX_ACTIVE_SESSIONS = 500 + def __init__(self): self.llm_client = QwenClient() self.knowledge_manager = KnowledgeManager() self.vehicle_manager = VehicleDataManager() - self.active_sessions = {} # 存储活跃的对话会话 - self.message_history = {} # 存储消息历史 + self.active_sessions = {} + self.message_history = {} + self._cleanup_counter = 0 + + def _cleanup_expired_sessions(self): + """清理超时的会话,每 10 次操作触发一次检查""" + self._cleanup_counter += 1 + if self._cleanup_counter < 10: + return + self._cleanup_counter = 0 + + now = datetime.now() + expired = [ + sid for sid, s in self.active_sessions.items() + if (now - s.get("last_activity", now)).total_seconds() > self.SESSION_TIMEOUT + ] + for sid in expired: + self.active_sessions.pop(sid, None) + self.message_history.pop(sid, None) + if expired: + logger.info(f"清理了 {len(expired)} 个超时会话,当前活跃: {len(self.active_sessions)}") + + # 如果超过上限,清理最旧的 + if len(self.active_sessions) > self.MAX_ACTIVE_SESSIONS: + sorted_sessions = sorted(self.active_sessions.items(), key=lambda x: x[1].get("last_activity", datetime.min)) + to_remove = len(self.active_sessions) - self.MAX_ACTIVE_SESSIONS + for sid, _ in sorted_sessions[:to_remove]: + self.active_sessions.pop(sid, None) + self.message_history.pop(sid, None) + logger.warning(f"会话数超限,清理了 {to_remove} 个最旧会话") def create_session(self, user_id: str, work_order_id: Optional[int] = None, tenant_id: Optional[str] = None) -> str: """创建新的对话会话""" + self._cleanup_expired_sessions() session_id = f"session_{user_id}_{int(time.time())}" session_data = { diff --git a/src/integrations/feishu_longconn_service.py b/src/integrations/feishu_longconn_service.py index 5b8b311..6f7d1c7 100644 --- a/src/integrations/feishu_longconn_service.py +++ b/src/integrations/feishu_longconn_service.py @@ -59,7 +59,7 @@ class FeishuLongConnService: """收到消息事件,立即提交到线程池,确保 SDK 回调快速返回""" self._msg_count += 1 msg_no = self._msg_count - logger.info(f"📨 收到飞书消息事件 #{msg_no},提交到处理队列") + logger.info(f" 收到飞书消息事件 #{msg_no},提交到处理队列") self._executor.submit(self._process_message_safe, data, msg_no) def _process_message_safe(self, data: P2ImMessageReceiveV1, msg_no: int) -> None: @@ -81,7 +81,7 @@ class FeishuLongConnService: Args: data: 飞书消息事件数据 """ - logger.info("📨 收到飞书消息事件!") + logger.info(" 收到飞书消息事件!") try: # 提取消息信息 @@ -103,9 +103,9 @@ class FeishuLongConnService: if not sender_attrs: # 尝试 __dict__ 或 dir sender_attrs = {k: v for k, v in vars(sender_id_obj).items() if v and not k.startswith('_')} if hasattr(sender_id_obj, '__dict__') else {} - logger.info(f"🔍 sender_id 属性: {sender_attrs}") + logger.info(f" sender_id 属性: {sender_attrs}") except Exception as e: - logger.warning(f"🔍 无法解析 sender_id 属性: {e}, type={type(sender_id_obj)}") + logger.warning(f" 无法解析 sender_id 属性: {e}, type={type(sender_id_obj)}") sender_attrs = {} sender_open_id = getattr(sender_id_obj, 'open_id', '') or '' @@ -131,7 +131,7 @@ class FeishuLongConnService: # 详细日志记录 chat_type_desc = '群聊(group)' if chat_type == 'group' else '私聊(p2p)' if chat_type == 'p2p' else chat_type - logger.info(f"📨 [长连接] 发送者={sender_name}({sender_id}), 群={chat_id}, 类型={chat_type_desc}, 消息ID={message_id}") + logger.info(f" [长连接] 发送者={sender_name}({sender_id}), 群={chat_id}, 类型={chat_type_desc}, 消息ID={message_id}") # 消息去重检查 if cache_manager.check_and_set_message_processed(message_id): @@ -177,12 +177,12 @@ class FeishuLongConnService: # 构造会话用户ID(群聊隔离) session_user_id = f"feishu_{chat_id}_{sender_id}" - logger.info(f"🔑 会话标识: {session_user_id}") + logger.info(f" 会话标识: {session_user_id}") # 解析租户:根据 chat_id 查找绑定的租户 from src.web.blueprints.tenants import resolve_tenant_by_chat_id tenant_id = resolve_tenant_by_chat_id(chat_id) - logger.info(f"🏢 群 {chat_id} 对应租户: {tenant_id}") + logger.info(f" 群 {chat_id} 对应租户: {tenant_id}") # 获取或创建会话 chat_manager = service_manager.get_chat_manager() @@ -231,7 +231,7 @@ class FeishuLongConnService: if not isinstance(reply_text, str): reply_text = str(reply_text) - logger.info(f"📤 准备发送回复 (长度: {len(reply_text)})") + logger.info(f" 准备发送回复 (长度: {len(reply_text)})") logger.info(f" 内容预览: {reply_text[:100]}...") # 发送回复 diff --git a/src/web/blueprints/chat.py b/src/web/blueprints/chat.py index 3794809..373ea3d 100644 --- a/src/web/blueprints/chat.py +++ b/src/web/blueprints/chat.py @@ -6,6 +6,7 @@ import logging from flask import Blueprint, request, jsonify, Response from src.web.service_manager import service_manager +from src.web.decorators import rate_limit logger = logging.getLogger(__name__) @@ -27,6 +28,7 @@ def create_session(): @chat_bp.route('/message', methods=['POST']) +@rate_limit(max_calls=20, period=60) def send_message(): """发送聊天消息""" try: diff --git a/src/web/blueprints/feishu_bot.py b/src/web/blueprints/feishu_bot.py index ba8a3f5..f683aad 100644 --- a/src/web/blueprints/feishu_bot.py +++ b/src/web/blueprints/feishu_bot.py @@ -121,7 +121,7 @@ def _process_message_in_background(app, event_data: dict): except Exception as e: logger.warning(f"[Feishu Bot] 获取发送者信息失败: {e}") - logger.info(f"[Feishu Bot] 📨 消息详情: 发送者={sender_name}({sender_id}), 群={chat_id}, 类型={chat_type_desc}, 租户={tenant_id}, 消息ID={message_id}") + logger.info(f"[Feishu Bot] 消息详情: 发送者={sender_name}({sender_id}), 群={chat_id}, 类型={chat_type_desc}, 租户={tenant_id}, 消息ID={message_id}") logger.info(f"[Feishu Bot] 📝 消息内容: '{text_content}'") # 群聊隔离:每个用户在每个群都有独立会话 diff --git a/src/web/blueprints/knowledge.py b/src/web/blueprints/knowledge.py index 93bf868..d68b8d0 100644 --- a/src/web/blueprints/knowledge.py +++ b/src/web/blueprints/knowledge.py @@ -118,13 +118,19 @@ def upload_knowledge_file(): file.save(temp_path) tenant_id = request.form.get('tenant_id') assistant = get_agent_assistant() - # 由于process_file_to_knowledge现在是异步的,我们需要同步调用它 - # 或者将整个视图函数改为异步(Flask 2.0+支持) + # process_file_to_knowledge 是异步方法,在同步 Flask 中安全调用 import asyncio - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - result = loop.run_until_complete(assistant.process_file_to_knowledge(temp_path, file.filename, tenant_id=tenant_id)) - loop.close() + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + # 如果已有事件循环在运行(如在某些 ASGI 环境下),用线程池 + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as pool: + result = pool.submit(asyncio.run, assistant.process_file_to_knowledge(temp_path, file.filename, tenant_id=tenant_id)).result() + else: + result = loop.run_until_complete(assistant.process_file_to_knowledge(temp_path, file.filename, tenant_id=tenant_id)) + except RuntimeError: + result = asyncio.run(assistant.process_file_to_knowledge(temp_path, file.filename, tenant_id=tenant_id)) cache_manager.clear() diff --git a/src/web/blueprints/workorders.py b/src/web/blueprints/workorders.py index 93ed8ad..fddefc3 100644 --- a/src/web/blueprints/workorders.py +++ b/src/web/blueprints/workorders.py @@ -18,6 +18,7 @@ logger = logging.getLogger(__name__) # 移除SimpleAIAccuracyConfig,直接从统一配置获取 from src.config.unified_config import get_config +from src.web.decorators import rate_limit from src.main import TSPAssistant from src.core.database import db_manager @@ -312,6 +313,7 @@ def generate_ai_suggestion(): @workorders_bp.route('//ai-suggestion', methods=['POST']) +@rate_limit(max_calls=5, period=60) def generate_workorder_ai_suggestion(workorder_id): """根据工单描述与知识库生成AI建议草稿""" try: diff --git a/src/web/decorators.py b/src/web/decorators.py index b09eb81..fd13398 100644 --- a/src/web/decorators.py +++ b/src/web/decorators.py @@ -5,6 +5,7 @@ """ from functools import wraps +import threading from flask import jsonify from src.web.service_manager import service_manager @@ -125,3 +126,38 @@ def resolve_tenant_id(source='auto'): return f(*args, **kwargs) return decorated_function return decorator + + +# 简易内存限流器 +_rate_limit_store = {} +_rate_limit_lock = threading.Lock() + +def rate_limit(max_calls=10, period=60): + """ + 简易请求限流装饰器。 + max_calls: 时间窗口内最大请求数 + period: 时间窗口(秒) + 按 IP 地址限流。 + """ + import time as _time + + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + from flask import request as req + client_ip = req.remote_addr or 'unknown' + key = f"{f.__name__}:{client_ip}" + now = _time.time() + + with _rate_limit_lock: + if key not in _rate_limit_store: + _rate_limit_store[key] = [] + # 清理过期记录 + _rate_limit_store[key] = [t for t in _rate_limit_store[key] if now - t < period] + if len(_rate_limit_store[key]) >= max_calls: + return jsonify({"success": False, "error": "请求过于频繁,请稍后再试"}), 429 + _rate_limit_store[key].append(now) + + return f(*args, **kwargs) + return decorated_function + return decorator diff --git a/src/web/error_handlers.py b/src/web/error_handlers.py index def6221..829175d 100644 --- a/src/web/error_handlers.py +++ b/src/web/error_handlers.py @@ -72,3 +72,18 @@ def create_success_response(data: Any = None, message: str = "操作成功") -> if data is not None: response["data"] = data return response + + +def api_response(data=None, message="操作成功", status=200): + """ + 统一 API 响应格式。所有 API 应使用此函数返回响应。 + + 成功: {"success": true, "message": "...", "data": {...}} + 失败: {"success": false, "error": "..."} + """ + if status >= 400: + return jsonify({"success": False, "error": message}), status + body = {"success": True, "message": message} + if data is not None: + body["data"] = data + return jsonify(body), status diff --git a/src/web/service_manager.py b/src/web/service_manager.py index fd7f211..f0d4df3 100644 --- a/src/web/service_manager.py +++ b/src/web/service_manager.py @@ -6,26 +6,31 @@ from typing import Optional, Dict, Any import logging +import threading logger = logging.getLogger(__name__) class ServiceManager: - """服务管理器 - 统一管理各种服务的懒加载实例""" + """服务管理器 - 统一管理各种服务的懒加载实例(线程安全)""" def __init__(self): self._services: Dict[str, Any] = {} + self._lock = threading.Lock() def get_service(self, service_name: str, factory_func): - """获取服务实例(懒加载)""" - if service_name not in self._services: - try: - self._services[service_name] = factory_func() - logger.debug(f"服务 {service_name} 已初始化") - except Exception as e: - logger.error(f"初始化服务 {service_name} 失败: {e}") - raise - return self._services[service_name] + """获取服务实例(懒加载,双重检查锁)""" + if service_name in self._services: + return self._services[service_name] + with self._lock: + if service_name not in self._services: + try: + self._services[service_name] = factory_func() + logger.debug(f"服务 {service_name} 已初始化") + except Exception as e: + logger.error(f"初始化服务 {service_name} 失败: {e}") + raise + return self._services[service_name] def get_assistant(self): """获取TSP助手实例(legacy facade,新代码应直接使用具体 manager)"""