17 KiB
API 文档
本文档描述了 AI 数据分析 Agent 系统的核心 API 接口。
目录
主流程 API
run_analysis()
运行完整的数据分析流程。
函数签名:
def run_analysis(
data_file: str,
user_requirement: Optional[str] = None,
template_file: Optional[str] = None,
output_dir: str = "output",
progress_callback: Optional[callable] = None
) -> Dict[str, Any]
参数:
data_file(str): 数据文件路径(CSV 格式)user_requirement(Optional[str]): 用户需求(自然语言),如果为 None 则自动推断template_file(Optional[str]): 模板文件路径(可选)output_dir(str): 输出目录,默认为 "output"progress_callback(Optional[callable]): 进度回调函数,接收 (stage, current, total) 参数
返回值:
{
'success': bool, # 是否成功
'data_type': str, # 数据类型
'objectives_count': int, # 分析目标数量
'tasks_count': int, # 任务数量
'results_count': int, # 结果数量
'report_path': str, # 报告路径
'elapsed_time': float, # 执行时间(秒)
'error': str # 错误信息(如果失败)
}
示例:
from src.main import run_analysis
# 基本使用
result = run_analysis(
data_file="data.csv",
user_requirement="分析工单健康度"
)
if result['success']:
print(f"报告路径: {result['report_path']}")
print(f"执行时间: {result['elapsed_time']:.1f}秒")
else:
print(f"分析失败: {result['error']}")
# 使用进度回调
def progress_handler(stage, current, total):
print(f"[{current}/{total}] {stage}")
result = run_analysis(
data_file="data.csv",
progress_callback=progress_handler
)
AnalysisOrchestrator
分析编排器类,协调五个阶段的执行。
类签名:
class AnalysisOrchestrator:
def __init__(
self,
data_file: str,
user_requirement: Optional[str] = None,
template_file: Optional[str] = None,
output_dir: Optional[str] = None,
progress_callback: Optional[callable] = None
)
方法:
run_analysis()
运行完整的分析流程。
返回值:与 run_analysis() 函数相同
示例:
from src.main import AnalysisOrchestrator
orchestrator = AnalysisOrchestrator(
data_file="data.csv",
user_requirement="分析工单健康度",
output_dir="output"
)
result = orchestrator.run_analysis()
配置管理 API
Config
系统配置类。
类签名:
@dataclass
class Config:
llm: LLMConfig
performance: PerformanceConfig
output: OutputConfig
code_repo_enable_reuse: bool = True
类方法:
from_env()
从环境变量加载配置。
@classmethod
def from_env(cls) -> "Config"
示例:
from src.config import Config
config = Config.from_env()
print(f"模型: {config.llm.model}")
print(f"输出目录: {config.output.output_dir}")
from_file()
从配置文件加载配置。
@classmethod
def from_file(cls, config_file: str) -> "Config"
参数:
config_file(str): 配置文件路径(JSON 格式)
示例:
config = Config.from_file("config.json")
from_dict()
从字典加载配置。
@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> "Config"
参数:
config_dict(Dict[str, Any]): 配置字典
to_dict()
转换为字典。
def to_dict(self) -> Dict[str, Any]
save_to_file()
保存配置到文件。
def save_to_file(self, config_file: str)
validate()
验证配置的有效性。
def validate(self) -> bool
LLMConfig
LLM API 配置。
类签名:
@dataclass
class LLMConfig:
provider: str = "openai"
api_key: str = ""
base_url: str = "https://api.openai.com/v1"
model: str = "gpt-4"
timeout: int = 120
max_retries: int = 3
temperature: float = 0.7
max_tokens: Optional[int] = None
PerformanceConfig
性能参数配置。
类签名:
@dataclass
class PerformanceConfig:
agent_max_rounds: int = 20
agent_timeout: int = 300
tool_max_query_rows: int = 10000
tool_execution_timeout: int = 60
data_max_rows: int = 1000000
data_sample_threshold: int = 1000000
max_concurrent_tasks: int = 1
OutputConfig
输出路径配置。
类签名:
@dataclass
class OutputConfig:
output_dir: str = "output"
log_dir: Optional[str] = None
chart_dir: Optional[str] = None
report_filename: str = "analysis_report.md"
log_level: str = "INFO"
log_to_file: bool = True
log_to_console: bool = True
方法:
get_output_path() -> Path: 获取输出目录路径get_log_path() -> Path: 获取日志目录路径get_chart_path() -> Path: 获取图表目录路径get_report_path() -> Path: 获取报告文件路径
全局配置函数
get_config()
获取全局配置实例。
def get_config() -> Config
set_config()
设置全局配置实例。
def set_config(config: Config)
load_config_from_env()
从环境变量加载配置并设置为全局配置。
def load_config_from_env() -> Config
load_config_from_file()
从文件加载配置并设置为全局配置。
def load_config_from_file(config_file: str) -> Config
数据访问 API
DataAccessLayer
数据访问层,提供数据加载和隐私保护机制。
类方法:
load_from_file()
从文件加载数据。
@classmethod
def load_from_file(cls, file_path: str) -> "DataAccessLayer"
参数:
file_path(str): 数据文件路径
返回值:DataAccessLayer 实例
示例:
from src.data_access import DataAccessLayer
data_access = DataAccessLayer.load_from_file("data.csv")
print(f"数据形状: {data_access.shape}")
实例方法:
get_profile()
获取数据画像(不包含原始数据)。
def get_profile(self) -> DataProfile
execute_tool()
执行工具并返回聚合结果。
def execute_tool(self, tool: AnalysisTool, **kwargs) -> Dict[str, Any]
参数:
tool(AnalysisTool): 工具实例**kwargs: 工具参数
返回值:聚合后的结果字典
分析引擎 API
数据理解引擎
understand_data()
AI 驱动的数据理解。
def understand_data(data_access: DataAccessLayer) -> DataProfile
参数:
data_access(DataAccessLayer): 数据访问层实例
返回值:DataProfile 对象
示例:
from src.engines import understand_data
from src.data_access import DataAccessLayer
data_access = DataAccessLayer.load_from_file("data.csv")
profile = understand_data(data_access)
print(f"数据类型: {profile.inferred_type}")
print(f"质量分数: {profile.quality_score}")
需求理解引擎
understand_requirement()
AI 驱动的需求理解。
def understand_requirement(
user_input: str,
data_profile: DataProfile,
template_path: Optional[str] = None
) -> RequirementSpec
参数:
user_input(str): 用户需求(自然语言)data_profile(DataProfile): 数据画像template_path(Optional[str]): 模板文件路径
返回值:RequirementSpec 对象
分析规划引擎
plan_analysis()
AI 驱动的分析规划。
def plan_analysis(
data_profile: DataProfile,
requirement: RequirementSpec
) -> AnalysisPlan
参数:
data_profile(DataProfile): 数据画像requirement(RequirementSpec): 需求规格
返回值:AnalysisPlan 对象
任务执行引擎
execute_task()
使用 ReAct 模式执行任务。
def execute_task(
task: AnalysisTask,
tools: List[AnalysisTool],
data_access: DataAccessLayer
) -> AnalysisResult
参数:
task(AnalysisTask): 分析任务tools(List[AnalysisTool]): 可用工具列表data_access(DataAccessLayer): 数据访问层
返回值:AnalysisResult 对象
计划调整引擎
adjust_plan()
根据中间结果动态调整计划。
def adjust_plan(
plan: AnalysisPlan,
completed_results: List[AnalysisResult]
) -> AnalysisPlan
参数:
plan(AnalysisPlan): 当前分析计划completed_results(List[AnalysisResult]): 已完成的分析结果
返回值:调整后的 AnalysisPlan 对象
报告生成引擎
generate_report()
AI 驱动的报告生成。
def generate_report(
results: List[AnalysisResult],
requirement: RequirementSpec,
data_profile: DataProfile,
output_path: str
) -> str
参数:
results(List[AnalysisResult]): 分析结果列表requirement(RequirementSpec): 需求规格data_profile(DataProfile): 数据画像output_path(str): 输出路径
返回值:Markdown 格式的报告内容
工具系统 API
AnalysisTool
分析工具的抽象基类。
抽象属性:
name
工具名称。
@property
@abstractmethod
def name(self) -> str
description
工具描述(供 AI 理解)。
@property
@abstractmethod
def description(self) -> str
parameters
参数定义(JSON Schema 格式)。
@property
@abstractmethod
def parameters(self) -> Dict[str, Any]
抽象方法:
execute()
执行工具。
@abstractmethod
def execute(self, data: pd.DataFrame, **kwargs) -> Dict[str, Any]
参数:
data(pd.DataFrame): 原始数据**kwargs: 工具参数
返回值:聚合后的结果字典
is_applicable()
判断工具是否适用于当前数据。
@abstractmethod
def is_applicable(self, data_profile: DataProfile) -> bool
参数:
data_profile(DataProfile): 数据画像
返回值:True 如果工具适用,False 否则
方法:
validate_parameters()
验证参数是否有效。
def validate_parameters(self, **kwargs) -> bool
ToolRegistry
工具注册表,管理所有可用的工具。
方法:
register()
注册一个工具。
def register(self, tool: AnalysisTool) -> None
unregister()
注销一个工具。
def unregister(self, tool_name: str) -> None
get_tool()
获取指定名称的工具。
def get_tool(self, tool_name: str) -> AnalysisTool
list_tools()
列出所有已注册的工具名称。
def list_tools(self) -> list[str]
get_applicable_tools()
获取适用于指定数据的所有工具。
def get_applicable_tools(self, data_profile: DataProfile) -> list[AnalysisTool]
全局工具函数
register_tool()
注册工具到全局注册表。
def register_tool(tool: AnalysisTool) -> None
get_tool()
从全局注册表获取工具。
def get_tool(tool_name: str) -> AnalysisTool
list_tools()
列出全局注册表中的所有工具。
def list_tools() -> list[str]
get_applicable_tools()
获取适用于指定数据的所有工具。
def get_applicable_tools(data_profile: DataProfile) -> list[AnalysisTool]
ToolManager
工具管理器,根据数据特征动态选择工具。
方法:
select_tools()
根据数据画像选择合适的工具。
def select_tools(self, data_profile: DataProfile) -> List[AnalysisTool]
参数:
data_profile(DataProfile): 数据画像
返回值:适用的工具列表
get_missing_tools()
获取缺失的工具列表。
def get_missing_tools(self) -> List[str]
返回值:缺失的工具名称列表
数据模型
DataProfile
数据画像,包含数据的元数据和统计摘要。
字段:
@dataclass
class DataProfile:
file_path: str
row_count: int
column_count: int
columns: List[ColumnInfo]
inferred_type: str
key_fields: Dict[str, str]
quality_score: float
summary: str
ColumnInfo
列信息。
字段:
@dataclass
class ColumnInfo:
name: str
dtype: str
missing_rate: float
unique_count: int
sample_values: List[Any]
statistics: Dict[str, Any]
RequirementSpec
需求规格。
字段:
@dataclass
class RequirementSpec:
user_input: str
objectives: List[AnalysisObjective]
template_path: Optional[str]
template_requirements: Optional[Dict[str, Any]]
constraints: List[str]
expected_outputs: List[str]
AnalysisObjective
分析目标。
字段:
@dataclass
class AnalysisObjective:
name: str
description: str
metrics: List[str]
priority: int
AnalysisPlan
分析计划。
字段:
@dataclass
class AnalysisPlan:
objectives: List[AnalysisObjective]
tasks: List[AnalysisTask]
tool_config: Dict[str, Any]
estimated_duration: int
created_at: datetime
updated_at: datetime
AnalysisTask
分析任务。
字段:
@dataclass
class AnalysisTask:
id: str
name: str
description: str
priority: int
dependencies: List[str]
required_tools: List[str]
expected_output: str
status: str
AnalysisResult
分析结果。
字段:
@dataclass
class AnalysisResult:
task_id: str
task_name: str
success: bool
data: Dict[str, Any]
visualizations: List[str]
insights: List[str]
error: Optional[str]
execution_time: float
错误处理 API
execute_task_with_recovery()
带恢复机制的任务执行。
def execute_task_with_recovery(
task: AnalysisTask,
plan: AnalysisPlan,
execute_func: callable,
**kwargs
) -> AnalysisResult
参数:
task(AnalysisTask): 分析任务plan(AnalysisPlan): 分析计划execute_func(callable): 执行函数**kwargs: 传递给执行函数的参数
返回值:AnalysisResult 对象
使用示例
完整示例:自定义分析流程
from src.main import AnalysisOrchestrator
from src.config import Config, LLMConfig, OutputConfig
# 1. 配置系统
llm_config = LLMConfig(
provider="openai",
api_key="your_api_key",
model="gpt-4",
temperature=0.7
)
output_config = OutputConfig(
output_dir="my_output",
log_level="DEBUG"
)
config = Config(llm=llm_config, output=output_config)
# 2. 创建编排器
orchestrator = AnalysisOrchestrator(
data_file="data.csv",
user_requirement="分析工单健康度",
output_dir="my_output"
)
# 3. 运行分析
result = orchestrator.run_analysis()
# 4. 处理结果
if result['success']:
print(f"✓ 分析完成")
print(f" 数据类型: {result['data_type']}")
print(f" 任务数量: {result['tasks_count']}")
print(f" 报告路径: {result['report_path']}")
print(f" 执行时间: {result['elapsed_time']:.1f}秒")
else:
print(f"✗ 分析失败: {result['error']}")
示例:自定义工具
from src.tools.base import AnalysisTool, register_tool
from src.models import DataProfile
import pandas as pd
class CustomAnalysisTool(AnalysisTool):
@property
def name(self) -> str:
return "custom_analysis"
@property
def description(self) -> str:
return "自定义分析工具"
@property
def parameters(self) -> dict:
return {
"type": "object",
"properties": {
"column": {"type": "string"}
},
"required": ["column"]
}
def execute(self, data: pd.DataFrame, **kwargs) -> dict:
column = kwargs['column']
# 执行自定义分析
result = {
"mean": data[column].mean(),
"median": data[column].median()
}
return result
def is_applicable(self, data_profile: DataProfile) -> bool:
# 检查是否有数值列
return any(col.dtype == 'numeric' for col in data_profile.columns)
# 注册工具
register_tool(CustomAnalysisTool())
注意事项
- 隐私保护:所有工具的
execute()方法必须返回聚合数据,不能返回原始行级数据 - 错误处理:所有 API 调用都应该包含适当的错误处理
- 配置验证:在使用配置前,建议调用
config.validate()验证配置的有效性 - 工具注册:自定义工具必须在使用前注册到工具注册表
- 线程安全:当前版本不支持并发执行,
max_concurrent_tasks必须设置为 1
版本信息
- 版本: v1.0.0
- 日期: 2026-03-06
- 状态: 稳定版本