Files
vibe_data_ana/docs/DEVELOPER_GUIDE.md

18 KiB
Raw Blame History

开发者指南

本指南帮助开发者理解系统架构、扩展功能和添加新工具。

目录


系统架构

整体架构

系统采用五阶段流水线架构,每个阶段由 AI 驱动:

数据输入 → 数据理解 → 需求理解 → 分析规划 → 任务执行 → 报告生成

核心组件

src/
├── main.py                 # 主流程编排
├── cli.py                  # 命令行接口
├── config.py               # 配置管理
├── data_access.py          # 数据访问层(隐私保护)
├── error_handling.py       # 错误处理
├── logging_config.py       # 日志配置
├── env_loader.py           # 环境变量加载
├── engines/                # 分析引擎
│   ├── data_understanding.py       # 数据理解
│   ├── requirement_understanding.py # 需求理解
│   ├── analysis_planning.py        # 分析规划
│   ├── task_execution.py           # 任务执行ReAct
│   ├── plan_adjustment.py          # 计划调整
│   └── report_generation.py        # 报告生成
├── models/                 # 数据模型
│   ├── data_profile.py
│   ├── requirement_spec.py
│   ├── analysis_plan.py
│   └── analysis_result.py
└── tools/                  # 分析工具
    ├── base.py             # 工具基类和注册表
    ├── query_tools.py      # 数据查询工具
    ├── stats_tools.py      # 统计分析工具
    ├── viz_tools.py        # 可视化工具
    └── tool_manager.py     # 工具管理器

数据流

CSV 文件
  ↓
DataAccessLayer数据访问层
  ↓
DataProfile数据画像元数据 + 统计摘要)
  ↓
RequirementSpec需求规格
  ↓
AnalysisPlan分析计划任务列表
  ↓
AnalysisResult[](分析结果列表)
  ↓
Markdown 报告

设计原则

  1. AI 优先:让 AI 做决策,而不是执行预定义的规则
  2. 动态适应:根据数据特征和发现动态调整分析计划
  3. 隐私保护AI 不读取原始数据,只通过工具获取摘要信息
  4. 工具驱动:通过动态工具集赋能 AI 的分析能力
  5. 可扩展性:易于添加新工具和扩展功能

开发环境设置

1. 克隆仓库

git clone <repository-url>
cd <repository-name>

2. 创建虚拟环境

# 使用 venv
python -m venv .venv

# 激活虚拟环境
# Windows
.venv\Scripts\activate
# Linux/Mac
source .venv/bin/activate

3. 安装依赖

# 安装生产依赖
pip install -r requirements.txt

# 安装开发依赖(如果有)
pip install pytest hypothesis pytest-cov black flake8

4. 配置环境变量

cp .env.example .env
# 编辑 .env 文件,设置 API 密钥

5. 运行测试

# 运行所有测试
pytest

# 运行特定测试
pytest tests/test_integration.py -v

# 查看覆盖率
pytest --cov=src --cov-report=html

添加新工具

步骤1创建工具类

创建一个继承自 AnalysisTool 的新类:

# src/tools/my_custom_tools.py

from src.tools.base import AnalysisTool
from src.models import DataProfile
import pandas as pd
from typing import Dict, Any


class MyCustomTool(AnalysisTool):
    """
    自定义分析工具。
    
    功能:[描述工具的功能]
    """
    
    @property
    def name(self) -> str:
        """工具名称(唯一标识)。"""
        return "my_custom_tool"
    
    @property
    def description(self) -> str:
        """工具描述(供 AI 理解)。"""
        return """
        这个工具用于 [具体功能描述]。
        
        适用场景:
        - [场景1]
        - [场景2]
        
        输入参数:
        - column: 要分析的列名
        - threshold: 阈值参数
        
        输出:
        - result: 分析结果
        - insights: 洞察列表
        """
    
    @property
    def parameters(self) -> Dict[str, Any]:
        """参数定义JSON Schema 格式)。"""
        return {
            "type": "object",
            "properties": {
                "column": {
                    "type": "string",
                    "description": "要分析的列名"
                },
                "threshold": {
                    "type": "number",
                    "description": "阈值参数",
                    "default": 0.5
                }
            },
            "required": ["column"]
        }
    
    def execute(self, data: pd.DataFrame, **kwargs) -> Dict[str, Any]:
        """
        执行工具。
        
        参数:
            data: 原始数据(工具内部使用,不暴露给 AI
            **kwargs: 工具参数
        
        返回:
            聚合后的结果(不包含原始数据)
        """
        # 1. 验证参数
        if not self.validate_parameters(**kwargs):
            raise ValueError("参数验证失败")
        
        column = kwargs['column']
        threshold = kwargs.get('threshold', 0.5)
        
        # 2. 检查列是否存在
        if column not in data.columns:
            raise ValueError(f"列 '{column}' 不存在")
        
        # 3. 执行分析
        # 注意:只返回聚合数据,不返回原始行级数据
        result = {
            "column": column,
            "threshold": threshold,
            "count": len(data),
            "result_value": data[column].mean(),  # 示例
            "insights": [
                f"列 {column} 的平均值为 {data[column].mean():.2f}"
            ]
        }
        
        return result
    
    def is_applicable(self, data_profile: DataProfile) -> bool:
        """
        判断工具是否适用于当前数据。
        
        参数:
            data_profile: 数据画像
        
        返回:
            True 如果工具适用False 否则
        """
        # 示例:检查是否有数值列
        has_numeric = any(
            col.dtype == 'numeric' 
            for col in data_profile.columns
        )
        
        return has_numeric

步骤2注册工具

src/tools/__init__.py 中注册工具:

from src.tools.base import register_tool
from src.tools.my_custom_tools import MyCustomTool

# 注册工具
register_tool(MyCustomTool())

或者在工具管理器中动态注册:

from src.tools.tool_manager import ToolManager
from src.tools.my_custom_tools import MyCustomTool

tool_manager = ToolManager()
tool_manager.registry.register(MyCustomTool())

步骤3编写测试

创建测试文件 tests/test_my_custom_tools.py

import pytest
import pandas as pd
from hypothesis import given, strategies as st

from src.tools.my_custom_tools import MyCustomTool
from src.models import DataProfile, ColumnInfo


def test_my_custom_tool_basic():
    """测试工具的基本功能。"""
    # 准备测试数据
    data = pd.DataFrame({
        'value': [1, 2, 3, 4, 5]
    })
    
    # 创建工具
    tool = MyCustomTool()
    
    # 执行工具
    result = tool.execute(data, column='value', threshold=0.5)
    
    # 验证结果
    assert result['column'] == 'value'
    assert result['threshold'] == 0.5
    assert result['count'] == 5
    assert 'insights' in result


def test_my_custom_tool_invalid_column():
    """测试无效列名的处理。"""
    data = pd.DataFrame({'value': [1, 2, 3]})
    tool = MyCustomTool()
    
    with pytest.raises(ValueError, match="列 .* 不存在"):
        tool.execute(data, column='invalid_column')


@given(data=st.data())
def test_my_custom_tool_property(data):
    """属性测试:工具应该总是返回聚合数据。"""
    # 生成随机数据
    df = pd.DataFrame({
        'value': data.draw(st.lists(st.floats(), min_size=10, max_size=100))
    })
    
    tool = MyCustomTool()
    result = tool.execute(df, column='value')
    
    # 验证:结果不应包含原始行级数据
    assert 'data' not in result or len(result.get('data', [])) <= 100
    assert 'insights' in result

步骤4更新文档

docs/API.md 中添加工具文档:

### MyCustomTool

自定义分析工具。

**功能**[描述]

**参数**
- `column` (str): 要分析的列名
- `threshold` (float): 阈值参数,默认 0.5

**返回值**
```python
{
    "column": str,
    "threshold": float,
    "count": int,
    "result_value": float,
    "insights": List[str]
}

示例

tool = MyCustomTool()
result = tool.execute(data, column='value', threshold=0.5)

### 工具开发最佳实践

1. **隐私保护**
   - 永远不要返回原始行级数据
   - 只返回聚合数据(统计值、计数、分组结果等)
   - 限制返回的数据行数(最多 100 行)

2. **参数验证**
   - 使用 JSON Schema 定义参数
   - 在 `execute()` 中验证参数
   - 提供清晰的错误信息

3. **错误处理**
   - 捕获并处理异常
   - 返回有意义的错误信息
   - 不要让工具崩溃整个流程

4. **性能优化**
   - 避免不必要的数据复制
   - 使用 pandas 的向量化操作
   - 考虑大数据集的性能

5. **文档完善**
   - 提供清晰的工具描述
   - 说明适用场景
   - 提供使用示例

---

## 扩展分析引擎

### 添加新的分析阶段

如果需要添加新的分析阶段,按以下步骤操作:

#### 1. 创建引擎模块

```python
# src/engines/my_new_engine.py

import logging
from typing import Dict, Any

from src.models import DataProfile, AnalysisPlan

logger = logging.getLogger(__name__)


def my_new_analysis_stage(
    data_profile: DataProfile,
    analysis_plan: AnalysisPlan
) -> Dict[str, Any]:
    """
    新的分析阶段。
    
    参数:
        data_profile: 数据画像
        analysis_plan: 分析计划
    
    返回:
        分析结果
    """
    logger.info("执行新的分析阶段...")
    
    # 实现分析逻辑
    result = {
        "status": "completed",
        "findings": []
    }
    
    return result

2. 集成到主流程

src/main.py 中添加新阶段:

class AnalysisOrchestrator:
    def run_analysis(self):
        # ... 现有阶段 ...
        
        # 新阶段
        self._report_progress("新分析阶段", 5, 6)
        self.tracker.track_stage("新分析阶段", "started")
        new_result = self._stage_new_analysis()
        self.tracker.track_stage("新分析阶段", "completed")
        
        # ... 继续 ...
    
    def _stage_new_analysis(self) -> Dict[str, Any]:
        """新的分析阶段。"""
        from src.engines.my_new_engine import my_new_analysis_stage
        
        log_stage_start(logger, "新分析阶段")
        result = my_new_analysis_stage(
            self.data_profile,
            self.analysis_plan
        )
        log_stage_end(logger, "新分析阶段")
        
        return result

自定义 ReAct 执行逻辑

如果需要自定义任务执行逻辑:

# src/engines/custom_execution.py

from typing import List, Dict, Any
from src.models import AnalysisTask, AnalysisResult
from src.tools.base import AnalysisTool
from src.data_access import DataAccessLayer


def custom_execute_task(
    task: AnalysisTask,
    tools: List[AnalysisTool],
    data_access: DataAccessLayer
) -> AnalysisResult:
    """
    自定义任务执行逻辑。
    
    参数:
        task: 分析任务
        tools: 可用工具列表
        data_access: 数据访问层
    
    返回:
        分析结果
    """
    # 实现自定义执行逻辑
    # 例如:使用不同的 AI 模型、不同的提示策略等
    
    pass

自定义数据模型

扩展数据画像

如果需要添加新的数据特征:

# src/models/data_profile.py

from dataclasses import dataclass, field
from typing import List, Dict, Any

@dataclass
class DataProfile:
    # 现有字段...
    
    # 新增字段
    custom_features: Dict[str, Any] = field(default_factory=dict)
    
    def add_custom_feature(self, name: str, value: Any):
        """添加自定义特征。"""
        self.custom_features[name] = value

添加新的分析任务类型

# src/models/analysis_plan.py

from dataclasses import dataclass
from typing import Optional

@dataclass
class CustomAnalysisTask(AnalysisTask):
    """自定义分析任务。"""
    
    custom_param: Optional[str] = None
    
    def validate(self) -> bool:
        """验证任务参数。"""
        # 实现验证逻辑
        return True

测试指南

测试策略

系统采用双重测试方法:

  1. 单元测试:验证特定示例、边缘情况和错误条件
  2. 属性测试:验证跨所有输入的通用属性

编写单元测试

# tests/test_my_feature.py

import pytest
from src.my_module import my_function


def test_my_function_basic():
    """测试基本功能。"""
    result = my_function(input_data)
    assert result == expected_output


def test_my_function_edge_case():
    """测试边缘情况。"""
    result = my_function(edge_case_input)
    assert result is not None


def test_my_function_error_handling():
    """测试错误处理。"""
    with pytest.raises(ValueError):
        my_function(invalid_input)

编写属性测试

# tests/test_my_feature_properties.py

from hypothesis import given, strategies as st
import hypothesis


# Feature: my-feature, Property 1: 输出总是有效
@given(input_data=st.data())
@hypothesis.settings(max_examples=100)
def test_output_always_valid(input_data):
    """
    属性 1对于任何有效输入输出总是有效的。
    """
    # 生成随机输入
    data = generate_random_input(input_data)
    
    # 执行函数
    result = my_function(data)
    
    # 验证属性
    assert result is not None
    assert validate_output(result)

运行测试

# 运行所有测试
pytest

# 运行单元测试
pytest tests/ -k "not properties"

# 运行属性测试
pytest tests/ -k "properties"

# 运行特定测试文件
pytest tests/test_my_feature.py -v

# 查看覆盖率
pytest --cov=src --cov-report=html

# 生成覆盖率报告
open htmlcov/index.html

代码规范

Python 代码风格

遵循 PEP 8 规范:

# 检查代码风格
flake8 src/

# 自动格式化代码
black src/

文档字符串

使用 Google 风格的文档字符串:

def my_function(param1: str, param2: int) -> Dict[str, Any]:
    """
    函数的简短描述。
    
    更详细的描述(如果需要)。
    
    参数:
        param1: 参数1的描述
        param2: 参数2的描述
    
    返回:
        返回值的描述
    
    异常:
        ValueError: 参数无效时抛出
    
    示例:
        >>> result = my_function("test", 42)
        >>> print(result)
        {'status': 'success'}
    """
    pass

类型注解

使用类型注解提高代码可读性:

from typing import List, Dict, Optional, Any

def process_data(
    data: List[Dict[str, Any]],
    config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """处理数据。"""
    pass

命名规范

  • 模块名:小写,下划线分隔(my_module.py
  • 类名:驼峰命名(MyClass
  • 函数名:小写,下划线分隔(my_function
  • 常量:大写,下划线分隔(MY_CONSTANT
  • 私有成员:前缀下划线(_private_method

调试技巧

启用详细日志

# 设置日志级别为 DEBUG
export LOG_LEVEL=DEBUG

# 或在代码中设置
import logging
logging.basicConfig(level=logging.DEBUG)

使用 Python 调试器

# 在代码中设置断点
import pdb; pdb.set_trace()

# 或使用 ipdb更友好
import ipdb; ipdb.set_trace()

查看 AI 的思考过程

# 使用 -v 参数显示详细日志
python -m src.cli data.csv -v

保存中间结果

# 在 AnalysisOrchestrator 中保存中间结果
import json

# 保存数据画像
with open('debug_data_profile.json', 'w') as f:
    json.dump(self.data_profile.__dict__, f, indent=2)

# 保存分析计划
with open('debug_analysis_plan.json', 'w') as f:
    json.dump([task.__dict__ for task in self.analysis_plan.tasks], f, indent=2)

模拟 AI 调用

在测试时模拟 AI 调用以避免 API 费用:

from unittest.mock import patch

with patch('src.engines.data_understanding.call_llm') as mock_llm:
    mock_llm.return_value = {
        'data_type': 'ticket',
        'key_fields': {'status': '工单状态'},
        'quality_score': 85.0
    }
    
    # 执行测试
    result = understand_data(data_access)

常见问题

Q1: 如何添加对新数据格式的支持?

修改 src/data_access.py 中的 load_from_file() 方法:

@classmethod
def load_from_file(cls, file_path: str) -> "DataAccessLayer":
    """从文件加载数据。"""
    if file_path.endswith('.csv'):
        data = cls._load_csv(file_path)
    elif file_path.endswith('.xlsx'):
        data = cls._load_excel(file_path)
    elif file_path.endswith('.json'):
        data = cls._load_json(file_path)
    else:
        raise ValueError(f"不支持的文件格式: {file_path}")
    
    return cls(data)

Q2: 如何更换 LLM 提供商?

修改 .env 文件:

# 使用 Gemini
LLM_PROVIDER=gemini
GEMINI_API_KEY=your_gemini_key
GEMINI_MODEL=gemini-2.0-flash-exp

Q3: 如何优化性能?

  1. 增加并发任务数(未来版本支持)
  2. 使用更快的 LLM 模型
  3. 减少 ReAct 最大迭代次数
  4. 对大数据集进行采样

Q4: 如何贡献代码?

  1. Fork 项目
  2. 创建特性分支
  3. 编写代码和测试
  4. 确保所有测试通过
  5. 提交 Pull Request

资源链接

  • 项目文档: docs/
  • API 文档: docs/API.md
  • 配置指南: docs/configuration_guide.md
  • 示例代码: examples/
  • 测试数据: test_data/

版本信息

  • 版本: v1.0.0
  • 日期: 2026-03-06
  • 状态: 稳定版本

联系方式

如有问题或建议,请创建 Issue 或联系维护者。