Files
iov_data_analysis_agent/utils/data_loader.py
2026-01-31 18:00:05 +08:00

218 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import os
import pandas as pd
import io
import hashlib
from pathlib import Path
from typing import Optional, Iterator
from config.app_config import app_config
from utils.cache_manager import CacheManager
# Module-level cache manager shared by the loaders below.
# Backed by the application's configured cache directory; the `enabled`
# flag lets config turn data caching off globally.
data_cache = CacheManager(
    cache_dir=app_config.cache_dir,
    enabled=app_config.data_cache_enabled
)
def load_and_profile_data(file_paths: list) -> str:
    """
    Load data files and build a Markdown data-profile report.

    Args:
        file_paths: List of file paths (.csv, .xlsx, .xls supported).

    Returns:
        Markdown string describing each file: shape, column names, and a
        per-column breakdown (null counts, numeric min/max/mean, top
        categorical values, datetime range). Per-file errors are reported
        inline instead of raising.
    """
    profile_summary = "# 数据画像报告 (Data Profile)\n\n"
    if not file_paths:
        return profile_summary + "未提供数据文件。"
    for file_path in file_paths:
        file_name = os.path.basename(file_path)
        profile_summary += f"## 文件: {file_name}\n\n"
        if not os.path.exists(file_path):
            profile_summary += f"[WARN] 文件不存在: {file_path}\n\n"
            continue
        try:
            # Choose the loader by extension.
            ext = os.path.splitext(file_path)[1].lower()
            if ext == '.csv':
                # Try common encodings in order; latin1 accepts any byte
                # sequence, so it is the last-resort fallback.
                try:
                    df = pd.read_csv(file_path, encoding='utf-8')
                except UnicodeDecodeError:
                    try:
                        df = pd.read_csv(file_path, encoding='gbk')
                    except Exception:
                        df = pd.read_csv(file_path, encoding='latin1')
            elif ext in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path)
            else:
                profile_summary += f"[WARN] 不支持的文件格式: {ext}\n\n"
                continue
            # Basic shape information.
            rows, cols = df.shape
            profile_summary += f"- **维度**: {rows} 行 x {cols}\n"
            profile_summary += f"- **列名**: `{', '.join(df.columns)}`\n\n"
            profile_summary += "### 列详细分布:\n"
            # Per-column analysis.
            for col in df.columns:
                dtype = df[col].dtype
                null_count = df[col].isnull().sum()
                # Guard the division: an empty frame has rows == 0.
                null_ratio = (null_count / rows) * 100 if rows else 0.0
                profile_summary += f"#### {col} ({dtype})\n"
                if null_count > 0:
                    profile_summary += f"- [WARN] 空值: {null_count} ({null_ratio:.1f}%)\n"
                # Numeric columns: basic stats.
                if pd.api.types.is_numeric_dtype(dtype):
                    desc = df[col].describe()
                    profile_summary += f"- 统计: Min={desc['min']:.2f}, Max={desc['max']:.2f}, Mean={desc['mean']:.2f}\n"
                # Text / categorical columns. NOTE: is_categorical_dtype is
                # deprecated since pandas 2.1 — use the isinstance check.
                elif pd.api.types.is_object_dtype(dtype) or isinstance(dtype, pd.CategoricalDtype):
                    unique_count = df[col].nunique()
                    profile_summary += f"- 唯一值数量: {unique_count}\n"
                    # Show the top-value distribution — essential for
                    # spotting high-frequency categories/issues.
                    if unique_count > 0:
                        top_n = df[col].value_counts().head(5)
                        top_items_str = ", ".join([f"{k}({v})" for k, v in top_n.items()])
                        profile_summary += f"- **TOP 5 高频值**: {top_items_str}\n"
                # Datetime columns: value range. The separator between min
                # and max was garbled/lost in the original; restore one.
                elif pd.api.types.is_datetime64_any_dtype(dtype):
                    profile_summary += f"- 范围: {df[col].min()} ~ {df[col].max()}\n"
                profile_summary += "\n"
        except Exception as e:
            profile_summary += f"[ERROR] 读取或分析文件失败: {str(e)}\n\n"
    return profile_summary
def get_file_hash(file_path: str) -> str:
    """Return an MD5 hex digest of the path (plus mtime, if the file
    exists) for use as a cache key."""
    digest = hashlib.md5(file_path.encode())
    # Mixing in the modification time makes edits invalidate stale
    # cache entries for the same path.
    if os.path.exists(file_path):
        digest.update(str(os.path.getmtime(file_path)).encode())
    return digest.hexdigest()
def load_data_chunked(file_path: str, chunksize: Optional[int] = None) -> Iterator[pd.DataFrame]:
    """
    Stream a large file as DataFrame chunks.

    Args:
        file_path: Path to a .csv/.xlsx/.xls file.
        chunksize: Rows per chunk; defaults to app_config.chunk_size.

    Yields:
        DataFrame chunks of at most ``chunksize`` rows. On read failure
        an error is printed and iteration stops (no exception escapes).
    """
    if chunksize is None:
        chunksize = app_config.chunk_size
    ext = os.path.splitext(file_path)[1].lower()
    if ext == '.csv':
        # BUGFIX: pick a working encoding BEFORE streaming. The original
        # retried encodings around the chunk iterator, so a decode error
        # surfacing mid-file restarted with the next encoding and
        # re-yielded chunks that were already emitted (duplicate rows).
        # latin1 accepts any byte sequence, so it is the guaranteed
        # fallback.
        encoding = 'latin1'
        for candidate in ['utf-8', 'gbk']:
            try:
                with open(file_path, 'r', encoding=candidate) as fh:
                    fh.read(1 << 20)  # decode a 1 MiB sample as a sanity check
                encoding = candidate
                break
            except (UnicodeDecodeError, UnicodeError):
                continue
        try:
            for chunk in pd.read_csv(file_path, encoding=encoding, chunksize=chunksize):
                yield chunk
        except Exception as e:
            print(f"[ERROR] 读取CSV文件失败: {e}")
    elif ext in ['.xlsx', '.xls']:
        # pandas cannot stream Excel files (no chunksize support):
        # load fully, then slice manually.
        try:
            df = pd.read_excel(file_path)
            for i in range(0, len(df), chunksize):
                yield df.iloc[i:i + chunksize]
        except Exception as e:
            print(f"[ERROR] 读取Excel文件失败: {e}")
def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional[pd.DataFrame]:
    """
    Load a data file with optional caching.

    Args:
        file_path: Path to a .csv/.xlsx/.xls file.
        force_reload: When True, skip the cache lookup (a freshly loaded
            frame is still written back to the cache afterwards).

    Returns:
        The loaded DataFrame, or None when the file is missing, the
        format is unsupported, loading fails, or no CSV encoding matched.
    """
    if not os.path.exists(file_path):
        print(f"[WARN] 文件不存在: {file_path}")
        return None
    # Check file size: warn (but do not refuse) when it exceeds the
    # configured limit — callers should prefer load_data_chunked().
    file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
    if file_size_mb > app_config.max_file_size_mb:
        print(f"[WARN] 文件过大 ({file_size_mb:.1f}MB),建议使用 load_data_chunked() 流式处理")
    # Cache key folds in the file mtime (see get_file_hash), so editing
    # the file invalidates its cached entry.
    cache_key = get_file_hash(file_path)
    # Try the cache first unless the caller forces a reload.
    if not force_reload and app_config.data_cache_enabled:
        cached_data = data_cache.get(cache_key)
        if cached_data is not None:
            print(f"[CACHE] 从缓存加载数据: {os.path.basename(file_path)}")
            return cached_data
    # Cache miss: load from disk.
    ext = os.path.splitext(file_path)[1].lower()
    df = None
    try:
        if ext == '.csv':
            # Try encodings in order; latin1 accepts any byte sequence,
            # so df stays None only if read_csv fails for another reason.
            for encoding in ['utf-8', 'gbk', 'latin1']:
                try:
                    df = pd.read_csv(file_path, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
        elif ext in ['.xlsx', '.xls']:
            df = pd.read_excel(file_path)
        else:
            print(f"[WARN] 不支持的文件格式: {ext}")
            return None
        # Store the freshly loaded frame for subsequent calls.
        if df is not None and app_config.data_cache_enabled:
            data_cache.set(cache_key, df)
            print(f"[OK] 数据已缓存: {os.path.basename(file_path)}")
        return df
    except Exception as e:
        print(f"[ERROR] 加载数据失败: {e}")
        return None