fix: Pipeline 改为同步初始化 + 日志精简 + 预警去重
1. Pipeline 初始化改回同步(Redis 已正常,不会阻塞) 2. 去掉消息处理时的 10 秒等待循环 3. 飞书长连接启动日志精简(去掉分隔线和重复信息) 4. 性能预警添加 5 分钟冷却期,同类型预警不重复触发
This commit is contained in:
@@ -198,7 +198,15 @@ class SystemOptimizer:
|
||||
logger.error(f"检查安全指标失败: {e}")
|
||||
|
||||
def _trigger_performance_alert(self, alert_type: str, message: str):
|
||||
"""触发性能预警"""
|
||||
"""触发性能预警(同类型预警 5 分钟内不重复)"""
|
||||
now = datetime.now()
|
||||
if not hasattr(self, '_alert_cooldowns'):
|
||||
self._alert_cooldowns = {}
|
||||
last = self._alert_cooldowns.get(alert_type)
|
||||
if last and (now - last).total_seconds() < 300:
|
||||
return # 冷却中,跳过
|
||||
self._alert_cooldowns[alert_type] = now
|
||||
|
||||
try:
|
||||
from ..core.models import Alert
|
||||
|
||||
|
||||
@@ -55,16 +55,13 @@ class FeishuLongConnService:
|
||||
self._executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="feishu_msg")
|
||||
self._msg_count = 0
|
||||
|
||||
# 预初始化 Pipeline(在后台线程,不阻塞启动)
|
||||
# 预初始化 Pipeline
|
||||
self._pipeline = None
|
||||
def _init_pipeline():
|
||||
try:
|
||||
self._pipeline = service_manager.get_pipeline()
|
||||
logger.info("✅ MessagePipeline 预初始化成功")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ MessagePipeline 预初始化失败: {e}", exc_info=True)
|
||||
import threading as _threading
|
||||
_threading.Thread(target=_init_pipeline, daemon=True).start()
|
||||
try:
|
||||
self._pipeline = service_manager.get_pipeline()
|
||||
logger.info("✅ MessagePipeline 初始化成功")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ MessagePipeline 初始化失败(将在首条消息时重试): {e}")
|
||||
|
||||
def _handle_message(self, data: P2ImMessageReceiveV1) -> None:
|
||||
"""收到消息事件,立即提交到线程池,确保 SDK 回调快速返回"""
|
||||
@@ -179,16 +176,7 @@ class FeishuLongConnService:
|
||||
# 使用 Pipeline 统一处理消息
|
||||
session_user_id = f"feishu_{chat_id}_{sender_id}"
|
||||
try:
|
||||
# 等待 Pipeline 就绪(最多 10 秒)
|
||||
import time as _wait_time
|
||||
for _ in range(100):
|
||||
if self._pipeline:
|
||||
break
|
||||
_wait_time.sleep(0.1)
|
||||
pipeline = self._pipeline
|
||||
if not pipeline:
|
||||
logger.warning("Pipeline 未就绪,尝试即时初始化")
|
||||
pipeline = service_manager.get_pipeline()
|
||||
pipeline = self._pipeline or service_manager.get_pipeline()
|
||||
logger.info(f"Pipeline 开始处理: user={session_user_id}")
|
||||
response_data = pipeline.handle_message(
|
||||
user_id=session_user_id,
|
||||
@@ -275,25 +263,11 @@ class FeishuLongConnService:
|
||||
return False
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
启动长连接客户端
|
||||
这个方法会阻塞当前线程,持续监听飞书事件
|
||||
"""
|
||||
logger.info("=" * 80)
|
||||
logger.info(" 启动飞书长连接客户端")
|
||||
logger.info("=" * 80)
|
||||
logger.info(f"📋 配置信息:")
|
||||
logger.info(f" - App ID: {self.app_id}")
|
||||
logger.info(" 等待消息中... (按 Ctrl+C 停止)")
|
||||
logger.info("=" * 80)
|
||||
"""启动长连接客户端(阻塞当前线程)"""
|
||||
logger.info(f"🚀 启动飞书长连接 (App: {self.app_id})")
|
||||
|
||||
try:
|
||||
# 创建长连接客户端
|
||||
cli = lark.ws.Client(self.app_id, self.app_secret, event_handler=self.event_handler)
|
||||
|
||||
logger.info("正在建立与飞书服务器的连接...")
|
||||
|
||||
# 启动长连接(会阻塞)
|
||||
cli.start()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
|
||||
Reference in New Issue
Block a user