fix: 飞书长连接改用 ThreadPoolExecutor 处理消息
- 用 ThreadPoolExecutor(max_workers=5) 替代单次 threading.Thread - 支持并发处理多条消息,避免排队阻塞 - 添加消息序号日志,方便追踪消息接收和处理 - _process_message_safe 包装确保异常不会导致线程崩溃 - 如果消息确实没被 SDK 推送,日志里不会有对应的 #N 记录
This commit is contained in:
@@ -6,7 +6,9 @@
|
||||
import logging
|
||||
import json
|
||||
import threading
|
||||
import queue
|
||||
from typing import Optional
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import lark_oapi as lark
|
||||
from lark_oapi.api.im.v1 import P2ImMessageReceiveV1, ReplyMessageRequest, ReplyMessageRequestBody
|
||||
@@ -38,7 +40,7 @@ class FeishuLongConnService:
|
||||
.log_level(lark.LogLevel.DEBUG) \
|
||||
.build()
|
||||
|
||||
logger.info("✅ 飞书客户端创建成功")
|
||||
logger.info(" 飞书客户端创建成功")
|
||||
|
||||
# 创建事件处理器
|
||||
self.event_handler = lark.EventDispatcherHandler.builder(
|
||||
@@ -46,16 +48,31 @@ class FeishuLongConnService:
|
||||
).register_p2_im_message_receive_v1(self._handle_message) \
|
||||
.build()
|
||||
|
||||
logger.info("✅ 飞书事件处理器创建成功")
|
||||
logger.info("✅ 飞书长连接服务初始化完成")
|
||||
logger.info(" 飞书事件处理器创建成功")
|
||||
logger.info(" 飞书长连接服务初始化完成")
|
||||
|
||||
# 消息处理线程池(最多同时处理 5 条消息)
|
||||
self._executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="feishu_msg")
|
||||
self._msg_count = 0
|
||||
|
||||
def _handle_message(self, data: P2ImMessageReceiveV1) -> None:
|
||||
"""收到消息事件,立即派发到后台线程处理,避免阻塞 SDK 事件循环"""
|
||||
threading.Thread(
|
||||
target=self._process_message,
|
||||
args=(data,),
|
||||
daemon=True
|
||||
).start()
|
||||
"""收到消息事件,立即提交到线程池,确保 SDK 回调快速返回"""
|
||||
self._msg_count += 1
|
||||
msg_no = self._msg_count
|
||||
logger.info(f"📨 收到飞书消息事件 #{msg_no},提交到处理队列")
|
||||
self._executor.submit(self._process_message_safe, data, msg_no)
|
||||
|
||||
def _process_message_safe(self, data: P2ImMessageReceiveV1, msg_no: int) -> None:
|
||||
"""安全包装,确保异常不会导致线程崩溃"""
|
||||
try:
|
||||
self._process_message(data)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ 消息 #{msg_no} 处理异常: {e}", exc_info=True)
|
||||
try:
|
||||
mid = data.event.message.message_id
|
||||
self._reply_message(mid, "抱歉,系统遇到了一些问题,请稍后重试。")
|
||||
except:
|
||||
pass
|
||||
|
||||
def _process_message(self, data: P2ImMessageReceiveV1) -> None:
|
||||
"""
|
||||
@@ -131,7 +148,7 @@ class FeishuLongConnService:
|
||||
content_json = json.loads(content)
|
||||
text_content = content_json.get("text", "").strip()
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"❌ 解析消息内容失败: {e}")
|
||||
logger.error(f"解析消息内容失败: {e}")
|
||||
return
|
||||
|
||||
logger.info(f"📝 文本内容: {text_content}")
|
||||
@@ -151,11 +168,11 @@ class FeishuLongConnService:
|
||||
break
|
||||
|
||||
if not text_content:
|
||||
logger.warning(f"⚠️ 移除@后内容为空,回复提示")
|
||||
logger.warning(f" 移除@后内容为空,回复提示")
|
||||
self._reply_message(message_id, "您好!请问有什么可以帮助您的吗?")
|
||||
return
|
||||
|
||||
logger.info(f"✅ 清理后内容: {text_content}")
|
||||
logger.info(f" 清理后内容: {text_content}")
|
||||
|
||||
# 构造会话用户ID(群聊隔离)
|
||||
session_user_id = f"feishu_{chat_id}_{sender_id}"
|
||||
@@ -178,7 +195,7 @@ class FeishuLongConnService:
|
||||
# 更新会话的 tenant_id(群可能重新绑定了租户)
|
||||
if session_id in chat_manager.active_sessions:
|
||||
chat_manager.active_sessions[session_id]['tenant_id'] = tenant_id
|
||||
logger.info(f"✅ 找到已有会话: {session_id}")
|
||||
logger.info(f" 找到已有会话: {session_id}")
|
||||
break
|
||||
|
||||
# 如果没有会话,创建新会话
|
||||
@@ -188,7 +205,7 @@ class FeishuLongConnService:
|
||||
work_order_id=None,
|
||||
tenant_id=tenant_id
|
||||
)
|
||||
logger.info(f"🆕 创建新会话: {session_id}, 用户={sender_name}({sender_id}), 租户={tenant_id}")
|
||||
logger.info(f" 创建新会话: {session_id}, 用户={sender_name}({sender_id}), 租户={tenant_id}")
|
||||
|
||||
# 调用实时对话接口处理消息
|
||||
response_data = chat_manager.process_message(
|
||||
@@ -198,7 +215,7 @@ class FeishuLongConnService:
|
||||
invocation_method=f"feishu_longconn({chat_type})"
|
||||
)
|
||||
|
||||
logger.info(f"📊 处理结果: {response_data.get('success')}")
|
||||
logger.info(f" 处理结果: {response_data.get('success')}")
|
||||
|
||||
# 提取回复
|
||||
if response_data.get("success"):
|
||||
@@ -206,7 +223,7 @@ class FeishuLongConnService:
|
||||
else:
|
||||
error_msg = response_data.get('error', '未知错误')
|
||||
reply_text = f"处理消息时出错: {error_msg}"
|
||||
logger.error(f"❌ 处理失败: {error_msg}")
|
||||
logger.error(f"处理失败: {error_msg}")
|
||||
|
||||
# 确保回复是字符串
|
||||
if isinstance(reply_text, dict):
|
||||
@@ -220,10 +237,10 @@ class FeishuLongConnService:
|
||||
# 发送回复
|
||||
self._reply_message(message_id, reply_text)
|
||||
|
||||
logger.info("✅ 消息处理完成")
|
||||
logger.info(" 消息处理完成")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ 处理消息时发生错误: {e}", exc_info=True)
|
||||
logger.error(f"处理消息时发生错误: {e}", exc_info=True)
|
||||
# 尝试发送错误提示
|
||||
try:
|
||||
if 'message_id' in locals():
|
||||
@@ -261,14 +278,14 @@ class FeishuLongConnService:
|
||||
response = self.client.im.v1.message.reply(request)
|
||||
|
||||
if not response.success():
|
||||
logger.error(f"❌ 回复失败: {response.code} - {response.msg}")
|
||||
logger.error(f"回复失败: {response.code} - {response.msg}")
|
||||
return False
|
||||
|
||||
logger.info(f"✅ 回复成功: {message_id}")
|
||||
logger.info(f" 回复成功: {message_id}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ 回复消息时发生错误: {e}", exc_info=True)
|
||||
logger.error(f"回复消息时发生错误: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
def start(self):
|
||||
@@ -300,7 +317,7 @@ class FeishuLongConnService:
|
||||
logger.info("")
|
||||
logger.info("⏹️ 用户中断,停止飞书长连接客户端")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ 飞书长连接客户端异常: {e}", exc_info=True)
|
||||
logger.error(f"飞书长连接客户端异常: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
|
||||
|
||||
@@ -251,11 +251,11 @@ class FeishuPermissionChecker:
|
||||
result = self.check_permissions()
|
||||
|
||||
summary = "飞书权限检查结果:\n"
|
||||
summary += f"整体状态: {'✅ 正常' if result['success'] else '❌ 异常'}\n\n"
|
||||
summary += f"整体状态: {' 正常' if result['success'] else '异常'}\n\n"
|
||||
|
||||
summary += "检查项目:\n"
|
||||
for check_name, check_result in result["checks"].items():
|
||||
status_icon = "✅" if check_result["status"] == "success" else "⚠️" if check_result["status"] == "warning" else "❌"
|
||||
status_icon = "" if check_result["status"] == "success" else "" if check_result["status"] == "warning" else "❌"
|
||||
summary += f" {status_icon} {check_name}: {check_result['message']}\n"
|
||||
|
||||
if result["recommendations"]:
|
||||
|
||||
@@ -90,7 +90,7 @@ def check_login():
|
||||
'/login',
|
||||
'/api/auth/login',
|
||||
'/api/auth/status',
|
||||
'/api/feishu/bot/event', # ✅ 飞书机器人回调
|
||||
'/api/feishu/bot/event', # 飞书机器人回调
|
||||
'/static/',
|
||||
'/uploads/'
|
||||
]
|
||||
|
||||
@@ -187,7 +187,7 @@ def resolve_tenant_by_chat_id(chat_id: str) -> str:
|
||||
return t.tenant_id
|
||||
except Exception as e:
|
||||
logger.error(f"解析飞书群租户映射失败: {e}")
|
||||
logger.warning(f"⚠️ 飞书群 {chat_id} 未绑定任何租户,使用默认租户。请在租户管理页面将此 chat_id 绑定到对应租户。")
|
||||
logger.warning(f" 飞书群 {chat_id} 未绑定任何租户,使用默认租户。请在租户管理页面将此 chat_id 绑定到对应租户。")
|
||||
return DEFAULT_TENANT
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user