18 KiB
18 KiB
开发者指南
本指南帮助开发者理解系统架构、扩展功能和添加新工具。
目录
系统架构
整体架构
系统采用五阶段流水线架构,每个阶段由 AI 驱动:
数据输入 → 数据理解 → 需求理解 → 分析规划 → 任务执行 → 报告生成
核心组件
src/
├── main.py # 主流程编排
├── cli.py # 命令行接口
├── config.py # 配置管理
├── data_access.py # 数据访问层(隐私保护)
├── error_handling.py # 错误处理
├── logging_config.py # 日志配置
├── env_loader.py # 环境变量加载
├── engines/ # 分析引擎
│ ├── data_understanding.py # 数据理解
│ ├── requirement_understanding.py # 需求理解
│ ├── analysis_planning.py # 分析规划
│ ├── task_execution.py # 任务执行(ReAct)
│ ├── plan_adjustment.py # 计划调整
│ └── report_generation.py # 报告生成
├── models/ # 数据模型
│ ├── data_profile.py
│ ├── requirement_spec.py
│ ├── analysis_plan.py
│ └── analysis_result.py
└── tools/ # 分析工具
├── base.py # 工具基类和注册表
├── query_tools.py # 数据查询工具
├── stats_tools.py # 统计分析工具
├── viz_tools.py # 可视化工具
└── tool_manager.py # 工具管理器
数据流
CSV 文件
↓
DataAccessLayer(数据访问层)
↓
DataProfile(数据画像:元数据 + 统计摘要)
↓
RequirementSpec(需求规格)
↓
AnalysisPlan(分析计划:任务列表)
↓
AnalysisResult[](分析结果列表)
↓
Markdown 报告
设计原则
- AI 优先:让 AI 做决策,而不是执行预定义的规则
- 动态适应:根据数据特征和发现动态调整分析计划
- 隐私保护:AI 不读取原始数据,只通过工具获取摘要信息
- 工具驱动:通过动态工具集赋能 AI 的分析能力
- 可扩展性:易于添加新工具和扩展功能
开发环境设置
1. 克隆仓库
git clone <repository-url>
cd <repository-name>
2. 创建虚拟环境
# 使用 venv
python -m venv .venv
# 激活虚拟环境
# Windows
.venv\Scripts\activate
# Linux/Mac
source .venv/bin/activate
3. 安装依赖
# 安装生产依赖
pip install -r requirements.txt
# 安装开发依赖(如果有)
pip install pytest hypothesis pytest-cov black flake8
4. 配置环境变量
cp .env.example .env
# 编辑 .env 文件,设置 API 密钥
5. 运行测试
# 运行所有测试
pytest
# 运行特定测试
pytest tests/test_integration.py -v
# 查看覆盖率
pytest --cov=src --cov-report=html
添加新工具
步骤1:创建工具类
创建一个继承自 AnalysisTool 的新类:
# src/tools/my_custom_tools.py
from src.tools.base import AnalysisTool
from src.models import DataProfile
import pandas as pd
from typing import Dict, Any
class MyCustomTool(AnalysisTool):
"""
自定义分析工具。
功能:[描述工具的功能]
"""
@property
def name(self) -> str:
"""工具名称(唯一标识)。"""
return "my_custom_tool"
@property
def description(self) -> str:
"""工具描述(供 AI 理解)。"""
return """
这个工具用于 [具体功能描述]。
适用场景:
- [场景1]
- [场景2]
输入参数:
- column: 要分析的列名
- threshold: 阈值参数
输出:
- result: 分析结果
- insights: 洞察列表
"""
@property
def parameters(self) -> Dict[str, Any]:
"""参数定义(JSON Schema 格式)。"""
return {
"type": "object",
"properties": {
"column": {
"type": "string",
"description": "要分析的列名"
},
"threshold": {
"type": "number",
"description": "阈值参数",
"default": 0.5
}
},
"required": ["column"]
}
def execute(self, data: pd.DataFrame, **kwargs) -> Dict[str, Any]:
"""
执行工具。
参数:
data: 原始数据(工具内部使用,不暴露给 AI)
**kwargs: 工具参数
返回:
聚合后的结果(不包含原始数据)
"""
# 1. 验证参数
if not self.validate_parameters(**kwargs):
raise ValueError("参数验证失败")
column = kwargs['column']
threshold = kwargs.get('threshold', 0.5)
# 2. 检查列是否存在
if column not in data.columns:
raise ValueError(f"列 '{column}' 不存在")
# 3. 执行分析
# 注意:只返回聚合数据,不返回原始行级数据
result = {
"column": column,
"threshold": threshold,
"count": len(data),
"result_value": data[column].mean(), # 示例
"insights": [
f"列 {column} 的平均值为 {data[column].mean():.2f}"
]
}
return result
def is_applicable(self, data_profile: DataProfile) -> bool:
"""
判断工具是否适用于当前数据。
参数:
data_profile: 数据画像
返回:
True 如果工具适用,False 否则
"""
# 示例:检查是否有数值列
has_numeric = any(
col.dtype == 'numeric'
for col in data_profile.columns
)
return has_numeric
步骤2:注册工具
在 src/tools/__init__.py 中注册工具:
from src.tools.base import register_tool
from src.tools.my_custom_tools import MyCustomTool
# 注册工具
register_tool(MyCustomTool())
或者在工具管理器中动态注册:
from src.tools.tool_manager import ToolManager
from src.tools.my_custom_tools import MyCustomTool
tool_manager = ToolManager()
tool_manager.registry.register(MyCustomTool())
步骤3:编写测试
创建测试文件 tests/test_my_custom_tools.py:
import pytest
import pandas as pd
from hypothesis import given, strategies as st
from src.tools.my_custom_tools import MyCustomTool
from src.models import DataProfile, ColumnInfo
def test_my_custom_tool_basic():
"""测试工具的基本功能。"""
# 准备测试数据
data = pd.DataFrame({
'value': [1, 2, 3, 4, 5]
})
# 创建工具
tool = MyCustomTool()
# 执行工具
result = tool.execute(data, column='value', threshold=0.5)
# 验证结果
assert result['column'] == 'value'
assert result['threshold'] == 0.5
assert result['count'] == 5
assert 'insights' in result
def test_my_custom_tool_invalid_column():
"""测试无效列名的处理。"""
data = pd.DataFrame({'value': [1, 2, 3]})
tool = MyCustomTool()
with pytest.raises(ValueError, match="列 .* 不存在"):
tool.execute(data, column='invalid_column')
@given(data=st.data())
def test_my_custom_tool_property(data):
"""属性测试:工具应该总是返回聚合数据。"""
# 生成随机数据
df = pd.DataFrame({
'value': data.draw(st.lists(st.floats(), min_size=10, max_size=100))
})
tool = MyCustomTool()
result = tool.execute(df, column='value')
# 验证:结果不应包含原始行级数据
assert 'data' not in result or len(result.get('data', [])) <= 100
assert 'insights' in result
步骤4:更新文档
在 docs/API.md 中添加工具文档:
### MyCustomTool
自定义分析工具。
**功能**:[描述]
**参数**:
- `column` (str): 要分析的列名
- `threshold` (float): 阈值参数,默认 0.5
**返回值**:
```python
{
"column": str,
"threshold": float,
"count": int,
"result_value": float,
"insights": List[str]
}
示例:
tool = MyCustomTool()
result = tool.execute(data, column='value', threshold=0.5)
### 工具开发最佳实践
1. **隐私保护**:
- 永远不要返回原始行级数据
- 只返回聚合数据(统计值、计数、分组结果等)
- 限制返回的数据行数(最多 100 行)
2. **参数验证**:
- 使用 JSON Schema 定义参数
- 在 `execute()` 中验证参数
- 提供清晰的错误信息
3. **错误处理**:
- 捕获并处理异常
- 返回有意义的错误信息
- 不要让工具崩溃整个流程
4. **性能优化**:
- 避免不必要的数据复制
- 使用 pandas 的向量化操作
- 考虑大数据集的性能
5. **文档完善**:
- 提供清晰的工具描述
- 说明适用场景
- 提供使用示例
---
## 扩展分析引擎
### 添加新的分析阶段
如果需要添加新的分析阶段,按以下步骤操作:
#### 1. 创建引擎模块
```python
# src/engines/my_new_engine.py
import logging
from typing import Dict, Any
from src.models import DataProfile, AnalysisPlan
logger = logging.getLogger(__name__)
def my_new_analysis_stage(
data_profile: DataProfile,
analysis_plan: AnalysisPlan
) -> Dict[str, Any]:
"""
新的分析阶段。
参数:
data_profile: 数据画像
analysis_plan: 分析计划
返回:
分析结果
"""
logger.info("执行新的分析阶段...")
# 实现分析逻辑
result = {
"status": "completed",
"findings": []
}
return result
2. 集成到主流程
在 src/main.py 中添加新阶段:
class AnalysisOrchestrator:
def run_analysis(self):
# ... 现有阶段 ...
# 新阶段
self._report_progress("新分析阶段", 5, 6)
self.tracker.track_stage("新分析阶段", "started")
new_result = self._stage_new_analysis()
self.tracker.track_stage("新分析阶段", "completed")
# ... 继续 ...
def _stage_new_analysis(self) -> Dict[str, Any]:
"""新的分析阶段。"""
from src.engines.my_new_engine import my_new_analysis_stage
log_stage_start(logger, "新分析阶段")
result = my_new_analysis_stage(
self.data_profile,
self.analysis_plan
)
log_stage_end(logger, "新分析阶段")
return result
自定义 ReAct 执行逻辑
如果需要自定义任务执行逻辑:
# src/engines/custom_execution.py
from typing import List, Dict, Any
from src.models import AnalysisTask, AnalysisResult
from src.tools.base import AnalysisTool
from src.data_access import DataAccessLayer
def custom_execute_task(
task: AnalysisTask,
tools: List[AnalysisTool],
data_access: DataAccessLayer
) -> AnalysisResult:
"""
自定义任务执行逻辑。
参数:
task: 分析任务
tools: 可用工具列表
data_access: 数据访问层
返回:
分析结果
"""
# 实现自定义执行逻辑
# 例如:使用不同的 AI 模型、不同的提示策略等
pass
自定义数据模型
扩展数据画像
如果需要添加新的数据特征:
# src/models/data_profile.py
from dataclasses import dataclass, field
from typing import List, Dict, Any
@dataclass
class DataProfile:
# 现有字段...
# 新增字段
custom_features: Dict[str, Any] = field(default_factory=dict)
def add_custom_feature(self, name: str, value: Any):
"""添加自定义特征。"""
self.custom_features[name] = value
添加新的分析任务类型
# src/models/analysis_plan.py
from dataclasses import dataclass
from typing import Optional
@dataclass
class CustomAnalysisTask(AnalysisTask):
"""自定义分析任务。"""
custom_param: Optional[str] = None
def validate(self) -> bool:
"""验证任务参数。"""
# 实现验证逻辑
return True
测试指南
测试策略
系统采用双重测试方法:
- 单元测试:验证特定示例、边缘情况和错误条件
- 属性测试:验证跨所有输入的通用属性
编写单元测试
# tests/test_my_feature.py
import pytest
from src.my_module import my_function
def test_my_function_basic():
"""测试基本功能。"""
result = my_function(input_data)
assert result == expected_output
def test_my_function_edge_case():
"""测试边缘情况。"""
result = my_function(edge_case_input)
assert result is not None
def test_my_function_error_handling():
"""测试错误处理。"""
with pytest.raises(ValueError):
my_function(invalid_input)
编写属性测试
# tests/test_my_feature_properties.py
from hypothesis import given, strategies as st
import hypothesis
# Feature: my-feature, Property 1: 输出总是有效
@given(input_data=st.data())
@hypothesis.settings(max_examples=100)
def test_output_always_valid(input_data):
"""
属性 1:对于任何有效输入,输出总是有效的。
"""
# 生成随机输入
data = generate_random_input(input_data)
# 执行函数
result = my_function(data)
# 验证属性
assert result is not None
assert validate_output(result)
运行测试
# 运行所有测试
pytest
# 运行单元测试
pytest tests/ -k "not properties"
# 运行属性测试
pytest tests/ -k "properties"
# 运行特定测试文件
pytest tests/test_my_feature.py -v
# 查看覆盖率
pytest --cov=src --cov-report=html
# 生成覆盖率报告
open htmlcov/index.html
代码规范
Python 代码风格
遵循 PEP 8 规范:
# 检查代码风格
flake8 src/
# 自动格式化代码
black src/
文档字符串
使用 Google 风格的文档字符串:
def my_function(param1: str, param2: int) -> Dict[str, Any]:
"""
函数的简短描述。
更详细的描述(如果需要)。
参数:
param1: 参数1的描述
param2: 参数2的描述
返回:
返回值的描述
异常:
ValueError: 参数无效时抛出
示例:
>>> result = my_function("test", 42)
>>> print(result)
{'status': 'success'}
"""
pass
类型注解
使用类型注解提高代码可读性:
from typing import List, Dict, Optional, Any
def process_data(
data: List[Dict[str, Any]],
config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""处理数据。"""
pass
命名规范
- 模块名:小写,下划线分隔(
my_module.py) - 类名:驼峰命名(
MyClass) - 函数名:小写,下划线分隔(
my_function) - 常量:大写,下划线分隔(
MY_CONSTANT) - 私有成员:前缀下划线(
_private_method)
调试技巧
启用详细日志
# 设置日志级别为 DEBUG
export LOG_LEVEL=DEBUG
# 或在代码中设置
import logging
logging.basicConfig(level=logging.DEBUG)
使用 Python 调试器
# 在代码中设置断点
import pdb; pdb.set_trace()
# 或使用 ipdb(更友好)
import ipdb; ipdb.set_trace()
查看 AI 的思考过程
# 使用 -v 参数显示详细日志
python -m src.cli data.csv -v
保存中间结果
# 在 AnalysisOrchestrator 中保存中间结果
import json
# 保存数据画像
with open('debug_data_profile.json', 'w') as f:
json.dump(self.data_profile.__dict__, f, indent=2)
# 保存分析计划
with open('debug_analysis_plan.json', 'w') as f:
json.dump([task.__dict__ for task in self.analysis_plan.tasks], f, indent=2)
模拟 AI 调用
在测试时模拟 AI 调用以避免 API 费用:
from unittest.mock import patch
with patch('src.engines.data_understanding.call_llm') as mock_llm:
mock_llm.return_value = {
'data_type': 'ticket',
'key_fields': {'status': '工单状态'},
'quality_score': 85.0
}
# 执行测试
result = understand_data(data_access)
常见问题
Q1: 如何添加对新数据格式的支持?
修改 src/data_access.py 中的 load_from_file() 方法:
@classmethod
def load_from_file(cls, file_path: str) -> "DataAccessLayer":
"""从文件加载数据。"""
if file_path.endswith('.csv'):
data = cls._load_csv(file_path)
elif file_path.endswith('.xlsx'):
data = cls._load_excel(file_path)
elif file_path.endswith('.json'):
data = cls._load_json(file_path)
else:
raise ValueError(f"不支持的文件格式: {file_path}")
return cls(data)
Q2: 如何更换 LLM 提供商?
修改 .env 文件:
# 使用 Gemini
LLM_PROVIDER=gemini
GEMINI_API_KEY=your_gemini_key
GEMINI_MODEL=gemini-2.0-flash-exp
Q3: 如何优化性能?
- 增加并发任务数(未来版本支持)
- 使用更快的 LLM 模型
- 减少 ReAct 最大迭代次数
- 对大数据集进行采样
Q4: 如何贡献代码?
- Fork 项目
- 创建特性分支
- 编写代码和测试
- 确保所有测试通过
- 提交 Pull Request
资源链接
- 项目文档:
docs/ - API 文档:
docs/API.md - 配置指南:
docs/configuration_guide.md - 示例代码:
examples/ - 测试数据:
test_data/
版本信息
- 版本: v1.0.0
- 日期: 2026-03-06
- 状态: 稳定版本
联系方式
如有问题或建议,请创建 Issue 或联系维护者。