Files
assist/src/web/blueprints/feishu_bot.py
Jeason a7ae72d0b2 fix: Pipeline 处理添加耗时日志和异常捕获
- message_pipeline.handle_message 添加每步耗时日志(租户解析/会话管理/消息处理)
- feishu_longconn_service Pipeline 调用包裹 try/catch,异常时记录完整堆栈
- feishu_bot.py 同样添加 Pipeline 异常捕获
- 防止 Pipeline 内部异常被静默吞掉导致消息无回复
2026-04-08 09:07:50 +08:00

227 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
飞书机器人蓝图
处理来自飞书机器人的事件回调
"""
import logging
import json
import threading
from flask import Blueprint, request, jsonify, current_app
from src.integrations.feishu_service import FeishuService
from src.web.service_manager import service_manager
from src.core.cache_manager import cache_manager
# 初始化日志
logger = logging.getLogger(__name__)
# 创建蓝图
feishu_bot_bp = Blueprint('feishu_bot', __name__, url_prefix='/api/feishu/bot')
def _process_message_in_background(app, event_data: dict):
"""
在后台线程中处理消息,避免阻塞飞书的回调请求。
Args:
app: Flask应用实例
event_data: 飞书事件数据
"""
with app.app_context():
# 每个线程创建独立的飞书服务实例,避免token共享问题
from src.web.blueprints.tenants import resolve_tenant_by_chat_id, get_tenant_feishu_config
try:
# 1. 解析事件数据
event = event_data.get('event', {})
message = event.get('message', {})
message_id = message.get('message_id')
chat_id = message.get('chat_id')
chat_type = message.get('chat_type', 'unknown')
if not message_id or not chat_id:
logger.error(f"[Feishu Bot] 事件数据缺少必要字段: {event_data}")
return
# 解析租户:根据 chat_id 查找绑定的租户
tenant_id = resolve_tenant_by_chat_id(chat_id)
logger.info(f"[Feishu Bot] 群 {chat_id} 对应租户: {tenant_id}")
# 获取租户级飞书凭证(如果配置了)
tenant_feishu_cfg = get_tenant_feishu_config(tenant_id)
feishu_service = FeishuService(
app_id=tenant_feishu_cfg.get('app_id'),
app_secret=tenant_feishu_cfg.get('app_secret')
)
if not message_id or not chat_id:
logger.error(f"[Feishu Bot] 事件数据缺少必要字段: {event_data}")
return
# 记录会话类型
chat_type_desc = '群聊(group)' if chat_type == 'group' else '私聊(p2p)' if chat_type == 'p2p' else chat_type
logger.info(f"[Feishu Bot] 收到 {chat_type_desc} 消息, ChatID: {chat_id}")
# 消息去重检查
if cache_manager.check_and_set_message_processed(message_id):
logger.warning(f"[Feishu Bot] 🔁 消息 {message_id} 已被处理过(可能是长连接已处理),跳过")
return
# 内容是一个JSON字符串,需要再次解析
try:
content_json = json.loads(message.get('content', '{}'))
text_content = content_json.get('text', '').strip()
except json.JSONDecodeError as e:
logger.error(f"[Feishu Bot] 解析消息内容失败: {e}")
return
logger.info(f"[Feishu Bot] 后台开始处理消息ID: {message_id}, 内容: '{text_content}'")
# 2. 移除@机器人的部分
# 飞书的@消息格式通常是 "@机器人名 实际内容"
mentions = message.get('mentions', [])
if mentions:
for mention in mentions:
# mention['key']是@内容,例如"@_user_1"
# mention['name']是显示的名字
mention_name = mention.get('name', '')
if mention_name:
# 尝试多种@格式
for prefix in [f"@{mention_name}", f"@{mention_name} "]:
if text_content.startswith(prefix):
text_content = text_content[len(prefix):].strip()
break
if not text_content:
logger.warning(f"[Feishu Bot] 移除@后内容为空,不处理。消息ID: {message_id}")
# 仍然回复一个提示
feishu_service.reply_to_message(message_id, "您好!请问有什么可以帮助您的吗?")
return
logger.info(f"[Feishu Bot] 清理后的消息内容: '{text_content}'")
# 3. 解析发送者信息
sender = event.get('sender', {})
sender_ids = sender.get('sender_id', {})
sender_open_id = sender_ids.get('open_id', '') or ''
sender_user_id = sender_ids.get('user_id', '') or ''
sender_id = sender_user_id or sender_open_id or 'unknown'
sender_name = sender_id
try:
id_for_query = sender_open_id or sender_user_id
if id_for_query:
user_info = feishu_service.get_user_info(id_for_query, id_type='open_id' if sender_open_id else 'user_id')
sender_name = user_info.get('name') or user_info.get('en_name') or sender_id
except Exception:
pass
user_id = f"feishu_{chat_id}_{sender_id}"
logger.info(f"[Feishu Bot] 发送者={sender_name}({sender_id}), 群={chat_id}, 租户={tenant_id}")
# 4. 使用 Pipeline 统一处理消息
try:
pipeline = service_manager.get_pipeline()
logger.info(f"[Feishu Bot] Pipeline 开始处理: user={user_id}")
response_data = pipeline.handle_message(
user_id=user_id,
message=text_content,
tenant_id=tenant_id,
chat_id=chat_id,
ip_address=f"feishu:{sender_id}:{sender_name}",
invocation_method=f"feishu_bot({chat_type})"
)
except Exception as pipe_err:
logger.error(f"[Feishu Bot] Pipeline 处理异常: {pipe_err}", exc_info=True)
response_data = {"success": False, "error": str(pipe_err)}
logger.info(f"[Feishu Bot] 处理结果: success={response_data.get('success')}")
# 5. 提取回复并发送
if response_data.get("success"):
reply_text = response_data.get("response") or response_data.get("content", "抱歉,我暂时无法回答这个问题。")
else:
error_msg = response_data.get('error', '未知错误')
reply_text = f"抱歉,处理您的问题时遇到了一些问题。请稍后重试或联系客服。\n错误信息: {error_msg}"
logger.error(f"[Feishu Bot] 处理消息失败: {error_msg}")
# 确保回复是字符串
if isinstance(reply_text, dict):
reply_text = reply_text.get('content', str(reply_text))
if not isinstance(reply_text, str):
reply_text = str(reply_text)
logger.info(f"[Feishu Bot] 准备发送回复到飞书 (长度: {len(reply_text)})")
logger.debug(f"[Feishu Bot] 回复内容: {reply_text}")
success = feishu_service.reply_to_message(message_id, reply_text)
if success:
logger.info(f"[Feishu Bot] 成功回复消息到飞书。消息ID: {message_id}")
else:
logger.error(f"[Feishu Bot] 回复消息到飞书失败。消息ID: {message_id}")
except KeyError as e:
logger.error(f"[Feishu Bot] 事件数据格式错误,缺少字段: {e}", exc_info=True)
except Exception as e:
logger.error(f"[Feishu Bot] 后台处理消息时发生严重错误: {e}", exc_info=True)
# 尝试发送错误提示给用户
try:
if 'message_id' in locals():
feishu_service.reply_to_message(message_id, "抱歉,系统遇到了一些问题,请稍后重试。")
except:
pass
@feishu_bot_bp.route('/event', methods=['POST'])
def handle_feishu_event():
"""
接收并处理飞书事件回调Webhook 模式)。
如果系统同时运行了长连接模式,消息去重机制会自动跳过已处理的消息。
建议生产环境只启用一种模式(长连接 OR Webhook避免重复处理。
"""
# 1. 解析请求
data = request.json
if not data:
logger.warning("[Feishu Bot] 收到空的请求数据")
return jsonify({"status": "error", "message": "empty request"}), 400
logger.info(f"[Feishu Bot] 收到飞书事件回调:\n{json.dumps(data, indent=2, ensure_ascii=False)}")
# 2. 安全校验 (如果配置了)
# 可以在这里添加Verification Token的校验逻辑
# from src.config.unified_config import get_config
# config = get_config()
# if config.feishu.verification_token:
# token = request.headers.get('X-Lark-Request-Token')
# if token != config.feishu.verification_token:
# logger.warning("[Feishu Bot] Token验证失败")
# return jsonify({"status": "error", "message": "invalid token"}), 403
# 3. 处理URL验证挑战
if data.get("type") == "url_verification":
challenge = data.get("challenge", "")
logger.info(f"[Feishu Bot] 收到URL验证请求,返回challenge: {challenge}")
return jsonify({"challenge": challenge})
# 4. 处理事件回调
event_type = data.get("header", {}).get("event_type")
if event_type == "im.message.receive_v1":
# 获取当前Flask应用实例
app = current_app._get_current_object()
# 立即在后台线程中处理,避免阻塞飞书回调
threading.Thread(
target=_process_message_in_background,
args=(app, data),
daemon=True # 设置为守护线程
).start()
logger.info("[Feishu Bot] 已将消息处理任务推送到后台线程,并立即响应200 OK")
return jsonify({"status": "processing"})
# 5. 对于其他未知事件,也返回成功,避免飞书重试
logger.warning(f"[Feishu Bot] 收到未知类型的事件: {event_type}")
return jsonify({"status": "ignored"})