# -*- coding: utf-8 -*- """ 飞书长连接服务(基于官方 SDK)- 修正版 使用事件订阅 2.0 - 无需公网域名和 webhook 配置 """ import logging import json from typing import Optional import lark_oapi as lark from lark_oapi.api.im.v1 import P2ImMessageReceiveV1, ReplyMessageRequest, ReplyMessageRequestBody from src.config.unified_config import get_config from src.web.service_manager import service_manager from src.core.cache_manager import cache_manager logger = logging.getLogger(__name__) class FeishuLongConnService: """飞书长连接服务 - 使用官方 SDK""" def __init__(self): """初始化飞书长连接服务""" config = get_config() self.app_id = config.feishu.app_id self.app_secret = config.feishu.app_secret logger.info("🚀 初始化飞书长连接服务...") logger.info(f" - App ID: {self.app_id}") logger.info(f" - App Secret: {self.app_secret[:10]}...") # 创建飞书客户端 self.client = lark.Client.builder() \ .app_id(self.app_id) \ .app_secret(self.app_secret) \ .log_level(lark.LogLevel.DEBUG) \ .build() logger.info("✅ 飞书客户端创建成功") # 创建事件处理器 self.event_handler = lark.EventDispatcherHandler.builder( "", "" # 长连接模式不需要 verification_token 和 encrypt_key ).register_p2_im_message_receive_v1(self._handle_message) \ .build() logger.info("✅ 飞书事件处理器创建成功") logger.info("✅ 飞书长连接服务初始化完成") def _handle_message(self, data: P2ImMessageReceiveV1) -> None: """ 处理接收到的消息事件 Args: data: 飞书消息事件数据 """ logger.info("=" * 80) logger.info("📨 收到飞书消息事件!") logger.info("=" * 80) try: # 提取消息信息 event = data.event message = event.message message_id = message.message_id chat_id = message.chat_id message_type = message.message_type chat_type = getattr(message, 'chat_type', 'unknown') # 获取会话类型 content = message.content sender = event.sender # 获取发送者ID和群信息 sender_id = sender.sender_id.user_id sender_open_id = getattr(sender.sender_id, 'open_id', '') try: tenant_key = sender.sender_id.tenant_key except: tenant_key = "unknown" # 获取发送者姓名 sender_name = sender_id try: from src.integrations.feishu_service import FeishuService fs = FeishuService() user_info = fs.get_user_info(sender_id) sender_name = user_info.get('name') or user_info.get('en_name') or sender_id except Exception as e: logger.warning(f"获取发送者信息失败: {e}") # 详细日志记录 chat_type_desc = '群聊(group)' if chat_type == 'group' else '私聊(p2p)' if chat_type == 'p2p' else chat_type logger.info(f"📨 [长连接] 发送者={sender_name}({sender_id}), 群={chat_id}, 类型={chat_type_desc}, 消息ID={message_id}") # 消息去重检查 if cache_manager.check_and_set_message_processed(message_id): logger.warning(f"🔁 消息 {message_id} 已被处理过(可能是Webhook已处理),跳过") return # 只处理文本消息 if message_type != "text": logger.info(f"⏭️ 跳过非文本消息类型: {message_type}") return # 解析消息内容 try: content_json = json.loads(content) text_content = content_json.get("text", "").strip() except json.JSONDecodeError as e: logger.error(f"❌ 解析消息内容失败: {e}") return logger.info(f"📝 文本内容: {text_content}") # 移除 @机器人 的部分 mentions = message.mentions if mentions: logger.info(f"👥 检测到 {len(mentions)} 个提及") for mention in mentions: mention_name = mention.name if mention_name: logger.info(f" - @{mention_name}") # 尝试多种 @ 格式 for prefix in [f"@{mention_name}", f"@{mention_name} "]: if text_content.startswith(prefix): text_content = text_content[len(prefix):].strip() break if not text_content: logger.warning(f"⚠️ 移除@后内容为空,回复提示") self._reply_message(message_id, "您好!请问有什么可以帮助您的吗?") return logger.info(f"✅ 清理后内容: {text_content}") # 获取发送者ID sender_id = sender.sender_id.user_id # 构造会话用户ID(群聊隔离) # 格式: feishu_群聊ID_用户ID session_user_id = f"feishu_{chat_id}_{sender_id}" logger.info(f"🔑 会话标识: {session_user_id}") # 解析租户:根据 chat_id 查找绑定的租户 from src.web.blueprints.tenants import resolve_tenant_by_chat_id tenant_id = resolve_tenant_by_chat_id(chat_id) logger.info(f"🏢 群 {chat_id} 对应租户: {tenant_id}") # 获取或创建会话 chat_manager = service_manager.get_chat_manager() active_sessions = chat_manager.get_active_sessions() session_id = None for session in active_sessions: if session.get('user_id') == session_user_id: session_id = session.get('session_id') # 更新会话的 tenant_id(群可能重新绑定了租户) if session_id in chat_manager.active_sessions: chat_manager.active_sessions[session_id]['tenant_id'] = tenant_id logger.info(f"✅ 找到已有会话: {session_id}") break # 如果没有会话,创建新会话 if not session_id: session_id = chat_manager.create_session( user_id=session_user_id, work_order_id=None, tenant_id=tenant_id ) logger.info(f"🆕 创建新会话: {session_id}, 用户={sender_name}({sender_id}), 租户={tenant_id}") # 调用实时对话接口处理消息 response_data = chat_manager.process_message( session_id=session_id, user_message=text_content, ip_address=f"feishu:{sender_id}:{sender_name}", invocation_method=f"feishu_longconn({chat_type})" ) logger.info(f"📊 处理结果: {response_data.get('success')}") # 提取回复 if response_data.get("success"): reply_text = response_data.get("response") or response_data.get("content", "抱歉,我暂时无法回答这个问题。") else: error_msg = response_data.get('error', '未知错误') reply_text = f"处理消息时出错: {error_msg}" logger.error(f"❌ 处理失败: {error_msg}") # 确保回复是字符串 if isinstance(reply_text, dict): reply_text = reply_text.get('content', str(reply_text)) if not isinstance(reply_text, str): reply_text = str(reply_text) logger.info(f"📤 准备发送回复 (长度: {len(reply_text)})") logger.info(f" 内容预览: {reply_text[:100]}...") # 发送回复 self._reply_message(message_id, reply_text) logger.info("=" * 80) logger.info("✅ 消息处理完成") logger.info("=" * 80) except Exception as e: logger.error(f"❌ 处理消息时发生错误: {e}", exc_info=True) # 尝试发送错误提示 try: if 'message_id' in locals(): self._reply_message(message_id, "抱歉,系统遇到了一些问题,请稍后重试。") except: pass def _reply_message(self, message_id: str, content: str) -> bool: """ 回复消息 Args: message_id: 消息ID content: 回复内容 Returns: 是否成功 """ try: logger.info(f"📧 发送回复到消息 {message_id}") # 转义 JSON 特殊字符 content_escaped = content.replace('"', '\\"').replace('\n', '\\n') # 构造回复请求 request = ReplyMessageRequest.builder() \ .message_id(message_id) \ .request_body(ReplyMessageRequestBody.builder() .msg_type("text") .content(f'{{"text":"{content_escaped}"}}') .build()) \ .build() # 发送回复 response = self.client.im.v1.message.reply(request) if not response.success(): logger.error(f"❌ 回复失败: {response.code} - {response.msg}") return False logger.info(f"✅ 回复成功: {message_id}") return True except Exception as e: logger.error(f"❌ 回复消息时发生错误: {e}", exc_info=True) return False def start(self): """ 启动长连接客户端 这个方法会阻塞当前线程,持续监听飞书事件 """ logger.info("=" * 80) logger.info("🚀 启动飞书长连接客户端") logger.info("=" * 80) logger.info(f"📋 配置信息:") logger.info(f" - App ID: {self.app_id}") logger.info(f" - 模式: 事件订阅 2.0(长连接)") logger.info(f" - 优势: 无需公网域名和 webhook 配置") logger.info("=" * 80) logger.info("💡 等待消息中... (按 Ctrl+C 停止)") logger.info("=" * 80) try: # 创建长连接客户端 cli = lark.ws.Client(self.app_id, self.app_secret, event_handler=self.event_handler) logger.info("🔌 正在建立与飞书服务器的连接...") # 启动长连接(会阻塞) cli.start() except KeyboardInterrupt: logger.info("") logger.info("⏹️ 用户中断,停止飞书长连接客户端") except Exception as e: logger.error(f"❌ 飞书长连接客户端异常: {e}", exc_info=True) raise # 全局服务实例 _feishu_longconn_service = None def get_feishu_longconn_service() -> FeishuLongConnService: """获取飞书长连接服务单例""" global _feishu_longconn_service if _feishu_longconn_service is None: _feishu_longconn_service = FeishuLongConnService() return _feishu_longconn_service