Files
vibe_data_ana/docs/DEVELOPER_GUIDE.md

852 lines
18 KiB
Markdown
Raw Permalink Normal View History

# 开发者指南
本指南帮助开发者理解系统架构、扩展功能和添加新工具。
## 目录
- [系统架构](#系统架构)
- [开发环境设置](#开发环境设置)
- [添加新工具](#添加新工具)
- [扩展分析引擎](#扩展分析引擎)
- [自定义数据模型](#自定义数据模型)
- [测试指南](#测试指南)
- [代码规范](#代码规范)
- [调试技巧](#调试技巧)
---
## 系统架构
### 整体架构
系统采用五阶段流水线架构,每个阶段由 AI 驱动:
```
数据输入 → 数据理解 → 需求理解 → 分析规划 → 任务执行 → 报告生成
```
### 核心组件
```
src/
├── main.py # 主流程编排
├── cli.py # 命令行接口
├── config.py # 配置管理
├── data_access.py # 数据访问层(隐私保护)
├── error_handling.py # 错误处理
├── logging_config.py # 日志配置
├── env_loader.py # 环境变量加载
├── engines/ # 分析引擎
│ ├── data_understanding.py # 数据理解
│ ├── requirement_understanding.py # 需求理解
│ ├── analysis_planning.py # 分析规划
│ ├── task_execution.py # 任务执行ReAct
│ ├── plan_adjustment.py # 计划调整
│ └── report_generation.py # 报告生成
├── models/ # 数据模型
│ ├── data_profile.py
│ ├── requirement_spec.py
│ ├── analysis_plan.py
│ └── analysis_result.py
└── tools/ # 分析工具
├── base.py # 工具基类和注册表
├── query_tools.py # 数据查询工具
├── stats_tools.py # 统计分析工具
├── viz_tools.py # 可视化工具
└── tool_manager.py # 工具管理器
```
### 数据流
```
CSV 文件
DataAccessLayer数据访问层
DataProfile数据画像元数据 + 统计摘要)
RequirementSpec需求规格
AnalysisPlan分析计划任务列表
AnalysisResult[](分析结果列表)
Markdown 报告
```
### 设计原则
1. **AI 优先**:让 AI 做决策,而不是执行预定义的规则
2. **动态适应**:根据数据特征和发现动态调整分析计划
3. **隐私保护**AI 不读取原始数据,只通过工具获取摘要信息
4. **工具驱动**:通过动态工具集赋能 AI 的分析能力
5. **可扩展性**:易于添加新工具和扩展功能
---
## 开发环境设置
### 1. 克隆仓库
```bash
git clone <repository-url>
cd <repository-name>
```
### 2. 创建虚拟环境
```bash
# 使用 venv
python -m venv .venv
# 激活虚拟环境
# Windows
.venv\Scripts\activate
# Linux/Mac
source .venv/bin/activate
```
### 3. 安装依赖
```bash
# 安装生产依赖
pip install -r requirements.txt
# 安装开发依赖(如果有)
pip install pytest hypothesis pytest-cov black flake8
```
### 4. 配置环境变量
```bash
cp .env.example .env
# 编辑 .env 文件,设置 API 密钥
```
### 5. 运行测试
```bash
# 运行所有测试
pytest
# 运行特定测试
pytest tests/test_integration.py -v
# 查看覆盖率
pytest --cov=src --cov-report=html
```
---
## 添加新工具
### 步骤1创建工具类
创建一个继承自 `AnalysisTool` 的新类:
```python
# src/tools/my_custom_tools.py
from src.tools.base import AnalysisTool
from src.models import DataProfile
import pandas as pd
from typing import Dict, Any
class MyCustomTool(AnalysisTool):
"""
自定义分析工具。
功能:[描述工具的功能]
"""
@property
def name(self) -> str:
"""工具名称(唯一标识)。"""
return "my_custom_tool"
@property
def description(self) -> str:
"""工具描述(供 AI 理解)。"""
return """
这个工具用于 [具体功能描述]。
适用场景:
- [场景1]
- [场景2]
输入参数:
- column: 要分析的列名
- threshold: 阈值参数
输出:
- result: 分析结果
- insights: 洞察列表
"""
@property
def parameters(self) -> Dict[str, Any]:
"""参数定义JSON Schema 格式)。"""
return {
"type": "object",
"properties": {
"column": {
"type": "string",
"description": "要分析的列名"
},
"threshold": {
"type": "number",
"description": "阈值参数",
"default": 0.5
}
},
"required": ["column"]
}
def execute(self, data: pd.DataFrame, **kwargs) -> Dict[str, Any]:
"""
执行工具。
参数:
data: 原始数据(工具内部使用,不暴露给 AI
**kwargs: 工具参数
返回:
聚合后的结果(不包含原始数据)
"""
# 1. 验证参数
if not self.validate_parameters(**kwargs):
raise ValueError("参数验证失败")
column = kwargs['column']
threshold = kwargs.get('threshold', 0.5)
# 2. 检查列是否存在
if column not in data.columns:
raise ValueError(f"列 '{column}' 不存在")
# 3. 执行分析
# 注意:只返回聚合数据,不返回原始行级数据
result = {
"column": column,
"threshold": threshold,
"count": len(data),
"result_value": data[column].mean(), # 示例
"insights": [
f"列 {column} 的平均值为 {data[column].mean():.2f}"
]
}
return result
def is_applicable(self, data_profile: DataProfile) -> bool:
"""
判断工具是否适用于当前数据。
参数:
data_profile: 数据画像
返回:
True 如果工具适用False 否则
"""
# 示例:检查是否有数值列
has_numeric = any(
col.dtype == 'numeric'
for col in data_profile.columns
)
return has_numeric
```
### 步骤2注册工具
`src/tools/__init__.py` 中注册工具:
```python
from src.tools.base import register_tool
from src.tools.my_custom_tools import MyCustomTool
# 注册工具
register_tool(MyCustomTool())
```
或者在工具管理器中动态注册:
```python
from src.tools.tool_manager import ToolManager
from src.tools.my_custom_tools import MyCustomTool
tool_manager = ToolManager()
tool_manager.registry.register(MyCustomTool())
```
### 步骤3编写测试
创建测试文件 `tests/test_my_custom_tools.py`
```python
import pytest
import pandas as pd
from hypothesis import given, strategies as st
from src.tools.my_custom_tools import MyCustomTool
from src.models import DataProfile, ColumnInfo
def test_my_custom_tool_basic():
"""测试工具的基本功能。"""
# 准备测试数据
data = pd.DataFrame({
'value': [1, 2, 3, 4, 5]
})
# 创建工具
tool = MyCustomTool()
# 执行工具
result = tool.execute(data, column='value', threshold=0.5)
# 验证结果
assert result['column'] == 'value'
assert result['threshold'] == 0.5
assert result['count'] == 5
assert 'insights' in result
def test_my_custom_tool_invalid_column():
"""测试无效列名的处理。"""
data = pd.DataFrame({'value': [1, 2, 3]})
tool = MyCustomTool()
with pytest.raises(ValueError, match="列 .* 不存在"):
tool.execute(data, column='invalid_column')
@given(data=st.data())
def test_my_custom_tool_property(data):
"""属性测试:工具应该总是返回聚合数据。"""
# 生成随机数据
df = pd.DataFrame({
'value': data.draw(st.lists(st.floats(), min_size=10, max_size=100))
})
tool = MyCustomTool()
result = tool.execute(df, column='value')
# 验证:结果不应包含原始行级数据
assert 'data' not in result or len(result.get('data', [])) <= 100
assert 'insights' in result
```
### 步骤4更新文档
`docs/API.md` 中添加工具文档:
```markdown
### MyCustomTool
自定义分析工具。
**功能**[描述]
**参数**
- `column` (str): 要分析的列名
- `threshold` (float): 阈值参数,默认 0.5
**返回值**
```python
{
"column": str,
"threshold": float,
"count": int,
"result_value": float,
"insights": List[str]
}
```
**示例**
```python
tool = MyCustomTool()
result = tool.execute(data, column='value', threshold=0.5)
```
```
### 工具开发最佳实践
1. **隐私保护**
- 永远不要返回原始行级数据
- 只返回聚合数据(统计值、计数、分组结果等)
- 限制返回的数据行数(最多 100 行)
2. **参数验证**
- 使用 JSON Schema 定义参数
-`execute()` 中验证参数
- 提供清晰的错误信息
3. **错误处理**
- 捕获并处理异常
- 返回有意义的错误信息
- 不要让工具崩溃整个流程
4. **性能优化**
- 避免不必要的数据复制
- 使用 pandas 的向量化操作
- 考虑大数据集的性能
5. **文档完善**
- 提供清晰的工具描述
- 说明适用场景
- 提供使用示例
---
## 扩展分析引擎
### 添加新的分析阶段
如果需要添加新的分析阶段,按以下步骤操作:
#### 1. 创建引擎模块
```python
# src/engines/my_new_engine.py
import logging
from typing import Dict, Any
from src.models import DataProfile, AnalysisPlan
logger = logging.getLogger(__name__)
def my_new_analysis_stage(
data_profile: DataProfile,
analysis_plan: AnalysisPlan
) -> Dict[str, Any]:
"""
新的分析阶段。
参数:
data_profile: 数据画像
analysis_plan: 分析计划
返回:
分析结果
"""
logger.info("执行新的分析阶段...")
# 实现分析逻辑
result = {
"status": "completed",
"findings": []
}
return result
```
#### 2. 集成到主流程
`src/main.py` 中添加新阶段:
```python
class AnalysisOrchestrator:
def run_analysis(self):
# ... 现有阶段 ...
# 新阶段
self._report_progress("新分析阶段", 5, 6)
self.tracker.track_stage("新分析阶段", "started")
new_result = self._stage_new_analysis()
self.tracker.track_stage("新分析阶段", "completed")
# ... 继续 ...
def _stage_new_analysis(self) -> Dict[str, Any]:
"""新的分析阶段。"""
from src.engines.my_new_engine import my_new_analysis_stage
log_stage_start(logger, "新分析阶段")
result = my_new_analysis_stage(
self.data_profile,
self.analysis_plan
)
log_stage_end(logger, "新分析阶段")
return result
```
### 自定义 ReAct 执行逻辑
如果需要自定义任务执行逻辑:
```python
# src/engines/custom_execution.py
from typing import List, Dict, Any
from src.models import AnalysisTask, AnalysisResult
from src.tools.base import AnalysisTool
from src.data_access import DataAccessLayer
def custom_execute_task(
task: AnalysisTask,
tools: List[AnalysisTool],
data_access: DataAccessLayer
) -> AnalysisResult:
"""
自定义任务执行逻辑。
参数:
task: 分析任务
tools: 可用工具列表
data_access: 数据访问层
返回:
分析结果
"""
# 实现自定义执行逻辑
# 例如:使用不同的 AI 模型、不同的提示策略等
pass
```
---
## 自定义数据模型
### 扩展数据画像
如果需要添加新的数据特征:
```python
# src/models/data_profile.py
from dataclasses import dataclass, field
from typing import List, Dict, Any
@dataclass
class DataProfile:
# 现有字段...
# 新增字段
custom_features: Dict[str, Any] = field(default_factory=dict)
def add_custom_feature(self, name: str, value: Any):
"""添加自定义特征。"""
self.custom_features[name] = value
```
### 添加新的分析任务类型
```python
# src/models/analysis_plan.py
from dataclasses import dataclass
from typing import Optional
@dataclass
class CustomAnalysisTask(AnalysisTask):
"""自定义分析任务。"""
custom_param: Optional[str] = None
def validate(self) -> bool:
"""验证任务参数。"""
# 实现验证逻辑
return True
```
---
## 测试指南
### 测试策略
系统采用双重测试方法:
1. **单元测试**:验证特定示例、边缘情况和错误条件
2. **属性测试**:验证跨所有输入的通用属性
### 编写单元测试
```python
# tests/test_my_feature.py
import pytest
from src.my_module import my_function
def test_my_function_basic():
"""测试基本功能。"""
result = my_function(input_data)
assert result == expected_output
def test_my_function_edge_case():
"""测试边缘情况。"""
result = my_function(edge_case_input)
assert result is not None
def test_my_function_error_handling():
"""测试错误处理。"""
with pytest.raises(ValueError):
my_function(invalid_input)
```
### 编写属性测试
```python
# tests/test_my_feature_properties.py
from hypothesis import given, strategies as st
import hypothesis
# Feature: my-feature, Property 1: 输出总是有效
@given(input_data=st.data())
@hypothesis.settings(max_examples=100)
def test_output_always_valid(input_data):
"""
属性 1对于任何有效输入输出总是有效的。
"""
# 生成随机输入
data = generate_random_input(input_data)
# 执行函数
result = my_function(data)
# 验证属性
assert result is not None
assert validate_output(result)
```
### 运行测试
```bash
# 运行所有测试
pytest
# 运行单元测试
pytest tests/ -k "not properties"
# 运行属性测试
pytest tests/ -k "properties"
# 运行特定测试文件
pytest tests/test_my_feature.py -v
# 查看覆盖率
pytest --cov=src --cov-report=html
# 生成覆盖率报告
open htmlcov/index.html
```
---
## 代码规范
### Python 代码风格
遵循 PEP 8 规范:
```bash
# 检查代码风格
flake8 src/
# 自动格式化代码
black src/
```
### 文档字符串
使用 Google 风格的文档字符串:
```python
def my_function(param1: str, param2: int) -> Dict[str, Any]:
"""
函数的简短描述。
更详细的描述(如果需要)。
参数:
param1: 参数1的描述
param2: 参数2的描述
返回:
返回值的描述
异常:
ValueError: 参数无效时抛出
示例:
>>> result = my_function("test", 42)
>>> print(result)
{'status': 'success'}
"""
pass
```
### 类型注解
使用类型注解提高代码可读性:
```python
from typing import List, Dict, Optional, Any
def process_data(
data: List[Dict[str, Any]],
config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""处理数据。"""
pass
```
### 命名规范
- **模块名**:小写,下划线分隔(`my_module.py`
- **类名**:驼峰命名(`MyClass`
- **函数名**:小写,下划线分隔(`my_function`
- **常量**:大写,下划线分隔(`MY_CONSTANT`
- **私有成员**:前缀下划线(`_private_method`
---
## 调试技巧
### 启用详细日志
```bash
# 设置日志级别为 DEBUG
export LOG_LEVEL=DEBUG
# 或在代码中设置
import logging
logging.basicConfig(level=logging.DEBUG)
```
### 使用 Python 调试器
```python
# 在代码中设置断点
import pdb; pdb.set_trace()
# 或使用 ipdb更友好
import ipdb; ipdb.set_trace()
```
### 查看 AI 的思考过程
```bash
# 使用 -v 参数显示详细日志
python -m src.cli data.csv -v
```
### 保存中间结果
```python
# 在 AnalysisOrchestrator 中保存中间结果
import json
# 保存数据画像
with open('debug_data_profile.json', 'w') as f:
json.dump(self.data_profile.__dict__, f, indent=2)
# 保存分析计划
with open('debug_analysis_plan.json', 'w') as f:
json.dump([task.__dict__ for task in self.analysis_plan.tasks], f, indent=2)
```
### 模拟 AI 调用
在测试时模拟 AI 调用以避免 API 费用:
```python
from unittest.mock import patch
with patch('src.engines.data_understanding.call_llm') as mock_llm:
mock_llm.return_value = {
'data_type': 'ticket',
'key_fields': {'status': '工单状态'},
'quality_score': 85.0
}
# 执行测试
result = understand_data(data_access)
```
---
## 常见问题
### Q1: 如何添加对新数据格式的支持?
修改 `src/data_access.py` 中的 `load_from_file()` 方法:
```python
@classmethod
def load_from_file(cls, file_path: str) -> "DataAccessLayer":
"""从文件加载数据。"""
if file_path.endswith('.csv'):
data = cls._load_csv(file_path)
elif file_path.endswith('.xlsx'):
data = cls._load_excel(file_path)
elif file_path.endswith('.json'):
data = cls._load_json(file_path)
else:
raise ValueError(f"不支持的文件格式: {file_path}")
return cls(data)
```
### Q2: 如何更换 LLM 提供商?
修改 `.env` 文件:
```bash
# 使用 Gemini
LLM_PROVIDER=gemini
GEMINI_API_KEY=your_gemini_key
GEMINI_MODEL=gemini-2.0-flash-exp
```
### Q3: 如何优化性能?
1. 增加并发任务数(未来版本支持)
2. 使用更快的 LLM 模型
3. 减少 ReAct 最大迭代次数
4. 对大数据集进行采样
### Q4: 如何贡献代码?
1. Fork 项目
2. 创建特性分支
3. 编写代码和测试
4. 确保所有测试通过
5. 提交 Pull Request
---
## 资源链接
- **项目文档**: `docs/`
- **API 文档**: `docs/API.md`
- **配置指南**: `docs/configuration_guide.md`
- **示例代码**: `examples/`
- **测试数据**: `test_data/`
---
## 版本信息
- **版本**: v1.0.0
- **日期**: 2026-03-06
- **状态**: 稳定版本
---
## 联系方式
如有问题或建议,请创建 Issue 或联系维护者。