""" Layer 4: 上下文管理器 —— 增强版 - 关键词语义匹配,替代简单取最近 N 条 - 会话摘要去重 """ import time import re from dataclasses import dataclass, field from typing import Optional from layers.explorer import ExplorationStep from layers.insights import Insight @dataclass class AnalysisSession: """一次分析的完整记录""" question: str plan: dict steps: list[ExplorationStep] insights: list[Insight] report: str timestamp: float = field(default_factory=time.time) @property def keywords(self) -> set[str]: """提取会话关键词(中文分字 + 英文词切分)""" text = f"{self.question} {self.plan.get('intent', '')} {' '.join(self.plan.get('dimensions', []))}" # 中文字符 cn_chars = set(re.findall(r'[\u4e00-\u9fff]+', text)) # 英文单词(小写) en_words = set(re.findall(r'[a-zA-Z]{2,}', text.lower())) return cn_chars | en_words def similarity(self, question: str) -> float: """与新问题的关键词相似度(Jaccard-like)""" q_cn = set(re.findall(r'[\u4e00-\u9fff]+', question)) q_en = set(re.findall(r'[a-zA-Z]{2,}', question.lower())) q_kw = q_cn | q_en if not q_kw: return 0.0 overlap = self.keywords & q_kw return len(overlap) / len(q_kw) def summary(self) -> str: parts = [f"**问题**: {self.question}"] if self.plan: parts.append(f"**分析类型**: {self.plan.get('analysis_type', 'unknown')}") parts.append(f"**维度**: {', '.join(self.plan.get('dimensions', []))}") key_findings = [] for step in self.steps: if step.success and step.rows: top_row = step.rows[0] if step.rows else {} finding = f"{step.purpose}: " + ", ".join(f"{k}={v}" for k, v in top_row.items() if k.lower() != "id") key_findings.append(finding) if key_findings: parts.append("**核心发现**:") for f in key_findings[:5]: parts.append(f" - {f}") if self.insights: parts.append("**洞察**:") for i in self.insights[:3]: parts.append(f" - {i}") return "\n".join(parts) def to_reference_text(self) -> str: return ( f"## 之前的分析\n### 问题\n{self.question}\n### 摘要\n{self.summary()}\n### 发现\n" + "\n".join(f"- {s.purpose}: {s.row_count} 行" for s in self.steps if s.success) ) class ContextManager: """上下文管理器 —— 语义匹配增强版""" def __init__(self, max_history: int = 20): self.sessions: list[AnalysisSession] = [] self.max_history = max_history def add_session(self, question: str, plan: dict, steps: list[ExplorationStep], insights: list[Insight], report: str) -> AnalysisSession: session = AnalysisSession(question=question, plan=plan, steps=steps, insights=insights, report=report) self.sessions.append(session) if len(self.sessions) > self.max_history: self.sessions = self.sessions[-self.max_history:] return session def get_context_for(self, new_question: str) -> Optional[str]: """ 智能匹配最相关的 1~3 个历史分析作为上下文。 相似度 > 0.3 才引用,最多 3 条,按相似度降序。 """ if not self.sessions: return None scored = [] for s in self.sessions: sim = s.similarity(new_question) if sim > 0.3: # 相关性阈值 scored.append((sim, s)) if not scored: # 无相关历史,返回最近 1 条作为兜底 return self.sessions[-1].to_reference_text() scored.sort(key=lambda x: x[0], reverse=True) return "\n\n---\n\n".join(s.to_reference_text() for _, s in scored[:3]) def get_history_summary(self) -> str: if not self.sessions: return "(无历史分析)" lines = [f"共 {len(self.sessions)} 次分析:"] for i, s in enumerate(self.sessions, 1): ts = time.strftime("%H:%M", time.localtime(s.timestamp)) lines.append(f" {i}. [{ts}] {s.question}") return "\n".join(lines) def clear(self): self.sessions.clear()