refactor: 第二轮架构缺陷修复 (1/2/3/4/9/10)

1. 内存泄漏修复:RealtimeChatManager 添加会话自动清理机制
   - 每10次操作检查超时会话(1小时无活动自动清理)
   - 最大活跃会话数限制500,超限清理最旧会话

2. 数据库索引补全:
   - Conversation: session_id, work_order_id 添加索引
   - WorkOrder: status 添加索引
   - ChatSession: user_id 添加索引
   - KnowledgeEntry: category, is_active, is_verified 添加索引

3. ServiceManager 线程安全:
   - 添加 threading.Lock 双重检查锁
   - 防止多线程并发初始化同一服务

4. API 响应格式统一:
   - 新增 api_response() 标准响应函数
   - 统一格式: {success, message, data} / {success, error}

9. asyncio 误用修复:
   - knowledge.py 文件上传改用安全的 asyncio 调用方式
   - 兼容已有事件循环和无事件循环两种场景

10. 请求限流:
    - 新增 rate_limit 装饰器(按 IP 限流)
    - chat/message 限制 20次/分钟
    - workorder/ai-suggestion 限制 5次/分钟
This commit is contained in:
2026-04-02 22:37:44 +08:00
parent 587933f668
commit 7151070c99
11 changed files with 138 additions and 38 deletions

View File

@@ -167,17 +167,17 @@ app.run(...)
服务运行时会实时输出日志:
```
📨 [Feishu LongConn] 收到消息
[Feishu LongConn] 收到消息
- 消息ID: om_xxxxxxxxxxxxx
- 群聊ID: oc_xxxxxxxxxxxxx
- 发送者: ou_xxxxxxxxxxxxx
- 消息类型: text
- 原始内容: @TSP助手 车辆无法连接网络
- 清理后内容: 车辆无法连接网络
🔑 会话用户标识: feishu_oc_xxxxxxxxxxxxx_ou_xxxxxxxxxxxxx
会话用户标识: feishu_oc_xxxxxxxxxxxxx_ou_xxxxxxxxxxxxx
为用户 ou_xxxxxxxxxxxxx 在群聊 oc_xxxxxxxxxxxxx 创建新会话: session_xxxxxxxxxxxxx
🤖 调用 TSP Assistant 处理消息...
📤 准备发送回复 (长度: 156)
准备发送回复 (长度: 156)
成功回复消息: om_xxxxxxxxxxxxx
```

View File

@@ -49,7 +49,7 @@ class WorkOrder(Base):
description = Column(Text, nullable=False)
category = Column(String(100), nullable=False)
priority = Column(String(20), nullable=False)
status = Column(String(20), nullable=False)
status = Column(String(20), nullable=False, index=True)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
resolution = Column(Text)
@@ -97,8 +97,8 @@ class ChatSession(Base):
id = Column(Integer, primary_key=True)
tenant_id = Column(String(50), nullable=False, default=DEFAULT_TENANT, index=True)
session_id = Column(String(100), unique=True, nullable=False) # 唯一会话标识
user_id = Column(String(100), nullable=True) # 用户标识
session_id = Column(String(100), unique=True, nullable=False)
user_id = Column(String(100), nullable=True, index=True)
work_order_id = Column(Integer, ForeignKey("work_orders.id"), nullable=True)
title = Column(String(200), nullable=True) # 会话标题(取首条消息摘要)
status = Column(String(20), default="active") # active, ended
@@ -135,8 +135,8 @@ class Conversation(Base):
id = Column(Integer, primary_key=True)
tenant_id = Column(String(50), nullable=False, default=DEFAULT_TENANT, index=True)
session_id = Column(String(100), ForeignKey("chat_sessions.session_id"), nullable=True) # 关联会话
work_order_id = Column(Integer, ForeignKey("work_orders.id"))
session_id = Column(String(100), ForeignKey("chat_sessions.session_id"), nullable=True, index=True)
work_order_id = Column(Integer, ForeignKey("work_orders.id"), index=True)
user_message = Column(Text, nullable=False)
assistant_response = Column(Text, nullable=False)
timestamp = Column(DateTime, default=datetime.now)
@@ -162,13 +162,13 @@ class KnowledgeEntry(Base):
tenant_id = Column(String(50), nullable=False, default=DEFAULT_TENANT, index=True)
question = Column(Text, nullable=False)
answer = Column(Text, nullable=False)
category = Column(String(100), nullable=False)
category = Column(String(100), nullable=False, index=True)
confidence_score = Column(Float, default=0.0)
usage_count = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
is_active = Column(Boolean, default=True)
is_verified = Column(Boolean, default=False) # 是否已验证
is_active = Column(Boolean, default=True, index=True)
is_verified = Column(Boolean, default=False, index=True)
verified_by = Column(String(100)) # 验证人
verified_at = Column(DateTime) # 验证时间
vector_embedding = Column(Text) # 向量嵌入的JSON字符串

View File

@@ -32,15 +32,49 @@ class ChatMessage:
class RealtimeChatManager:
"""实时对话管理器"""
# 会话超时时间(秒):超过此时间无活动的会话自动清理
SESSION_TIMEOUT = 3600 # 1小时
# 最大同时活跃会话数
MAX_ACTIVE_SESSIONS = 500
def __init__(self):
self.llm_client = QwenClient()
self.knowledge_manager = KnowledgeManager()
self.vehicle_manager = VehicleDataManager()
self.active_sessions = {} # 存储活跃的对话会话
self.message_history = {} # 存储消息历史
self.active_sessions = {}
self.message_history = {}
self._cleanup_counter = 0
def _cleanup_expired_sessions(self):
    """Evict idle and surplus sessions; the sweep runs on every 10th call.

    Each call increments an internal counter. Once it reaches 10 the counter
    is reset and two passes run:

    1. Sessions whose ``last_activity`` is more than ``SESSION_TIMEOUT``
       seconds old are removed together with their message history. A
       session without ``last_activity`` is treated as active right now.
    2. If more than ``MAX_ACTIVE_SESSIONS`` sessions remain, the ones with
       the oldest ``last_activity`` are removed until the cap is met. Here a
       session without ``last_activity`` sorts as the oldest possible.
    """
    self._cleanup_counter += 1
    if self._cleanup_counter < 10:
        return
    self._cleanup_counter = 0

    # Pass 1: collect ids first so the dict is not mutated while iterating.
    current_time = datetime.now()
    stale_ids = []
    for session_id, session in self.active_sessions.items():
        last_seen = session.get("last_activity", current_time)
        if (current_time - last_seen).total_seconds() > self.SESSION_TIMEOUT:
            stale_ids.append(session_id)
    for session_id in stale_ids:
        self.active_sessions.pop(session_id, None)
        self.message_history.pop(session_id, None)
    if stale_ids:
        logger.info(f"清理了 {len(stale_ids)} 个超时会话,当前活跃: {len(self.active_sessions)}")

    # Pass 2: enforce the hard cap by dropping the least recently active.
    overflow = len(self.active_sessions) - self.MAX_ACTIVE_SESSIONS
    if overflow <= 0:
        return
    oldest_first = sorted(
        self.active_sessions,
        key=lambda sid: self.active_sessions[sid].get("last_activity", datetime.min),
    )
    for session_id in oldest_first[:overflow]:
        self.active_sessions.pop(session_id, None)
        self.message_history.pop(session_id, None)
    logger.warning(f"会话数超限,清理了 {overflow} 个最旧会话")
def create_session(self, user_id: str, work_order_id: Optional[int] = None, tenant_id: Optional[str] = None) -> str:
"""创建新的对话会话"""
self._cleanup_expired_sessions()
session_id = f"session_{user_id}_{int(time.time())}"
session_data = {

View File

@@ -59,7 +59,7 @@ class FeishuLongConnService:
"""收到消息事件,立即提交到线程池,确保 SDK 回调快速返回"""
self._msg_count += 1
msg_no = self._msg_count
logger.info(f"📨 收到飞书消息事件 #{msg_no},提交到处理队列")
logger.info(f" 收到飞书消息事件 #{msg_no},提交到处理队列")
self._executor.submit(self._process_message_safe, data, msg_no)
def _process_message_safe(self, data: P2ImMessageReceiveV1, msg_no: int) -> None:
@@ -81,7 +81,7 @@ class FeishuLongConnService:
Args:
data: 飞书消息事件数据
"""
logger.info("📨 收到飞书消息事件!")
logger.info(" 收到飞书消息事件!")
try:
# 提取消息信息
@@ -103,9 +103,9 @@ class FeishuLongConnService:
if not sender_attrs:
# 尝试 __dict__ 或 dir
sender_attrs = {k: v for k, v in vars(sender_id_obj).items() if v and not k.startswith('_')} if hasattr(sender_id_obj, '__dict__') else {}
logger.info(f"🔍 sender_id 属性: {sender_attrs}")
logger.info(f" sender_id 属性: {sender_attrs}")
except Exception as e:
logger.warning(f"🔍 无法解析 sender_id 属性: {e}, type={type(sender_id_obj)}")
logger.warning(f" 无法解析 sender_id 属性: {e}, type={type(sender_id_obj)}")
sender_attrs = {}
sender_open_id = getattr(sender_id_obj, 'open_id', '') or ''
@@ -131,7 +131,7 @@ class FeishuLongConnService:
# 详细日志记录
chat_type_desc = '群聊(group)' if chat_type == 'group' else '私聊(p2p)' if chat_type == 'p2p' else chat_type
logger.info(f"📨 [长连接] 发送者={sender_name}({sender_id}), 群={chat_id}, 类型={chat_type_desc}, 消息ID={message_id}")
logger.info(f" [长连接] 发送者={sender_name}({sender_id}), 群={chat_id}, 类型={chat_type_desc}, 消息ID={message_id}")
# 消息去重检查
if cache_manager.check_and_set_message_processed(message_id):
@@ -177,12 +177,12 @@ class FeishuLongConnService:
# 构造会话用户ID群聊隔离
session_user_id = f"feishu_{chat_id}_{sender_id}"
logger.info(f"🔑 会话标识: {session_user_id}")
logger.info(f" 会话标识: {session_user_id}")
# 解析租户:根据 chat_id 查找绑定的租户
from src.web.blueprints.tenants import resolve_tenant_by_chat_id
tenant_id = resolve_tenant_by_chat_id(chat_id)
logger.info(f"🏢{chat_id} 对应租户: {tenant_id}")
logger.info(f"{chat_id} 对应租户: {tenant_id}")
# 获取或创建会话
chat_manager = service_manager.get_chat_manager()
@@ -231,7 +231,7 @@ class FeishuLongConnService:
if not isinstance(reply_text, str):
reply_text = str(reply_text)
logger.info(f"📤 准备发送回复 (长度: {len(reply_text)})")
logger.info(f" 准备发送回复 (长度: {len(reply_text)})")
logger.info(f" 内容预览: {reply_text[:100]}...")
# 发送回复

View File

@@ -6,6 +6,7 @@
import logging
from flask import Blueprint, request, jsonify, Response
from src.web.service_manager import service_manager
from src.web.decorators import rate_limit
logger = logging.getLogger(__name__)
@@ -27,6 +28,7 @@ def create_session():
@chat_bp.route('/message', methods=['POST'])
@rate_limit(max_calls=20, period=60)
def send_message():
"""发送聊天消息"""
try:

View File

@@ -121,7 +121,7 @@ def _process_message_in_background(app, event_data: dict):
except Exception as e:
logger.warning(f"[Feishu Bot] 获取发送者信息失败: {e}")
logger.info(f"[Feishu Bot] 📨 消息详情: 发送者={sender_name}({sender_id}), 群={chat_id}, 类型={chat_type_desc}, 租户={tenant_id}, 消息ID={message_id}")
logger.info(f"[Feishu Bot] 消息详情: 发送者={sender_name}({sender_id}), 群={chat_id}, 类型={chat_type_desc}, 租户={tenant_id}, 消息ID={message_id}")
logger.info(f"[Feishu Bot] 📝 消息内容: '{text_content}'")
# 群聊隔离:每个用户在每个群都有独立会话

View File

@@ -118,13 +118,19 @@ def upload_knowledge_file():
file.save(temp_path)
tenant_id = request.form.get('tenant_id')
assistant = get_agent_assistant()
# 由于process_file_to_knowledge现在是异步的,我们需要同步调用
# 或者将整个视图函数改为异步Flask 2.0+支持)
# process_file_to_knowledge 是异步方法,在同步 Flask 中安全调用
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(assistant.process_file_to_knowledge(temp_path, file.filename, tenant_id=tenant_id))
loop.close()
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# 如果已有事件循环在运行(如在某些 ASGI 环境下),用线程池
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as pool:
result = pool.submit(asyncio.run, assistant.process_file_to_knowledge(temp_path, file.filename, tenant_id=tenant_id)).result()
else:
result = loop.run_until_complete(assistant.process_file_to_knowledge(temp_path, file.filename, tenant_id=tenant_id))
except RuntimeError:
result = asyncio.run(assistant.process_file_to_knowledge(temp_path, file.filename, tenant_id=tenant_id))
cache_manager.clear()

View File

@@ -18,6 +18,7 @@ logger = logging.getLogger(__name__)
# 移除SimpleAIAccuracyConfig直接从统一配置获取
from src.config.unified_config import get_config
from src.web.decorators import rate_limit
from src.main import TSPAssistant
from src.core.database import db_manager
@@ -312,6 +313,7 @@ def generate_ai_suggestion():
@workorders_bp.route('/<int:workorder_id>/ai-suggestion', methods=['POST'])
@rate_limit(max_calls=5, period=60)
def generate_workorder_ai_suggestion(workorder_id):
"""根据工单描述与知识库生成AI建议草稿"""
try:

View File

@@ -5,6 +5,7 @@
"""
from functools import wraps
import threading
from flask import jsonify
from src.web.service_manager import service_manager
@@ -125,3 +126,38 @@ def resolve_tenant_id(source='auto'):
return f(*args, **kwargs)
return decorated_function
return decorator
# Simple in-memory rate limiter state. Maps "<endpoint>:<client ip>" to the
# list of timestamps of that client's recent calls (oldest first).
# Every access must hold _rate_limit_lock.
_rate_limit_store = {}
_rate_limit_lock = threading.Lock()


def rate_limit(max_calls=10, period=60):
    """
    Simple per-IP request rate-limiting decorator for Flask views.

    The state lives in a module-level dict, so limits are tracked per
    process and are lost on restart.

    Args:
        max_calls: Maximum number of requests allowed inside one window.
        period: Length of the sliding window, in seconds.

    Returns:
        A decorator. The wrapped view answers with HTTP 429 and
        ``{"success": False, "error": ...}`` once the calling IP has made
        ``max_calls`` requests within the last ``period`` seconds; otherwise
        the original view is called unchanged.
    """
    import time as _time

    def decorator(f):
        # Qualify the bucket by module so that same-named views registered on
        # different blueprints do not share (and exhaust) one quota.
        prefix = f"{f.__module__}.{f.__qualname__}:"
        # Earliest monotonic time at which this endpoint sweeps its idle
        # buckets again; only touched while holding _rate_limit_lock.
        next_sweep = 0.0

        @wraps(f)
        def decorated_function(*args, **kwargs):
            nonlocal next_sweep
            from flask import request as req
            client_ip = req.remote_addr or 'unknown'
            key = f"{prefix}{client_ip}"
            # Monotonic clock: the window must not be affected by wall-clock
            # adjustments (NTP corrections, manual changes).
            now = _time.monotonic()

            with _rate_limit_lock:
                # At most once per window, drop buckets of this endpoint whose
                # newest call has already left the window. Without this every
                # distinct client IP would keep a dict entry forever.
                if now >= next_sweep:
                    next_sweep = now + period
                    idle_keys = [
                        k for k, stamps in _rate_limit_store.items()
                        if k.startswith(prefix)
                        and (not stamps or now - stamps[-1] >= period)
                    ]
                    for k in idle_keys:
                        del _rate_limit_store[k]

                # Keep only the calls that are still inside the window.
                recent = [t for t in _rate_limit_store.get(key, ()) if now - t < period]
                if len(recent) >= max_calls:
                    _rate_limit_store[key] = recent
                    return jsonify({"success": False, "error": "请求过于频繁,请稍后再试"}), 429
                recent.append(now)
                _rate_limit_store[key] = recent

            return f(*args, **kwargs)
        return decorated_function
    return decorator

View File

@@ -72,3 +72,18 @@ def create_success_response(data: Any = None, message: str = "操作成功") ->
if data is not None:
response["data"] = data
return response
def api_response(data=None, message="操作成功", status=200):
    """
    Build a response in the unified API format.

    Success (status < 400): {"success": true, "message": "...", "data": {...}}
    where "data" is included only when ``data`` is not None.
    Failure (status >= 400): {"success": false, "error": "..."} where the
    error text is taken from ``message``; ``data`` is not included.

    Returns:
        A ``(jsonify(payload), status)`` tuple.
    """
    if status >= 400:
        payload = {"success": False, "error": message}
    else:
        payload = {"success": True, "message": message}
        if data is not None:
            payload["data"] = data
    return jsonify(payload), status

View File

@@ -6,26 +6,31 @@
from typing import Optional, Dict, Any
import logging
import threading
logger = logging.getLogger(__name__)
class ServiceManager:
"""服务管理器 - 统一管理各种服务的懒加载实例"""
"""服务管理器 - 统一管理各种服务的懒加载实例(线程安全)"""
def __init__(self):
self._services: Dict[str, Any] = {}
self._lock = threading.Lock()
def get_service(self, service_name: str, factory_func):
"""获取服务实例(懒加载)"""
if service_name not in self._services:
try:
self._services[service_name] = factory_func()
logger.debug(f"服务 {service_name} 已初始化")
except Exception as e:
logger.error(f"初始化服务 {service_name} 失败: {e}")
raise
return self._services[service_name]
"""获取服务实例(懒加载,双重检查锁"""
if service_name in self._services:
return self._services[service_name]
with self._lock:
if service_name not in self._services:
try:
self._services[service_name] = factory_func()
logger.debug(f"服务 {service_name} 已初始化")
except Exception as e:
logger.error(f"初始化服务 {service_name} 失败: {e}")
raise
return self._services[service_name]
def get_assistant(self):
"""获取TSP助手实例legacy facade新代码应直接使用具体 manager"""