Files
assist/src/integrations/feishu_longconn_service.py
Jeason 28e90d2182 fix: 飞书群绑定租户完善
- tenants.js 去掉已删除的 appid/appsecret 元素引用
- showEditTenantModal 改为从 API 加载完整租户数据(不再传参拼接)
- saveTenant 保留已有的非 feishu 配置,只更新 chat_groups
- 租户列表显示绑定群数量或'未绑定飞书群'
- 飞书 bot/longconn 复用已有会话时同步更新 tenant_id(群重新绑定后立即生效)
- 删除租户后同步刷新租户选择器
2026-04-02 15:25:50 +08:00

295 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
飞书长连接服务(基于官方 SDK- 修正版
使用事件订阅 2.0 - 无需公网域名和 webhook 配置
"""
import logging
import json
from typing import Optional
import lark_oapi as lark
from lark_oapi.api.im.v1 import P2ImMessageReceiveV1, ReplyMessageRequest, ReplyMessageRequestBody
from src.config.unified_config import get_config
from src.web.service_manager import service_manager
from src.core.cache_manager import cache_manager
logger = logging.getLogger(__name__)
class FeishuLongConnService:
"""飞书长连接服务 - 使用官方 SDK"""
def __init__(self):
"""初始化飞书长连接服务"""
config = get_config()
self.app_id = config.feishu.app_id
self.app_secret = config.feishu.app_secret
logger.info("🚀 初始化飞书长连接服务...")
logger.info(f" - App ID: {self.app_id}")
logger.info(f" - App Secret: {self.app_secret[:10]}...")
# 创建飞书客户端
self.client = lark.Client.builder() \
.app_id(self.app_id) \
.app_secret(self.app_secret) \
.log_level(lark.LogLevel.DEBUG) \
.build()
logger.info("✅ 飞书客户端创建成功")
# 创建事件处理器
self.event_handler = lark.EventDispatcherHandler.builder(
"", "" # 长连接模式不需要 verification_token 和 encrypt_key
).register_p2_im_message_receive_v1(self._handle_message) \
.build()
logger.info("✅ 飞书事件处理器创建成功")
logger.info("✅ 飞书长连接服务初始化完成")
def _handle_message(self, data: P2ImMessageReceiveV1) -> None:
"""
处理接收到的消息事件
Args:
data: 飞书消息事件数据
"""
logger.info("=" * 80)
logger.info("📨 收到飞书消息事件!")
logger.info("=" * 80)
try:
# 提取消息信息
event = data.event
message = event.message
message_id = message.message_id
chat_id = message.chat_id
message_type = message.message_type
chat_type = getattr(message, 'chat_type', 'unknown') # 获取会话类型
content = message.content
sender = event.sender
# 获取发送者ID和群信息
sender_id = sender.sender_id.user_id
try:
tenant_key = sender.sender_id.tenant_key
except:
tenant_key = "unknown"
# 详细日志记录
logger.info(f"📋 消息详情 [长连接]:")
logger.info(f" - 消息ID: {message_id}")
logger.info(f" - 会话类型: {'群聊(group)' if chat_type == 'group' else '私聊(p2p)' if chat_type == 'p2p' else chat_type}")
logger.info(f" - 会话ID: {chat_id}")
logger.info(f" - 发送者ID: {sender_id}")
logger.info(f" - 租户Key: {tenant_key}")
logger.info(f" - 消息类型: {message_type}")
# 消息去重检查
if cache_manager.check_and_set_message_processed(message_id):
logger.warning(f"🔁 消息 {message_id} 已被处理过可能是Webhook已处理跳过")
return
# 只处理文本消息
if message_type != "text":
logger.info(f"⏭️ 跳过非文本消息类型: {message_type}")
return
# 解析消息内容
try:
content_json = json.loads(content)
text_content = content_json.get("text", "").strip()
except json.JSONDecodeError as e:
logger.error(f"❌ 解析消息内容失败: {e}")
return
logger.info(f"📝 文本内容: {text_content}")
# 移除 @机器人 的部分
mentions = message.mentions
if mentions:
logger.info(f"👥 检测到 {len(mentions)} 个提及")
for mention in mentions:
mention_name = mention.name
if mention_name:
logger.info(f" - @{mention_name}")
# 尝试多种 @ 格式
for prefix in [f"@{mention_name}", f"@{mention_name} "]:
if text_content.startswith(prefix):
text_content = text_content[len(prefix):].strip()
break
if not text_content:
logger.warning(f"⚠️ 移除@后内容为空,回复提示")
self._reply_message(message_id, "您好!请问有什么可以帮助您的吗?")
return
logger.info(f"✅ 清理后内容: {text_content}")
# 获取发送者ID
sender_id = sender.sender_id.user_id
# 构造会话用户ID群聊隔离
# 格式: feishu_群聊ID_用户ID
session_user_id = f"feishu_{chat_id}_{sender_id}"
logger.info(f"🔑 会话标识: {session_user_id}")
# 解析租户:根据 chat_id 查找绑定的租户
from src.web.blueprints.tenants import resolve_tenant_by_chat_id
tenant_id = resolve_tenant_by_chat_id(chat_id)
logger.info(f"🏢 群 {chat_id} 对应租户: {tenant_id}")
# 获取或创建会话
chat_manager = service_manager.get_chat_manager()
active_sessions = chat_manager.get_active_sessions()
session_id = None
for session in active_sessions:
if session.get('user_id') == session_user_id:
session_id = session.get('session_id')
# 更新会话的 tenant_id群可能重新绑定了租户
if session_id in chat_manager.active_sessions:
chat_manager.active_sessions[session_id]['tenant_id'] = tenant_id
logger.info(f"✅ 找到已有会话: {session_id}")
break
# 如果没有会话,创建新会话
if not session_id:
session_id = chat_manager.create_session(
user_id=session_user_id,
work_order_id=None,
tenant_id=tenant_id
)
logger.info(f"🆕 创建新会话: {session_id} (租户: {tenant_id})")
# 调用实时对话接口处理消息
logger.info(f"🤖 调用 TSP Assistant 处理消息...")
response_data = chat_manager.process_message(
session_id=session_id,
user_message=text_content,
ip_address=f"Feishu:{sender_id}",
invocation_method=f"Feishu({chat_type})"
)
logger.info(f"📊 处理结果: {response_data.get('success')}")
# 提取回复
if response_data.get("success"):
reply_text = response_data.get("response") or response_data.get("content", "抱歉,我暂时无法回答这个问题。")
else:
error_msg = response_data.get('error', '未知错误')
reply_text = f"处理消息时出错: {error_msg}"
logger.error(f"❌ 处理失败: {error_msg}")
# 确保回复是字符串
if isinstance(reply_text, dict):
reply_text = reply_text.get('content', str(reply_text))
if not isinstance(reply_text, str):
reply_text = str(reply_text)
logger.info(f"📤 准备发送回复 (长度: {len(reply_text)})")
logger.info(f" 内容预览: {reply_text[:100]}...")
# 发送回复
self._reply_message(message_id, reply_text)
logger.info("=" * 80)
logger.info("✅ 消息处理完成")
logger.info("=" * 80)
except Exception as e:
logger.error(f"❌ 处理消息时发生错误: {e}", exc_info=True)
# 尝试发送错误提示
try:
if 'message_id' in locals():
self._reply_message(message_id, "抱歉,系统遇到了一些问题,请稍后重试。")
except:
pass
def _reply_message(self, message_id: str, content: str) -> bool:
"""
回复消息
Args:
message_id: 消息ID
content: 回复内容
Returns:
是否成功
"""
try:
logger.info(f"📧 发送回复到消息 {message_id}")
# 转义 JSON 特殊字符
content_escaped = content.replace('"', '\\"').replace('\n', '\\n')
# 构造回复请求
request = ReplyMessageRequest.builder() \
.message_id(message_id) \
.request_body(ReplyMessageRequestBody.builder()
.msg_type("text")
.content(f'{{"text":"{content_escaped}"}}')
.build()) \
.build()
# 发送回复
response = self.client.im.v1.message.reply(request)
if not response.success():
logger.error(f"❌ 回复失败: {response.code} - {response.msg}")
return False
logger.info(f"✅ 回复成功: {message_id}")
return True
except Exception as e:
logger.error(f"❌ 回复消息时发生错误: {e}", exc_info=True)
return False
def start(self):
"""
启动长连接客户端
这个方法会阻塞当前线程,持续监听飞书事件
"""
logger.info("=" * 80)
logger.info("🚀 启动飞书长连接客户端")
logger.info("=" * 80)
logger.info(f"📋 配置信息:")
logger.info(f" - App ID: {self.app_id}")
logger.info(f" - 模式: 事件订阅 2.0(长连接)")
logger.info(f" - 优势: 无需公网域名和 webhook 配置")
logger.info("=" * 80)
logger.info("💡 等待消息中... (按 Ctrl+C 停止)")
logger.info("=" * 80)
try:
# 创建长连接客户端
cli = lark.ws.Client(self.app_id, self.app_secret, event_handler=self.event_handler)
logger.info("🔌 正在建立与飞书服务器的连接...")
# 启动长连接(会阻塞)
cli.start()
except KeyboardInterrupt:
logger.info("")
logger.info("⏹️ 用户中断,停止飞书长连接客户端")
except Exception as e:
logger.error(f"❌ 飞书长连接客户端异常: {e}", exc_info=True)
raise
# 全局服务实例
_feishu_longconn_service = None
def get_feishu_longconn_service() -> FeishuLongConnService:
"""获取飞书长连接服务单例"""
global _feishu_longconn_service
if _feishu_longconn_service is None:
_feishu_longconn_service = FeishuLongConnService()
return _feishu_longconn_service