Files
iov_ana/layers/playbook.py
openclaw e8f8e2f1ba feat: 四层架构全面增强
安全与稳定性:
- 移除硬编码 API Key,改用 .env + 环境变量
- LLM 调用统一重试机制(指数退避,3 次重试,处理 429/5xx/超时)
- 中文字体检测增强(CJK 关键词兜底 + 无字体时英文 fallback)
- 缺失 API Key 给出友好提示而非崩溃

分析能力提升:
- 异常检测新增 z-score 检测(标准差>2 标记异常)
- 新增变异系数 CV 检测(数据波动性)
- 新增零值/缺失检测
- 上下文管理器升级为关键词语义匹配(替代简单取最近 2 条)

用户体验:
- 报告自动保存为 Markdown(reports/ 目录)
- 新增 export 命令导出查询结果为 CSV
- 新增 reports 命令查看已保存报告
- CLI 支持 readline 命令历史(方向键翻阅)
- CSV 导入工具重写:自动列名映射、容错处理、dry-run 模式
- 新增 .env.example 配置模板
2026-03-31 14:39:17 +08:00

179 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Layer 1.5: 预设分析剧本
"""
import json
import os
import re
from typing import Optional
from core.config import LLM_CONFIG
from core.utils import get_llm_client, llm_chat, extract_json_object, extract_json_array
class Playbook:
"""一个预设分析剧本"""
def __init__(self, data: dict):
self.name = data["name"]
self.description = data["description"]
self.tags = data.get("tags", [])
self.preset_queries: list[dict] = data.get("preset_queries", [])
self.exploration_hints = data.get("exploration_hints", "")
self.placeholders = data.get("placeholders", {})
def to_summary(self) -> str:
return f"[{self.name}] {self.description} (标签: {', '.join(self.tags)})"
def render_queries(self, schema: dict) -> list[dict]:
rendered = []
for q in self.preset_queries:
sql, purpose = q["sql"], q.get("purpose", "")
for key, val in self.placeholders.items():
sql = sql.replace(f"{{{{{key}}}}}", val)
purpose = purpose.replace(f"{{{{{key}}}}}", val)
rendered.append({"sql": sql, "purpose": purpose})
return rendered
class PlaybookManager:
"""加载和匹配 Playbook"""
def __init__(self, playbook_dir: str = ""):
self.playbooks: list[Playbook] = []
self.client, self.model = get_llm_client(LLM_CONFIG)
if playbook_dir and os.path.isdir(playbook_dir):
self._load_from_dir(playbook_dir)
def _load_from_dir(self, dir_path: str):
for fname in sorted(os.listdir(dir_path)):
if not fname.endswith(".json"):
continue
try:
with open(os.path.join(dir_path, fname), "r", encoding="utf-8") as f:
data = json.load(f)
items = data if isinstance(data, list) else [data]
for item in items:
self.playbooks.append(Playbook(item))
except (json.JSONDecodeError, KeyError) as e:
print(f" ⚠️ 加载 playbook 失败 {fname}: {e}")
def add(self, playbook: Playbook):
self.playbooks.append(playbook)
def auto_generate(self, schema_text: str, save_dir: str = "") -> list[Playbook]:
"""让 LLM 根据 Schema 自动生成 Playbook"""
prompt = f"""你是一个数据分析专家。根据以下数据库 Schema生成 3-5 个预设分析剧本。
## 数据库 Schema
{schema_text}
## 输出格式(严格 JSON 数组)
```json
[
{{
"name": "剧本名称",
"description": "一句话描述",
"tags": ["关键词1", "关键词2"],
"preset_queries": [
{{"purpose": "查询目的", "sql": "SELECT ... GROUP BY ..."}}
],
"exploration_hints": "后续探索提示"
}}
]
```
## SQL 规则
- 只用 SELECT必须有聚合函数或 GROUP BY
- 禁止 SELECT *,用 ROUND 控制精度,合理 LIMIT
- 直接使用实际表名和列名"""
try:
content = llm_chat(
self.client, self.model,
messages=[
{"role": "system", "content": "你是数据分析专家。只输出 JSON不要其他内容。"},
{"role": "user", "content": prompt},
],
temperature=0.3, max_tokens=4096,
)
playbooks_data = extract_json_array(content)
if not playbooks_data:
return []
generated = []
for i, pb_data in enumerate(playbooks_data):
pb_data.setdefault("tags", [])
pb_data.setdefault("exploration_hints", "")
pb_data.setdefault("placeholders", {})
try:
pb = Playbook(pb_data)
self.playbooks.append(pb)
generated.append(pb)
if save_dir:
os.makedirs(save_dir, exist_ok=True)
safe = re.sub(r'[^\w\u4e00-\u9fff]', '_', pb.name)[:30]
fpath = os.path.join(save_dir, f"auto_{i+1}_{safe}.json")
with open(fpath, "w", encoding="utf-8") as f:
json.dump(pb_data, f, ensure_ascii=False, indent=2)
except (KeyError, TypeError) as e:
print(f" ⚠️ 跳过无效 Playbook: {e}")
return generated
except Exception as e:
print(f" ⚠️ 自动生成 Playbook 出错: {e}")
return []
def match(self, plan: dict, schema_text: str) -> Optional[dict]:
"""用 LLM 判断当前分析计划是否匹配某个 Playbook"""
if not self.playbooks:
return None
pb_summaries = []
for i, pb in enumerate(self.playbooks):
queries_desc = "\n".join(f" - {q.get('purpose', '')}: {q['sql'][:100]}" for q in pb.preset_queries)
pb_summaries.append(f"{i+1}. {pb.to_summary()}\n 预设查询:\n{queries_desc}")
prompt = f"""判断当前分析计划是否适合使用某个预设剧本。
## 分析计划
```json
{json.dumps(plan, ensure_ascii=False, indent=2)}
```
## Schema
{schema_text}
## 可用剧本
{chr(10).join(pb_summaries)}
## 输出(严格 JSON
匹配: {{"matched": true, "playbook_index": 1, "reasoning": "原因", "placeholders": {{}}}}
不匹配: {{"matched": false, "reasoning": "原因"}}"""
try:
content = llm_chat(
self.client, self.model,
messages=[
{"role": "system", "content": "你是分析计划匹配器。"},
{"role": "user", "content": prompt},
],
temperature=0.1, max_tokens=512,
)
result = extract_json_object(content)
if not result.get("matched"):
return None
idx = result.get("playbook_index", 1) - 1
if idx < 0 or idx >= len(self.playbooks):
return None
pb = self.playbooks[idx]
pb.placeholders = {**pb.placeholders, **result.get("placeholders", {})}
return {
"matched": True, "playbook_name": pb.name,
"reasoning": result.get("reasoning", ""),
"preset_queries": pb.render_queries({}),
"exploration_hints": pb.exploration_hints,
}
except Exception as e:
print(f" ⚠️ Playbook 匹配出错: {e}")
return None