# -*- coding: utf-8 -*- """ 数据合并模块 合并同类 Excel/CSV 文件 """ import os import glob import pandas as pd from typing import Optional, List from .config import default_config def merge_files( source_dir: str, output_file: Optional[str] = None, pattern: str = "*.xlsx", time_column: Optional[str] = None, add_source_column: bool = True ) -> str: """ 合并目录下的所有同类文件 Args: source_dir: 源数据目录 output_file: 输出 CSV 文件路径。如果为 None,则输出到 cleaned_data 目录 pattern: 文件匹配模式 (e.g., "*.xlsx", "*.csv", "*.xls") time_column: 可选,合并后按此列排序 add_source_column: 是否添加来源文件列 Returns: 输出文件的绝对路径 Raises: FileNotFoundError: 目录不存在或未找到匹配文件 """ if not os.path.isdir(source_dir): raise FileNotFoundError(f"目录不存在: {source_dir}") print(f"[SCAN] 正在扫描目录: {source_dir}") print(f" 匹配模式: {pattern}") # 查找匹配文件 files = glob.glob(os.path.join(source_dir, pattern)) # 如果是 xlsx,也尝试匹配 xls if pattern == "*.xlsx": files.extend(glob.glob(os.path.join(source_dir, "*.xls"))) if not files: raise FileNotFoundError(f"未找到匹配 '{pattern}' 的文件") # 排序文件列表 files = _sort_files(files) print(f"[FOUND] 找到 {len(files)} 个文件") # 确定输出路径 if output_file is None: default_config.ensure_dirs() dir_name = os.path.basename(os.path.normpath(source_dir)) output_file = os.path.join( default_config.cleaned_data_dir, f"{dir_name}_merged.csv" ) # 合并数据 all_dfs = [] for file in files: try: df = _read_file(file) if df is not None and not df.empty: if add_source_column: df['_source_file'] = os.path.basename(file) all_dfs.append(df) except Exception as e: print(f"[ERROR] 读取失败 {file}: {e}") if not all_dfs: raise ValueError("没有成功读取到任何数据") print(f"[MERGE] 正在合并 {len(all_dfs)} 个数据源...") merged_df = pd.concat(all_dfs, ignore_index=True) print(f" 合并后总行数: {len(merged_df)}") # 可选:按时间排序 if time_column and time_column in merged_df.columns: print(f"[SORT] 正在按 '{time_column}' 排序...") merged_df[time_column] = pd.to_datetime(merged_df[time_column], errors='coerce') merged_df = merged_df.sort_values(by=time_column, na_position='last') elif time_column: print(f"[WARN] 未找到时间列 '{time_column}',跳过排序") # 保存结果 print(f"[SAVE] 正在保存: {output_file}") merged_df.to_csv(output_file, index=False, encoding=default_config.csv_encoding) abs_output = os.path.abspath(output_file) print(f"[OK] 合并完成!") print(f" 输出文件: {abs_output}") print(f" 总行数: {len(merged_df)}") return abs_output def _sort_files(files: List[str]) -> List[str]: """对文件列表进行智能排序""" try: # 尝试按文件名中的数字排序 files.sort(key=lambda x: int(os.path.basename(x).split('.')[0])) print("[SORT] 已按文件名数字顺序排序") except ValueError: # 退回到字母排序 files.sort() print("[SORT] 已按文件名字母顺序排序") return files def _read_file(file_path: str) -> Optional[pd.DataFrame]: """读取单个文件(支持 CSV 和 Excel)""" ext = os.path.splitext(file_path)[1].lower() print(f"[READ] 读取: {os.path.basename(file_path)}") if ext == '.csv': df = pd.read_csv(file_path, low_memory=False) print(f" 行数: {len(df)}") return df elif ext in ('.xlsx', '.xls'): # 读取 Excel 所有 sheet 并合并 xls = pd.ExcelFile(file_path) print(f" Sheets: {xls.sheet_names}") sheet_dfs = [] for sheet_name in xls.sheet_names: df = pd.read_excel(xls, sheet_name=sheet_name) if not df.empty: print(f" - Sheet '{sheet_name}': {len(df)} 行") sheet_dfs.append(df) if sheet_dfs: return pd.concat(sheet_dfs, ignore_index=True) return None else: print(f"[WARN] 不支持的文件格式: {ext}") return None