From a1f435ef140eb2d3a2e6c048d25dcc6095ef57b9 Mon Sep 17 00:00:00 2001 From: Jeason <1710884619@qq.com> Date: Wed, 8 Apr 2026 09:22:18 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20Pipeline=20=E5=88=9D=E5=A7=8B=E5=8C=96?= =?UTF-8?q?=E6=94=B9=E4=B8=BA=E5=90=8E=E5=8F=B0=E7=BA=BF=E7=A8=8B=EF=BC=8C?= =?UTF-8?q?=E4=B8=8D=E9=98=BB=E5=A1=9E=E9=A3=9E=E4=B9=A6=E9=95=BF=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E5=90=AF=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pipeline 预初始化移到后台线程(不阻塞 FeishuLongConnService.__init__) - 消息处理时等待 Pipeline 就绪(最多 10 秒),超时则即时初始化 - Redis socket_connect_timeout 从 2s 降为 1s - 解决 Redis 连接慢导致整个飞书服务卡死的问题 --- src/core/session_store.py | 5 ++-- src/integrations/feishu_longconn_service.py | 29 ++++++++++++++------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/core/session_store.py b/src/core/session_store.py index 7860ca8..97dc9fb 100644 --- a/src/core/session_store.py +++ b/src/core/session_store.py @@ -187,10 +187,11 @@ def create_session_store() -> SessionStore: password=config.redis.password, db=config.redis.db, decode_responses=True, - socket_connect_timeout=2, - socket_timeout=2 + socket_connect_timeout=1, + socket_timeout=1 ) client.ping() + logger.info(f"会话存储使用 Redis ({config.redis.host})") return RedisSessionStore(client) except Exception as e: logger.info(f"Redis 不可用({e}),会话存储使用内存") diff --git a/src/integrations/feishu_longconn_service.py b/src/integrations/feishu_longconn_service.py index e5cc7ec..6fac812 100644 --- a/src/integrations/feishu_longconn_service.py +++ b/src/integrations/feishu_longconn_service.py @@ -55,13 +55,16 @@ class FeishuLongConnService: self._executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="feishu_msg") self._msg_count = 0 - # 预初始化 Pipeline(避免首条消息时懒加载导致超时) - try: - self._pipeline = service_manager.get_pipeline() - logger.info("✅ MessagePipeline 预初始化成功") - except Exception as e: - logger.error(f"❌ MessagePipeline 预初始化失败: {e}", exc_info=True) - self._pipeline = None + # 预初始化 Pipeline(在后台线程,不阻塞启动) + self._pipeline = None + def _init_pipeline(): + try: + self._pipeline = service_manager.get_pipeline() + logger.info("✅ MessagePipeline 预初始化成功") + except Exception as e: + logger.error(f"❌ MessagePipeline 预初始化失败: {e}", exc_info=True) + import threading as _threading + _threading.Thread(target=_init_pipeline, daemon=True).start() def _handle_message(self, data: P2ImMessageReceiveV1) -> None: """收到消息事件,立即提交到线程池,确保 SDK 回调快速返回""" @@ -176,8 +179,16 @@ class FeishuLongConnService: # 使用 Pipeline 统一处理消息 session_user_id = f"feishu_{chat_id}_{sender_id}" try: - logger.info(f"正在获取 Pipeline...") - pipeline = self._pipeline or service_manager.get_pipeline() + # 等待 Pipeline 就绪(最多 10 秒) + import time as _wait_time + for _ in range(100): + if self._pipeline: + break + _wait_time.sleep(0.1) + pipeline = self._pipeline + if not pipeline: + logger.warning("Pipeline 未就绪,尝试即时初始化") + pipeline = service_manager.get_pipeline() logger.info(f"Pipeline 开始处理: user={session_user_id}") response_data = pipeline.handle_message( user_id=session_user_id,