Files
2026-02-02 09:44:07 +08:00

149 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
数据合并模块
合并同类 Excel/CSV 文件
"""
import os
import glob
import pandas as pd
from typing import Optional, List
from .config import default_config
def merge_files(
    source_dir: str,
    output_file: Optional[str] = None,
    pattern: str = "*.xlsx",
    time_column: Optional[str] = None,
    add_source_column: bool = True
) -> str:
    """
    Merge every matching file in a directory into one CSV file.

    Args:
        source_dir: Directory that holds the source files.
        output_file: Path of the CSV to write. When None, the file is written
            to ``default_config.cleaned_data_dir`` and named
            ``<source directory name>_merged.csv``. Missing parent
            directories are created.
        pattern: Glob pattern used inside ``source_dir`` (e.g. "*.xlsx",
            "*.csv", "*.xls"). When it is exactly "*.xlsx", "*.xls" files
            are collected as well.
        time_column: Optional column name. If the merged data has this column
            it is parsed with ``pd.to_datetime`` (unparseable values become
            NaT) and the rows are sorted by it, NaT last. If the column is
            missing, a warning is printed and no sorting happens.
        add_source_column: When True, each row gets a ``_source_file`` column
            holding the base name of the file it came from.

    Returns:
        Absolute path of the written CSV file.

    Raises:
        FileNotFoundError: ``source_dir`` is not a directory, or no input
            file matches ``pattern``.
        ValueError: None of the matched files produced any rows.
    """
    if not os.path.isdir(source_dir):
        raise FileNotFoundError(f"目录不存在: {source_dir}")

    print(f"[SCAN] 正在扫描目录: {source_dir}")
    print(f" 匹配模式: {pattern}")

    # Collect candidate files; legacy .xls workbooks ride along with .xlsx.
    files = glob.glob(os.path.join(source_dir, pattern))
    if pattern == "*.xlsx":
        files.extend(glob.glob(os.path.join(source_dir, "*.xls")))
    if not files:
        raise FileNotFoundError(f"未找到匹配 '{pattern}' 的文件")

    files = _sort_files(files)
    print(f"[FOUND] 找到 {len(files)} 个文件")

    # Resolve the output path.
    if output_file is None:
        default_config.ensure_dirs()
        # abspath (rather than normpath) so that "." or ".." resolve to a
        # real directory name instead of producing "._merged.csv".
        dir_name = os.path.basename(os.path.abspath(source_dir))
        output_file = os.path.join(
            default_config.cleaned_data_dir,
            f"{dir_name}_merged.csv"
        )
    abs_output = os.path.abspath(output_file)

    # A previous run may have left the merged CSV inside source_dir where it
    # matches the pattern; reading it back would duplicate every row.
    output_key = os.path.normcase(abs_output)
    inputs = [
        path for path in files
        if os.path.normcase(os.path.abspath(path)) != output_key
    ]
    if len(inputs) < len(files):
        print(f"[WARN] 已跳过输出文件自身: {os.path.basename(abs_output)}")
    if not inputs:
        raise FileNotFoundError(f"未找到匹配 '{pattern}' 的文件")

    # Read each file; one unreadable file must not abort the whole merge.
    all_dfs = []
    for file in inputs:
        try:
            df = _read_file(file)
        except Exception as e:
            print(f"[ERROR] 读取失败 {file}: {e}")
            continue
        if df is None or df.empty:
            continue
        if add_source_column:
            df['_source_file'] = os.path.basename(file)
        all_dfs.append(df)

    if not all_dfs:
        raise ValueError("没有成功读取到任何数据")

    print(f"[MERGE] 正在合并 {len(all_dfs)} 个数据源...")
    merged_df = pd.concat(all_dfs, ignore_index=True)
    print(f" 合并后总行数: {len(merged_df)}")

    # Optional chronological ordering.
    if time_column and time_column in merged_df.columns:
        print(f"[SORT] 正在按 '{time_column}' 排序...")
        merged_df[time_column] = pd.to_datetime(merged_df[time_column], errors='coerce')
        merged_df = merged_df.sort_values(by=time_column, na_position='last')
    elif time_column:
        print(f"[WARN] 未找到时间列 '{time_column}',跳过排序")

    # Create the target directory up front so a caller-supplied path in a
    # not-yet-existing folder does not fail after all the work is done.
    print(f"[SAVE] 正在保存: {output_file}")
    os.makedirs(os.path.dirname(abs_output), exist_ok=True)
    merged_df.to_csv(output_file, index=False, encoding=default_config.csv_encoding)

    print("[OK] 合并完成!")
    print(f" 输出文件: {abs_output}")
    print(f" 总行数: {len(merged_df)}")
    return abs_output
def _sort_files(files: List[str]) -> List[str]:
    """Sort a list of file paths in place and return that same list.

    If the part of every base name before its first dot is an integer
    (e.g. "12.xlsx"), the paths are ordered by that integer. The base name
    is used as a tie-breaker so that files sharing a number ("1.xls" and
    "1.xlsx", or "01.csv" and "1.csv") always come out in the same order,
    whatever order the caller supplied them in.

    If any base name does not start with an integer, the whole list falls
    back to plain string ordering of the paths.

    Args:
        files: File paths to order; the list is modified in place.

    Returns:
        The same list object, now sorted.
    """
    def numeric_key(path: str):
        name = os.path.basename(path)
        # int() raises ValueError for a non-numeric stem, which triggers the
        # alphabetical fallback below.
        return int(name.split('.')[0]), name

    try:
        files.sort(key=numeric_key)
        print("[SORT] 已按文件名数字顺序排序")
    except ValueError:
        files.sort()
        print("[SORT] 已按文件名字母顺序排序")
    return files
def _read_file(file_path: str) -> Optional[pd.DataFrame]:
    """Read a single CSV or Excel file into a DataFrame.

    The format is chosen from the file extension (case-insensitive):

    * ``.csv`` is read with ``pd.read_csv``.
    * ``.xlsx`` / ``.xls`` has every sheet read; non-empty sheets are
      concatenated in workbook order with a fresh index.

    Args:
        file_path: Path of the file to read.

    Returns:
        The loaded DataFrame, or None when the extension is unsupported or
        an Excel workbook has no non-empty sheet.
    """
    ext = os.path.splitext(file_path)[1].lower()
    print(f"[READ] 读取: {os.path.basename(file_path)}")

    if ext == '.csv':
        df = pd.read_csv(file_path, low_memory=False)
        print(f" 行数: {len(df)}")
        return df

    if ext in ('.xlsx', '.xls'):
        sheet_dfs = []
        # Context manager closes the workbook handle even if a sheet fails
        # to parse; otherwise the file stays open (and locked on Windows).
        with pd.ExcelFile(file_path) as xls:
            print(f" Sheets: {xls.sheet_names}")
            for sheet_name in xls.sheet_names:
                df = pd.read_excel(xls, sheet_name=sheet_name)
                if not df.empty:
                    print(f" - Sheet '{sheet_name}': {len(df)}")
                    sheet_dfs.append(df)
        if sheet_dfs:
            return pd.concat(sheet_dfs, ignore_index=True)
        return None

    print(f"[WARN] 不支持的文件格式: {ext}")
    return None