Files

218 lines
7.4 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
import os
import pandas as pd
import io
import hashlib
from pathlib import Path
from typing import Optional, Iterator
from config.app_config import app_config
from utils.cache_manager import CacheManager
# 初始化缓存管理器
data_cache = CacheManager(
cache_dir=app_config.cache_dir,
enabled=app_config.data_cache_enabled
)
def load_and_profile_data(file_paths: list) -> str:
"""
加载数据并生成数据画像
Args:
file_paths: 文件路径列表
Returns:
包含数据画像的Markdown字符串
"""
profile_summary = "# 数据画像报告 (Data Profile)\n\n"
if not file_paths:
return profile_summary + "未提供数据文件。"
for file_path in file_paths:
file_name = os.path.basename(file_path)
profile_summary += f"## 文件: {file_name}\n\n"
if not os.path.exists(file_path):
2026-01-31 18:00:05 +08:00
profile_summary += f"[WARN] 文件不存在: {file_path}\n\n"
continue
try:
# 根据扩展名选择加载方式
ext = os.path.splitext(file_path)[1].lower()
if ext == '.csv':
# 尝试多种编码
try:
df = pd.read_csv(file_path, encoding='utf-8')
except UnicodeDecodeError:
try:
df = pd.read_csv(file_path, encoding='gbk')
except Exception:
df = pd.read_csv(file_path, encoding='latin1')
elif ext in ['.xlsx', '.xls']:
df = pd.read_excel(file_path)
else:
2026-01-31 18:00:05 +08:00
profile_summary += f"[WARN] 不支持的文件格式: {ext}\n\n"
continue
# 基础信息
rows, cols = df.shape
profile_summary += f"- **维度**: {rows} 行 x {cols}\n"
profile_summary += f"- **列名**: `{', '.join(df.columns)}`\n\n"
profile_summary += "### 列详细分布:\n"
# 遍历分析每列
for col in df.columns:
dtype = df[col].dtype
null_count = df[col].isnull().sum()
null_ratio = (null_count / rows) * 100
profile_summary += f"#### {col} ({dtype})\n"
if null_count > 0:
2026-01-31 18:00:05 +08:00
profile_summary += f"- [WARN] 空值: {null_count} ({null_ratio:.1f}%)\n"
# 数值列分析
if pd.api.types.is_numeric_dtype(dtype):
desc = df[col].describe()
profile_summary += f"- 统计: Min={desc['min']:.2f}, Max={desc['max']:.2f}, Mean={desc['mean']:.2f}\n"
# 文本/分类列分析
elif pd.api.types.is_object_dtype(dtype) or pd.api.types.is_categorical_dtype(dtype):
unique_count = df[col].nunique()
profile_summary += f"- 唯一值数量: {unique_count}\n"
# 如果唯一值较少(<50或者看起来是分类数据显示Top分布
# 这对识别“高频问题”至关重要
if unique_count > 0:
top_n = df[col].value_counts().head(5)
top_items_str = ", ".join([f"{k}({v})" for k, v in top_n.items()])
profile_summary += f"- **TOP 5 高频值**: {top_items_str}\n"
# 时间列分析
elif pd.api.types.is_datetime64_any_dtype(dtype):
profile_summary += f"- 范围: {df[col].min()}{df[col].max()}\n"
profile_summary += "\n"
except Exception as e:
2026-01-31 18:00:05 +08:00
profile_summary += f"[ERROR] 读取或分析文件失败: {str(e)}\n\n"
return profile_summary
def get_file_hash(file_path: str) -> str:
"""计算文件哈希值,用于缓存键"""
hasher = hashlib.md5()
hasher.update(file_path.encode())
# 添加文件修改时间
if os.path.exists(file_path):
mtime = os.path.getmtime(file_path)
hasher.update(str(mtime).encode())
return hasher.hexdigest()
def load_data_chunked(file_path: str, chunksize: Optional[int] = None) -> Iterator[pd.DataFrame]:
"""
流式读取大文件分块返回DataFrame
Args:
file_path: 文件路径
chunksize: 每块行数默认使用配置值
Yields:
DataFrame块
"""
if chunksize is None:
chunksize = app_config.chunk_size
ext = os.path.splitext(file_path)[1].lower()
if ext == '.csv':
# 尝试多种编码
for encoding in ['utf-8', 'gbk', 'latin1']:
try:
chunks = pd.read_csv(file_path, encoding=encoding, chunksize=chunksize)
for chunk in chunks:
yield chunk
break
except UnicodeDecodeError:
continue
except Exception as e:
2026-01-31 18:00:05 +08:00
print(f"[ERROR] 读取CSV文件失败: {e}")
break
elif ext in ['.xlsx', '.xls']:
# Excel文件不支持chunksize直接读取
try:
df = pd.read_excel(file_path)
# 手动分块
for i in range(0, len(df), chunksize):
yield df.iloc[i:i+chunksize]
except Exception as e:
2026-01-31 18:00:05 +08:00
print(f"[ERROR] 读取Excel文件失败: {e}")
def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional[pd.DataFrame]:
"""
带缓存的数据加载
Args:
file_path: 文件路径
force_reload: 是否强制重新加载
Returns:
DataFrame或None
"""
if not os.path.exists(file_path):
2026-01-31 18:00:05 +08:00
print(f"[WARN] 文件不存在: {file_path}")
return None
# 检查文件大小
file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
# 对于大文件,建议使用流式处理
if file_size_mb > app_config.max_file_size_mb:
2026-01-31 18:00:05 +08:00
print(f"[WARN] 文件过大 ({file_size_mb:.1f}MB),建议使用 load_data_chunked() 流式处理")
# 生成缓存键
cache_key = get_file_hash(file_path)
# 尝试从缓存加载
if not force_reload and app_config.data_cache_enabled:
cached_data = data_cache.get(cache_key)
if cached_data is not None:
2026-01-31 18:00:05 +08:00
print(f"[CACHE] 从缓存加载数据: {os.path.basename(file_path)}")
return cached_data
# 加载数据
ext = os.path.splitext(file_path)[1].lower()
df = None
try:
if ext == '.csv':
# 尝试多种编码
for encoding in ['utf-8', 'gbk', 'latin1']:
try:
df = pd.read_csv(file_path, encoding=encoding)
break
except UnicodeDecodeError:
continue
elif ext in ['.xlsx', '.xls']:
df = pd.read_excel(file_path)
else:
2026-01-31 18:00:05 +08:00
print(f"[WARN] 不支持的文件格式: {ext}")
return None
# 缓存数据
if df is not None and app_config.data_cache_enabled:
data_cache.set(cache_key, df)
2026-01-31 18:00:05 +08:00
print(f"[OK] 数据已缓存: {os.path.basename(file_path)}")
return df
except Exception as e:
2026-01-31 18:00:05 +08:00
print(f"[ERROR] 加载数据失败: {e}")
return None