Files
vibe_data_ana/docs/API.md

17 KiB
Raw Blame History

API 文档

本文档描述了 AI 数据分析 Agent 系统的核心 API 接口。

目录


主流程 API

run_analysis()

运行完整的数据分析流程。

函数签名

def run_analysis(
    data_file: str,
    user_requirement: Optional[str] = None,
    template_file: Optional[str] = None,
    output_dir: str = "output",
    progress_callback: Optional[callable] = None
) -> Dict[str, Any]

参数

  • data_file (str): 数据文件路径CSV 格式)
  • user_requirement (Optional[str]): 用户需求(自然语言),如果为 None 则自动推断
  • template_file (Optional[str]): 模板文件路径(可选)
  • output_dir (str): 输出目录,默认为 "output"
  • progress_callback (Optional[callable]): 进度回调函数,接收 (stage, current, total) 参数

返回值

{
    'success': bool,              # 是否成功
    'data_type': str,             # 数据类型
    'objectives_count': int,      # 分析目标数量
    'tasks_count': int,           # 任务数量
    'results_count': int,         # 结果数量
    'report_path': str,           # 报告路径
    'elapsed_time': float,        # 执行时间(秒)
    'error': str                  # 错误信息(如果失败)
}

示例

from src.main import run_analysis

# 基本使用
result = run_analysis(
    data_file="data.csv",
    user_requirement="分析工单健康度"
)

if result['success']:
    print(f"报告路径: {result['report_path']}")
    print(f"执行时间: {result['elapsed_time']:.1f}秒")
else:
    print(f"分析失败: {result['error']}")

# 使用进度回调
def progress_handler(stage, current, total):
    print(f"[{current}/{total}] {stage}")

result = run_analysis(
    data_file="data.csv",
    progress_callback=progress_handler
)

AnalysisOrchestrator

分析编排器类,协调五个阶段的执行。

类签名

class AnalysisOrchestrator:
    def __init__(
        self,
        data_file: str,
        user_requirement: Optional[str] = None,
        template_file: Optional[str] = None,
        output_dir: Optional[str] = None,
        progress_callback: Optional[callable] = None
    )

方法

run_analysis()

运行完整的分析流程。

返回值:与 run_analysis() 函数相同

示例

from src.main import AnalysisOrchestrator

orchestrator = AnalysisOrchestrator(
    data_file="data.csv",
    user_requirement="分析工单健康度",
    output_dir="output"
)

result = orchestrator.run_analysis()

配置管理 API

Config

系统配置类。

类签名

@dataclass
class Config:
    llm: LLMConfig
    performance: PerformanceConfig
    output: OutputConfig
    code_repo_enable_reuse: bool = True

类方法

from_env()

从环境变量加载配置。

@classmethod
def from_env(cls) -> "Config"

示例

from src.config import Config

config = Config.from_env()
print(f"模型: {config.llm.model}")
print(f"输出目录: {config.output.output_dir}")

from_file()

从配置文件加载配置。

@classmethod
def from_file(cls, config_file: str) -> "Config"

参数

  • config_file (str): 配置文件路径JSON 格式)

示例

config = Config.from_file("config.json")

from_dict()

从字典加载配置。

@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> "Config"

参数

  • config_dict (Dict[str, Any]): 配置字典

to_dict()

转换为字典。

def to_dict(self) -> Dict[str, Any]

save_to_file()

保存配置到文件。

def save_to_file(self, config_file: str)

validate()

验证配置的有效性。

def validate(self) -> bool

LLMConfig

LLM API 配置。

类签名

@dataclass
class LLMConfig:
    provider: str = "openai"
    api_key: str = ""
    base_url: str = "https://api.openai.com/v1"
    model: str = "gpt-4"
    timeout: int = 120
    max_retries: int = 3
    temperature: float = 0.7
    max_tokens: Optional[int] = None

PerformanceConfig

性能参数配置。

类签名

@dataclass
class PerformanceConfig:
    agent_max_rounds: int = 20
    agent_timeout: int = 300
    tool_max_query_rows: int = 10000
    tool_execution_timeout: int = 60
    data_max_rows: int = 1000000
    data_sample_threshold: int = 1000000
    max_concurrent_tasks: int = 1

OutputConfig

输出路径配置。

类签名

@dataclass
class OutputConfig:
    output_dir: str = "output"
    log_dir: Optional[str] = None
    chart_dir: Optional[str] = None
    report_filename: str = "analysis_report.md"
    log_level: str = "INFO"
    log_to_file: bool = True
    log_to_console: bool = True

方法

  • get_output_path() -> Path: 获取输出目录路径
  • get_log_path() -> Path: 获取日志目录路径
  • get_chart_path() -> Path: 获取图表目录路径
  • get_report_path() -> Path: 获取报告文件路径

全局配置函数

get_config()

获取全局配置实例。

def get_config() -> Config

set_config()

设置全局配置实例。

def set_config(config: Config)

load_config_from_env()

从环境变量加载配置并设置为全局配置。

def load_config_from_env() -> Config

load_config_from_file()

从文件加载配置并设置为全局配置。

def load_config_from_file(config_file: str) -> Config

数据访问 API

DataAccessLayer

数据访问层,提供数据加载和隐私保护机制。

类方法

load_from_file()

从文件加载数据。

@classmethod
def load_from_file(cls, file_path: str) -> "DataAccessLayer"

参数

  • file_path (str): 数据文件路径

返回值DataAccessLayer 实例

示例

from src.data_access import DataAccessLayer

data_access = DataAccessLayer.load_from_file("data.csv")
print(f"数据形状: {data_access.shape}")

实例方法

get_profile()

获取数据画像(不包含原始数据)。

def get_profile(self) -> DataProfile

execute_tool()

执行工具并返回聚合结果。

def execute_tool(self, tool: AnalysisTool, **kwargs) -> Dict[str, Any]

参数

  • tool (AnalysisTool): 工具实例
  • **kwargs: 工具参数

返回值:聚合后的结果字典


分析引擎 API

数据理解引擎

understand_data()

AI 驱动的数据理解。

def understand_data(data_access: DataAccessLayer) -> DataProfile

参数

  • data_access (DataAccessLayer): 数据访问层实例

返回值DataProfile 对象

示例

from src.engines import understand_data
from src.data_access import DataAccessLayer

data_access = DataAccessLayer.load_from_file("data.csv")
profile = understand_data(data_access)

print(f"数据类型: {profile.inferred_type}")
print(f"质量分数: {profile.quality_score}")

需求理解引擎

understand_requirement()

AI 驱动的需求理解。

def understand_requirement(
    user_input: str,
    data_profile: DataProfile,
    template_path: Optional[str] = None
) -> RequirementSpec

参数

  • user_input (str): 用户需求(自然语言)
  • data_profile (DataProfile): 数据画像
  • template_path (Optional[str]): 模板文件路径

返回值RequirementSpec 对象

分析规划引擎

plan_analysis()

AI 驱动的分析规划。

def plan_analysis(
    data_profile: DataProfile,
    requirement: RequirementSpec
) -> AnalysisPlan

参数

  • data_profile (DataProfile): 数据画像
  • requirement (RequirementSpec): 需求规格

返回值AnalysisPlan 对象

任务执行引擎

execute_task()

使用 ReAct 模式执行任务。

def execute_task(
    task: AnalysisTask,
    tools: List[AnalysisTool],
    data_access: DataAccessLayer
) -> AnalysisResult

参数

  • task (AnalysisTask): 分析任务
  • tools (List[AnalysisTool]): 可用工具列表
  • data_access (DataAccessLayer): 数据访问层

返回值AnalysisResult 对象

计划调整引擎

adjust_plan()

根据中间结果动态调整计划。

def adjust_plan(
    plan: AnalysisPlan,
    completed_results: List[AnalysisResult]
) -> AnalysisPlan

参数

  • plan (AnalysisPlan): 当前分析计划
  • completed_results (List[AnalysisResult]): 已完成的分析结果

返回值:调整后的 AnalysisPlan 对象

报告生成引擎

generate_report()

AI 驱动的报告生成。

def generate_report(
    results: List[AnalysisResult],
    requirement: RequirementSpec,
    data_profile: DataProfile,
    output_path: str
) -> str

参数

  • results (List[AnalysisResult]): 分析结果列表
  • requirement (RequirementSpec): 需求规格
  • data_profile (DataProfile): 数据画像
  • output_path (str): 输出路径

返回值Markdown 格式的报告内容


工具系统 API

AnalysisTool

分析工具的抽象基类。

抽象属性

name

工具名称。

@property
@abstractmethod
def name(self) -> str

description

工具描述(供 AI 理解)。

@property
@abstractmethod
def description(self) -> str

parameters

参数定义JSON Schema 格式)。

@property
@abstractmethod
def parameters(self) -> Dict[str, Any]

抽象方法

execute()

执行工具。

@abstractmethod
def execute(self, data: pd.DataFrame, **kwargs) -> Dict[str, Any]

参数

  • data (pd.DataFrame): 原始数据
  • **kwargs: 工具参数

返回值:聚合后的结果字典

is_applicable()

判断工具是否适用于当前数据。

@abstractmethod
def is_applicable(self, data_profile: DataProfile) -> bool

参数

  • data_profile (DataProfile): 数据画像

返回值True 如果工具适用False 否则

方法

validate_parameters()

验证参数是否有效。

def validate_parameters(self, **kwargs) -> bool

ToolRegistry

工具注册表,管理所有可用的工具。

方法

register()

注册一个工具。

def register(self, tool: AnalysisTool) -> None

unregister()

注销一个工具。

def unregister(self, tool_name: str) -> None

get_tool()

获取指定名称的工具。

def get_tool(self, tool_name: str) -> AnalysisTool

list_tools()

列出所有已注册的工具名称。

def list_tools(self) -> list[str]

get_applicable_tools()

获取适用于指定数据的所有工具。

def get_applicable_tools(self, data_profile: DataProfile) -> list[AnalysisTool]

全局工具函数

register_tool()

注册工具到全局注册表。

def register_tool(tool: AnalysisTool) -> None

get_tool()

从全局注册表获取工具。

def get_tool(tool_name: str) -> AnalysisTool

list_tools()

列出全局注册表中的所有工具。

def list_tools() -> list[str]

get_applicable_tools()

获取适用于指定数据的所有工具。

def get_applicable_tools(data_profile: DataProfile) -> list[AnalysisTool]

ToolManager

工具管理器,根据数据特征动态选择工具。

方法

select_tools()

根据数据画像选择合适的工具。

def select_tools(self, data_profile: DataProfile) -> List[AnalysisTool]

参数

  • data_profile (DataProfile): 数据画像

返回值:适用的工具列表

get_missing_tools()

获取缺失的工具列表。

def get_missing_tools(self) -> List[str]

返回值:缺失的工具名称列表


数据模型

DataProfile

数据画像,包含数据的元数据和统计摘要。

字段

@dataclass
class DataProfile:
    file_path: str
    row_count: int
    column_count: int
    columns: List[ColumnInfo]
    inferred_type: str
    key_fields: Dict[str, str]
    quality_score: float
    summary: str

ColumnInfo

列信息。

字段

@dataclass
class ColumnInfo:
    name: str
    dtype: str
    missing_rate: float
    unique_count: int
    sample_values: List[Any]
    statistics: Dict[str, Any]

RequirementSpec

需求规格。

字段

@dataclass
class RequirementSpec:
    user_input: str
    objectives: List[AnalysisObjective]
    template_path: Optional[str]
    template_requirements: Optional[Dict[str, Any]]
    constraints: List[str]
    expected_outputs: List[str]

AnalysisObjective

分析目标。

字段

@dataclass
class AnalysisObjective:
    name: str
    description: str
    metrics: List[str]
    priority: int

AnalysisPlan

分析计划。

字段

@dataclass
class AnalysisPlan:
    objectives: List[AnalysisObjective]
    tasks: List[AnalysisTask]
    tool_config: Dict[str, Any]
    estimated_duration: int
    created_at: datetime
    updated_at: datetime

AnalysisTask

分析任务。

字段

@dataclass
class AnalysisTask:
    id: str
    name: str
    description: str
    priority: int
    dependencies: List[str]
    required_tools: List[str]
    expected_output: str
    status: str

AnalysisResult

分析结果。

字段

@dataclass
class AnalysisResult:
    task_id: str
    task_name: str
    success: bool
    data: Dict[str, Any]
    visualizations: List[str]
    insights: List[str]
    error: Optional[str]
    execution_time: float

错误处理 API

execute_task_with_recovery()

带恢复机制的任务执行。

def execute_task_with_recovery(
    task: AnalysisTask,
    plan: AnalysisPlan,
    execute_func: callable,
    **kwargs
) -> AnalysisResult

参数

  • task (AnalysisTask): 分析任务
  • plan (AnalysisPlan): 分析计划
  • execute_func (callable): 执行函数
  • **kwargs: 传递给执行函数的参数

返回值AnalysisResult 对象


使用示例

完整示例:自定义分析流程

from src.main import AnalysisOrchestrator
from src.config import Config, LLMConfig, OutputConfig

# 1. 配置系统
llm_config = LLMConfig(
    provider="openai",
    api_key="your_api_key",
    model="gpt-4",
    temperature=0.7
)

output_config = OutputConfig(
    output_dir="my_output",
    log_level="DEBUG"
)

config = Config(llm=llm_config, output=output_config)

# 2. 创建编排器
orchestrator = AnalysisOrchestrator(
    data_file="data.csv",
    user_requirement="分析工单健康度",
    output_dir="my_output"
)

# 3. 运行分析
result = orchestrator.run_analysis()

# 4. 处理结果
if result['success']:
    print(f"✓ 分析完成")
    print(f"  数据类型: {result['data_type']}")
    print(f"  任务数量: {result['tasks_count']}")
    print(f"  报告路径: {result['report_path']}")
    print(f"  执行时间: {result['elapsed_time']:.1f}秒")
else:
    print(f"✗ 分析失败: {result['error']}")

示例:自定义工具

from src.tools.base import AnalysisTool, register_tool
from src.models import DataProfile
import pandas as pd

class CustomAnalysisTool(AnalysisTool):
    @property
    def name(self) -> str:
        return "custom_analysis"
    
    @property
    def description(self) -> str:
        return "自定义分析工具"
    
    @property
    def parameters(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "column": {"type": "string"}
            },
            "required": ["column"]
        }
    
    def execute(self, data: pd.DataFrame, **kwargs) -> dict:
        column = kwargs['column']
        # 执行自定义分析
        result = {
            "mean": data[column].mean(),
            "median": data[column].median()
        }
        return result
    
    def is_applicable(self, data_profile: DataProfile) -> bool:
        # 检查是否有数值列
        return any(col.dtype == 'numeric' for col in data_profile.columns)

# 注册工具
register_tool(CustomAnalysisTool())

注意事项

  1. 隐私保护:所有工具的 execute() 方法必须返回聚合数据,不能返回原始行级数据
  2. 错误处理:所有 API 调用都应该包含适当的错误处理
  3. 配置验证:在使用配置前,建议调用 config.validate() 验证配置的有效性
  4. 工具注册:自定义工具必须在使用前注册到工具注册表
  5. 线程安全:当前版本不支持并发执行,max_concurrent_tasks 必须设置为 1

版本信息

  • 版本: v1.0.0
  • 日期: 2026-03-06
  • 状态: 稳定版本