"""Layer 3: insight engine.

Turns the steps recorded during exploration into insights, either by asking
the LLM (``InsightEngine``) or through rule-based checks that need no LLM
call (``quick_detect``).
"""
import json
from typing import Any

from core.config import LLM_CONFIG
from core.utils import get_llm_client, llm_chat, extract_json_array
from layers.explorer import ExplorationStep

# System prompt sent with every analysis request.
# NOTE(review): the line breaks inside this literal were restored from a
# whitespace-collapsed copy of the file -- confirm against the deployed prompt.
INSIGHT_SYSTEM = """你是一个数据洞察专家。你会收到探索过程的所有结果,你需要:
1. 从结果中发现异常和有趣现象
2. 对比不同维度,找出差异
3. 输出用户可能没问但值得知道的洞察

## 输出格式(严格 JSON 数组)
```json
[
  {
    "type": "outlier" | "trend" | "distribution" | "correlation" | "recommendation",
    "severity": "high" | "medium" | "low",
    "title": "简短标题",
    "detail": "详细描述,包含具体数字",
    "evidence": "支撑这个洞察的数据来源"
  }
]
```

## 分析原则
- 每个洞察必须有具体数字支撑
- 用对比来说话(A 比 B 高 X%)
- 关注异常,不描述平淡的事实
- 如果没有异常,返回空数组"""


class Insight:
    """A single insight built from one JSON object of the LLM response.

    Missing keys fall back to defaults: ``type`` -> ``"unknown"``,
    ``severity`` -> ``"low"``, and the text fields -> ``""``.
    """

    # Lookup tables are class constants so the properties below do not
    # rebuild a dict on every access.
    _TYPE_EMOJI = {
        "outlier": "⚠️",
        "trend": "📈",
        "distribution": "📊",
        "correlation": "🔗",
        "recommendation": "💡",
    }
    _SEVERITY_EMOJI = {"high": "🔴", "medium": "🟡", "low": "🟢"}

    def __init__(self, data: dict[str, Any]):
        self.type = data.get("type", "unknown")
        self.severity = data.get("severity", "low")
        self.title = data.get("title", "")
        self.detail = data.get("detail", "")
        self.evidence = data.get("evidence", "")

    @property
    def emoji(self) -> str:
        """Return the emoji for this insight's type (a pin for unknown types)."""
        return self._TYPE_EMOJI.get(self.type, "📌")

    @property
    def severity_emoji(self) -> str:
        """Return the emoji for this insight's severity ("" when unrecognized)."""
        return self._SEVERITY_EMOJI.get(self.severity, "")

    def __str__(self):
        return f"{self.emoji} {self.severity_emoji} {self.title}: {self.detail}"


class InsightEngine:
    """Insight engine: asks the LLM to analyze the exploration history."""

    def __init__(self):
        self.client, self.model = get_llm_client(LLM_CONFIG)

    def analyze(self, steps: list[ExplorationStep], question: str) -> list[Insight]:
        """Ask the LLM for insights about the exploration results.

        Args:
            steps: Exploration steps to summarize for the model.
            question: The user's original question, included in the prompt.

        Returns:
            One ``Insight`` per JSON object in the model's reply; an empty
            list when ``steps`` is empty (no LLM call is made in that case).
        """
        if not steps:
            return []

        history = self._build_history(steps)
        content = llm_chat(
            self.client,
            self.model,
            messages=[
                {"role": "system", "content": INSIGHT_SYSTEM},
                {"role": "user", "content": f"## 用户问题\n{question}\n\n## 探索历史\n{history}\n\n请分析以上数据,输出异常和洞察。"},
            ],
            temperature=0.3,
            max_tokens=2048,
        )
        # The model may put non-object elements (strings, numbers) in the
        # array; Insight() needs a mapping, so those are skipped instead of
        # raising AttributeError. The helper's exact return contract is not
        # visible here, hence the defensive "or []".
        parsed = extract_json_array(content) or []
        return [Insight(item) for item in parsed if isinstance(item, dict)]

    def format_insights(self, insights: list[Insight]) -> str:
        """Render insights as a markdown section, most severe first.

        Returns ``""`` when there are no insights. Insights whose severity is
        not high/medium/low sort last; ties keep their input order.
        """
        if not insights:
            return ""

        severity_order = {"high": 0, "medium": 1, "low": 2}
        sorted_insights = sorted(insights, key=lambda i: severity_order.get(i.severity, 9))

        lines = ["## 💡 主动洞察", "", "_以下是你没问但数据告诉我们的事:_\n"]
        for insight in sorted_insights:
            lines.append(f"**{insight.emoji} {insight.title}** {insight.severity_emoji}")
            lines.append(f" {insight.detail}")
            lines.append(f" _数据来源: {insight.evidence}_")
            lines.append("")
        return "\n".join(lines)

    def _build_history(self, steps: list[ExplorationStep]) -> str:
        """Serialize the steps into the markdown history block of the prompt."""
        parts = []
        for step in steps:
            if step.action == "done":
                parts.append(f"### 结束\n{step.reasoning}")
            elif step.success:
                # default=str: query rows may hold values json cannot encode
                # (Decimal, datetime, bytes, ...). This text is only read by
                # the LLM, so stringifying them is deliberate and avoids a
                # TypeError that would abort the whole analysis.
                rows_json = json.dumps(step.rows, ensure_ascii=False, default=str)
                parts.append(
                    f"### 第 {step.round_num} 轮:{step.purpose}\n"
                    f"SQL: `{step.sql}`\n结果 ({step.row_count} 行):\n"
                    f"数据: {rows_json}"
                )
            else:
                parts.append(f"### 第 {step.round_num} 轮:{step.purpose}\nSQL: `{step.sql}`\n失败: {step.error}")
        return "\n\n".join(parts)


# Column names (lower-cased) that select which rule applies in quick_detect.
_PCT_COLUMNS = frozenset({"pct", "percent", "percentage", "占比"})
_COUNT_COLUMNS = frozenset({"count", "cnt", "n", "total", "order_count"})
_NON_METRIC_COLUMNS = frozenset({"id", "year", "month"})
_ZERO_CHECK_COLUMNS = frozenset({"count", "cnt", "total", "amount", "sum", "关闭时长"})


def _numeric_values(rows: list[dict], col: str) -> list[float]:
    """Collect the int/float cells of ``col``, skipping every other type.

    ``bool`` is excluded explicitly: it is a subclass of ``int``, so without
    this a True/False flag column would be analyzed as a 0/1 metric.
    """
    values = []
    for row in rows:
        cell = row.get(col)
        if isinstance(cell, (int, float)) and not isinstance(cell, bool):
            values.append(cell)
    return values


def _mean_and_std(vals: list[float]) -> tuple[float, float]:
    """Return the mean and the population standard deviation of ``vals``."""
    mean = sum(vals) / len(vals)
    variance = sum((v - mean) ** 2 for v in vals) / len(vals)
    return mean, variance ** 0.5


def quick_detect(steps: list[ExplorationStep]) -> list[str]:
    """Rule-based quick anomaly detection (no LLM call).

    For every numeric column of every successful, non-empty step, the
    following checks run in order: share concentration, count extremes,
    z-score outliers, coefficient of variation, and zero values.

    Args:
        steps: Exploration steps; each is read for ``success``, ``rows``
            (a list of dict rows), ``columns`` and ``purpose``.

    Returns:
        Alert messages in detection order. Each rule fires at most once per
        dedupe key (per purpose, or per purpose and column).
    """
    alerts: list[str] = []
    seen: set[str] = set()

    def emit(key: str, message: str) -> None:
        # Record the alert unless the same rule already fired for this key.
        if key not in seen:
            seen.add(key)
            alerts.append(message)

    for step in steps:
        if not step.success or not step.rows:
            continue

        for col in step.columns:
            vals = _numeric_values(step.rows, col)
            if len(vals) < 2:
                continue

            col_lower = col.lower()

            # -- Share column: concentration too high --
            if col_lower in _PCT_COLUMNS:
                max_pct = max(vals)
                if max_pct > 50:
                    emit(
                        f"pct_{step.purpose}",
                        f"⚠️ {step.purpose}: 最高占比 {max_pct}%,集中度过高",
                    )

            # -- Count column: maximum far above the average --
            if col_lower in _COUNT_COLUMNS and len(vals) >= 3:
                avg = sum(vals) / len(vals)
                if avg > 0:
                    ratio = max(vals) / avg
                    if ratio > 3:
                        emit(
                            f"count_{step.purpose}",
                            f"⚠️ {step.purpose}: 最大值是均值的 {ratio:.1f} 倍",
                        )

            # The two statistical checks below share one mean/std pass.
            if len(vals) >= 3 and col_lower not in _NON_METRIC_COLUMNS:
                mean, std = _mean_and_std(vals)

                # -- Z-score outliers (needs at least 5 points) --
                if len(vals) >= 5 and std > 0:
                    outliers = [v for v in vals if abs(v - mean) / std > 2]
                    if outliers:
                        # Only the first three outliers are listed.
                        outlier_desc = ", ".join(f"{v:.1f}" for v in outliers[:3])
                        emit(
                            f"zscore_{step.purpose}_{col}",
                            f"⚠️ {step.purpose}「{col}」发现 {len(outliers)} 个异常值 "
                            f"(均值={mean:.1f}, σ={std:.1f}, 异常值={outlier_desc})",
                        )

                # -- Dispersion: coefficient of variation (CV) --
                if mean != 0:
                    cv = std / abs(mean)
                    if cv > 1.0:
                        emit(
                            f"cv_{step.purpose}_{col}",
                            f"⚠️ {step.purpose}「{col}」离散度高 (CV={cv:.2f}),数据波动大",
                        )

            # -- Zero values: some, but not all, entries are zero --
            if col_lower in _ZERO_CHECK_COLUMNS:
                zero_count = sum(1 for v in vals if v == 0)
                if 0 < zero_count < len(vals):
                    pct = zero_count / len(vals) * 100
                    if pct > 10:
                        emit(
                            f"zero_{step.purpose}_{col}",
                            f"⚠️ {step.purpose}「{col}」有 {zero_count} 个零值 ({pct:.0f}%)",
                        )

    return alerts