Files
iov_ana/layers/insights.py

188 lines
7.4 KiB
Python
Raw Permalink Normal View History

"""
Layer 3: 洞察引擎
"""
import json
from typing import Any
from core.config import LLM_CONFIG
from core.utils import get_llm_client, llm_chat, extract_json_array
from layers.explorer import ExplorationStep
# System prompt passed as the "system" message in InsightEngine.analyze.
# It instructs the model to answer with a strict JSON array of objects carrying
# the keys "type", "severity", "title", "detail" and "evidence" -- the same keys
# that Insight.__init__ reads from each parsed element.
# NOTE(review): the bullet "用对比来说话A B X%" looks truncated (the wording that
# relates A, B and X% seems to be missing) -- confirm against the intended prompt.
INSIGHT_SYSTEM = """你是一个数据洞察专家。你会收到探索过程的所有结果,你需要:
1. 从结果中发现异常和有趣现象
2. 对比不同维度找出差异
3. 输出用户可能没问但值得知道的洞察
## 输出格式(严格 JSON 数组)
```json
[
{
"type": "outlier" | "trend" | "distribution" | "correlation" | "recommendation",
"severity": "high" | "medium" | "low",
"title": "简短标题",
"detail": "详细描述,包含具体数字",
"evidence": "支撑这个洞察的数据来源"
}
]
```
## 分析原则
- 每个洞察必须有具体数字支撑
- 用对比来说话A B X%
- 关注异常不描述平淡的事实
- 如果没有异常返回空数组"""
class Insight:
    """A single insight built from one parsed JSON object.

    Attributes (each read from ``data`` with a fallback when the key is absent):
        type: insight category, falls back to ``"unknown"``.
        severity: severity label, falls back to ``"low"``.
        title: short headline, falls back to ``""``.
        detail: longer description, falls back to ``""``.
        evidence: supporting data source, falls back to ``""``.
    """

    # (attribute name, fallback) pairs copied from the input dict in __init__.
    _FIELD_DEFAULTS = (
        ("type", "unknown"),
        ("severity", "low"),
        ("title", ""),
        ("detail", ""),
        ("evidence", ""),
    )

    # Icon shown for each known insight type.
    _TYPE_EMOJI = {
        "outlier": "⚠️",
        "trend": "📈",
        "distribution": "📊",
        "correlation": "🔗",
        "recommendation": "💡",
    }

    # Icon shown for each known severity level.
    _SEVERITY_EMOJI = {
        "high": "🔴",
        "medium": "🟡",
        "low": "🟢",
    }

    def __init__(self, data: dict):
        """Copy the known fields out of ``data``, applying the fallbacks above."""
        for field_name, fallback in self._FIELD_DEFAULTS:
            setattr(self, field_name, data.get(field_name, fallback))

    @property
    def emoji(self) -> str:
        """Icon for ``self.type``; a pin icon for unrecognised types."""
        return self._TYPE_EMOJI.get(self.type, "📌")

    @property
    def severity_emoji(self) -> str:
        """Icon for ``self.severity``; empty string for unrecognised levels."""
        return self._SEVERITY_EMOJI.get(self.severity, "")

    def __str__(self):
        """Render as ``"<type icon> <severity icon> <title>: <detail>"``."""
        return "{} {} {}: {}".format(
            self.emoji, self.severity_emoji, self.title, self.detail
        )
class InsightEngine:
    """LLM-backed insight engine.

    Renders the exploration steps as text, sends them to the chat model
    together with ``INSIGHT_SYSTEM`` and wraps the JSON objects found in the
    reply as ``Insight`` instances.
    """

    def __init__(self):
        """Create the LLM client and model name from ``LLM_CONFIG``."""
        self.client, self.model = get_llm_client(LLM_CONFIG)

    def analyze(self, steps: list[ExplorationStep], question: str) -> list[Insight]:
        """Ask the model for insights about ``steps``.

        Args:
            steps: exploration steps to analyse; an empty list short-circuits
                to ``[]`` without calling the model.
            question: the user's question, included in the prompt.

        Returns:
            One ``Insight`` per JSON object extracted from the model reply.
        """
        if not steps:
            return []
        history = self._build_history(steps)
        content = llm_chat(
            self.client, self.model,
            messages=[
                {"role": "system", "content": INSIGHT_SYSTEM},
                {"role": "user", "content": f"## 用户问题\n{question}\n\n## 探索历史\n{history}\n\n请分析以上数据,输出异常和洞察。"},
            ],
            temperature=0.3, max_tokens=2048,
        )
        # Insight reads its fields with dict.get, so a non-object array element
        # (string, number, null) would raise AttributeError; skip such elements
        # instead of losing every insight to one malformed entry.
        parsed = extract_json_array(content) or []
        return [Insight(item) for item in parsed if isinstance(item, dict)]

    def format_insights(self, insights: list[Insight]) -> str:
        """Render insights as a Markdown section, most severe first.

        Args:
            insights: insights to render.

        Returns:
            The Markdown text, or ``""`` when ``insights`` is empty.
        """
        if not insights:
            return ""
        # Unknown severities sort after "low".
        severity_order = {"high": 0, "medium": 1, "low": 2}
        sorted_insights = sorted(insights, key=lambda i: severity_order.get(i.severity, 9))
        lines = ["## 💡 主动洞察", "", "_以下是你没问但数据告诉我们的事_\n"]
        for insight in sorted_insights:
            lines.extend([
                f"**{insight.emoji} {insight.title}** {insight.severity_emoji}",
                f" {insight.detail}",
                f" _数据来源: {insight.evidence}_",
                "",
            ])
        return "\n".join(lines)

    def _build_history(self, steps: list[ExplorationStep]) -> str:
        """Render the steps as Markdown-style text for the prompt.

        A step whose ``action`` is ``"done"`` contributes its reasoning, a
        successful step contributes its SQL, row count and rows as JSON, and
        any other step contributes its SQL and error. Sections are separated
        by a blank line.
        """
        parts = []
        for step in steps:
            if step.action == "done":
                parts.append(f"### 结束\n{step.reasoning}")
            elif step.success:
                # default=str: row cells that are not JSON-native (e.g. Decimal,
                # date/datetime) would otherwise make json.dumps raise
                # TypeError. The text only feeds the prompt, so a string
                # rendering is sufficient; JSON-native rows are unaffected.
                rows_json = json.dumps(step.rows, ensure_ascii=False, default=str)
                parts.append(
                    f"### 第 {step.round_num} 轮:{step.purpose}\n"
                    f"SQL: `{step.sql}`\n结果 ({step.row_count} 行):\n"
                    f"数据: {rows_json}"
                )
            else:
                parts.append(f"### 第 {step.round_num} 轮:{step.purpose}\nSQL: `{step.sql}`\n失败: {step.error}")
        return "\n\n".join(parts)
# Column names (lower-cased) that select each rule in quick_detect.
_PCT_COLUMNS = ("pct", "percent", "percentage", "占比")
_COUNT_COLUMNS = ("count", "cnt", "n", "total", "order_count")
_ZERO_CHECK_COLUMNS = ("count", "cnt", "total", "amount", "sum", "关闭时长")
# Columns excluded from the statistical (z-score / CV) rules.
_NON_METRIC_COLUMNS = ("id", "year", "month")


def _numeric_values(rows, col):
    """Return the int/float values of column ``col`` across ``rows``."""
    values = []
    for row in rows:
        value = row.get(col)
        # bool is a subclass of int; a True/False flag column is not a metric
        # and must not be fed into the numeric rules.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            values.append(value)
    return values


def _mean_and_std(vals):
    """Return the mean and population standard deviation of ``vals``."""
    mean = sum(vals) / len(vals)
    variance = sum((v - mean) ** 2 for v in vals) / len(vals)
    return mean, variance ** 0.5


def quick_detect(steps: list[ExplorationStep]) -> list[str]:
    """Rule-based anomaly detection over exploration results (no LLM call).

    For every successful step with rows, each column holding at least two
    numeric values is checked against these rules:

    * percentage columns: the largest value exceeds 50;
    * count columns (>= 3 values): the maximum is more than 3x the mean;
    * z-score (>= 5 values, not id/year/month): values more than two
      population standard deviations from the mean;
    * coefficient of variation (>= 3 values, not id/year/month): std/|mean|
      above 1.0;
    * zero values in count/amount-like columns: more than 10% zeros, but not
      all zeros.

    Args:
        steps: exploration steps; each is read for ``success``, ``rows``,
            ``columns`` and ``purpose``.

    Returns:
        Alert messages in detection order. Each rule fires at most once per
        dedup key (purpose, plus the column for the column-specific rules).
    """
    alerts: list[str] = []
    seen: set[str] = set()

    def emit(key: str, message: str) -> None:
        # Record ``message`` unless an alert with the same key already exists.
        if key not in seen:
            seen.add(key)
            alerts.append(message)

    for step in steps:
        if not step.success or not step.rows:
            continue
        for col in step.columns:
            vals = _numeric_values(step.rows, col)
            if len(vals) < 2:
                continue
            col_lower = col.lower()
            # "purpose「column」" prefix shared by the column-specific alerts.
            label = f"{step.purpose}「{col}」"

            # Percentage column: concentration too high.
            if col_lower in _PCT_COLUMNS:
                max_pct = max(vals)
                if max_pct > 50:
                    emit(
                        f"pct_{step.purpose}",
                        f"⚠️ {step.purpose}: 最高占比 {max_pct}%,集中度过高",
                    )

            # Count column: extreme value compared with the mean.
            if col_lower in _COUNT_COLUMNS and len(vals) >= 3:
                avg = sum(vals) / len(vals)
                if avg > 0:
                    ratio = max(vals) / avg
                    if ratio > 3:
                        emit(
                            f"count_{step.purpose}",
                            f"⚠️ {step.purpose}: 最大值是均值的 {ratio:.1f} 倍",
                        )

            # Statistical rules share one mean/std computation.
            if len(vals) >= 3 and col_lower not in _NON_METRIC_COLUMNS:
                mean, std = _mean_and_std(vals)

                # Z-score outliers need a larger sample and a non-zero spread.
                if len(vals) >= 5 and std > 0:
                    outliers = [v for v in vals if abs(v - mean) / std > 2]
                    if outliers:
                        outlier_desc = ", ".join(f"{v:.1f}" for v in outliers[:3])
                        emit(
                            f"zscore_{step.purpose}_{col}",
                            f"⚠️ {label}发现 {len(outliers)} 个异常值 "
                            f"(均值={mean:.1f}, σ={std:.1f}, 异常值={outlier_desc})",
                        )

                # Dispersion: coefficient of variation (undefined for mean 0).
                if mean != 0:
                    cv = std / abs(mean)
                    if cv > 1.0:
                        emit(
                            f"cv_{step.purpose}_{col}",
                            f"⚠️ {label}离散度高 (CV={cv:.2f}),数据波动大",
                        )

            # Zero values in count/amount-like columns (all-zero is ignored).
            if col_lower in _ZERO_CHECK_COLUMNS:
                zero_count = sum(1 for v in vals if v == 0)
                if 0 < zero_count < len(vals):
                    pct = zero_count / len(vals) * 100
                    if pct > 10:
                        emit(
                            f"zero_{step.purpose}_{col}",
                            f"⚠️ {label}有 {zero_count} 个零值 ({pct:.0f}%)",
                        )
    return alerts