# -*- coding: utf-8 -*-
"""Data loading, profiling and caching utilities built on pandas."""
import os
import pandas as pd
import io
import hashlib
from pathlib import Path
from typing import Optional, Iterator
from config.app_config import app_config
from utils.cache_manager import CacheManager

# Cache manager used by load_data_with_cache; keys come from get_file_hash
# (path + mtime), so a touched file invalidates its cache entry.
data_cache = CacheManager(
    cache_dir=app_config.cache_dir,
    enabled=app_config.data_cache_enabled
)


def load_and_profile_data(file_paths: list) -> str:
    """Load each data file and build a Markdown data-profile report.

    Args:
        file_paths: List of file paths (.csv / .xlsx / .xls supported).

    Returns:
        A Markdown string describing, per file: dimensions, column names,
        and per-column null counts plus type-specific distribution stats.
        Unreadable or unsupported files are reported inline rather than
        raising.
    """
    profile_summary = "# 数据画像报告 (Data Profile)\n\n"

    if not file_paths:
        return profile_summary + "未提供数据文件。"

    for file_path in file_paths:
        file_name = os.path.basename(file_path)
        profile_summary += f"## 文件: {file_name}\n\n"

        if not os.path.exists(file_path):
            profile_summary += f"[WARN] 文件不存在: {file_path}\n\n"
            continue

        try:
            # Pick the loader from the extension.
            ext = os.path.splitext(file_path)[1].lower()
            if ext == '.csv':
                # Encoding fallback chain: utf-8 first, then GBK (common for
                # Chinese data); latin1 last because it never fails to decode.
                try:
                    df = pd.read_csv(file_path, encoding='utf-8')
                except UnicodeDecodeError:
                    try:
                        df = pd.read_csv(file_path, encoding='gbk')
                    except Exception:
                        df = pd.read_csv(file_path, encoding='latin1')
            elif ext in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path)
            else:
                profile_summary += f"[WARN] 不支持的文件格式: {ext}\n\n"
                continue

            # Basic shape information.
            rows, cols = df.shape
            profile_summary += f"- **维度**: {rows} 行 x {cols} 列\n"
            profile_summary += f"- **列名**: `{', '.join(df.columns)}`\n\n"
            profile_summary += "### 列详细分布:\n"

            # Per-column analysis.
            for col in df.columns:
                dtype = df[col].dtype
                null_count = df[col].isnull().sum()
                # FIX: guard against ZeroDivisionError on an empty file.
                null_ratio = (null_count / rows) * 100 if rows else 0.0
                profile_summary += f"#### {col} ({dtype})\n"
                if null_count > 0:
                    profile_summary += f"- [WARN] 空值: {null_count} ({null_ratio:.1f}%)\n"

                # Numeric columns: basic descriptive statistics.
                if pd.api.types.is_numeric_dtype(dtype):
                    desc = df[col].describe()
                    profile_summary += f"- 统计: Min={desc['min']:.2f}, Max={desc['max']:.2f}, Mean={desc['mean']:.2f}\n"
                # Text / categorical columns.
                # FIX: pd.api.types.is_categorical_dtype is deprecated since
                # pandas 2.1; the isinstance check is the documented
                # replacement and behaves identically here.
                elif pd.api.types.is_object_dtype(dtype) or isinstance(dtype, pd.CategoricalDtype):
                    unique_count = df[col].nunique()
                    profile_summary += f"- 唯一值数量: {unique_count}\n"
                    # Show the top-frequency values — important for spotting
                    # dominant ("high-frequency") categories.
                    if unique_count > 0:
                        top_n = df[col].value_counts().head(5)
                        top_items_str = ", ".join([f"{k}({v})" for k, v in top_n.items()])
                        profile_summary += f"- **TOP 5 高频值**: {top_items_str}\n"
                # Datetime columns: report the observed range.
                elif pd.api.types.is_datetime64_any_dtype(dtype):
                    profile_summary += f"- 范围: {df[col].min()} 至 {df[col].max()}\n"
                profile_summary += "\n"
        except Exception as e:
            # Best-effort: a bad file must not abort the whole report.
            profile_summary += f"[ERROR] 读取或分析文件失败: {str(e)}\n\n"

    return profile_summary


def get_file_hash(file_path: str) -> str:
    """Compute a cache key for a file.

    NOTE: hashes the *path string* plus the file's modification time — not
    the file contents — so it is O(1) even for huge files, and a rewrite
    of the file (new mtime) yields a new key.
    """
    hasher = hashlib.md5()
    hasher.update(file_path.encode())
    # Mix in mtime so edits invalidate any cached DataFrame.
    if os.path.exists(file_path):
        mtime = os.path.getmtime(file_path)
        hasher.update(str(mtime).encode())
    return hasher.hexdigest()


def load_data_chunked(file_path: str, chunksize: Optional[int] = None) -> Iterator[pd.DataFrame]:
    """Stream a large file, yielding it as DataFrame chunks.

    Args:
        file_path: Path to a .csv / .xlsx / .xls file.
        chunksize: Rows per chunk; defaults to app_config.chunk_size.

    Yields:
        DataFrame chunks of up to ``chunksize`` rows.
    """
    if chunksize is None:
        chunksize = app_config.chunk_size

    ext = os.path.splitext(file_path)[1].lower()

    if ext == '.csv':
        # Encoding fallback chain (utf-8 -> gbk -> latin1).
        # FIX: the original retried the next encoding even after chunks had
        # already been yielded, which re-read the file from the top and
        # emitted duplicate data. Once anything has been yielded we must
        # propagate the error instead of restarting.
        yielded = False
        for encoding in ['utf-8', 'gbk', 'latin1']:
            try:
                chunks = pd.read_csv(file_path, encoding=encoding, chunksize=chunksize)
                for chunk in chunks:
                    yielded = True
                    yield chunk
                break
            except UnicodeDecodeError:
                if yielded:
                    # Data already emitted under this encoding; retrying
                    # would duplicate it — surface the failure instead.
                    raise
                continue
            except Exception as e:
                print(f"[ERROR] 读取CSV文件失败: {e}")
                break
    elif ext in ['.xlsx', '.xls']:
        # pandas has no chunksize for Excel; read fully, then slice.
        try:
            df = pd.read_excel(file_path)
            for i in range(0, len(df), chunksize):
                yield df.iloc[i:i + chunksize]
        except Exception as e:
            print(f"[ERROR] 读取Excel文件失败: {e}")


def load_data_with_cache(file_path: str, force_reload: bool = False) -> Optional[pd.DataFrame]:
    """Load a data file with optional DataFrame caching.

    Args:
        file_path: Path to a .csv / .xlsx / .xls file.
        force_reload: When True, bypass the cache and re-read from disk.

    Returns:
        The loaded DataFrame, or None when the file is missing, has an
        unsupported extension, or loading fails.
    """
    if not os.path.exists(file_path):
        print(f"[WARN] 文件不存在: {file_path}")
        return None

    # Warn (but still proceed) for files above the configured size limit.
    file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
    if file_size_mb > app_config.max_file_size_mb:
        print(f"[WARN] 文件过大 ({file_size_mb:.1f}MB),建议使用 load_data_chunked() 流式处理")

    # Cache key is derived from path + mtime (see get_file_hash).
    cache_key = get_file_hash(file_path)

    # Serve from cache unless explicitly bypassed.
    if not force_reload and app_config.data_cache_enabled:
        cached_data = data_cache.get(cache_key)
        if cached_data is not None:
            print(f"[CACHE] 从缓存加载数据: {os.path.basename(file_path)}")
            return cached_data

    ext = os.path.splitext(file_path)[1].lower()
    df = None
    try:
        if ext == '.csv':
            # Encoding fallback chain (utf-8 -> gbk -> latin1).
            for encoding in ['utf-8', 'gbk', 'latin1']:
                try:
                    df = pd.read_csv(file_path, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
        elif ext in ['.xlsx', '.xls']:
            df = pd.read_excel(file_path)
        else:
            print(f"[WARN] 不支持的文件格式: {ext}")
            return None

        # Populate the cache for subsequent calls.
        if df is not None and app_config.data_cache_enabled:
            data_cache.set(cache_key, df)
            print(f"[OK] 数据已缓存: {os.path.basename(file_path)}")

        return df
    except Exception as e:
        print(f"[ERROR] 加载数据失败: {e}")
        return None