diff --git a/cleaned_data/.gitkeep b/cleaned_data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/data_preprocessing/README.md b/data_preprocessing/README.md new file mode 100644 index 0000000..6b14908 --- /dev/null +++ b/data_preprocessing/README.md @@ -0,0 +1,89 @@ +# 数据预处理模块 + +独立的数据清洗工具,用于在正式分析前准备数据。 + +## 功能 + +- **数据合并**:将多个 Excel/CSV 文件合并为单一 CSV +- **时间排序**:按时间列对数据进行排序 +- **目录管理**:标准化的原始数据和输出数据目录 + +## 目录结构 + +``` +project/ +├── raw_data/ # 原始数据存放目录 +│ ├── remotecontrol/ # 按数据来源分类 +│ └── ... +├── cleaned_data/ # 清洗后数据输出目录 +│ ├── xxx_merged.csv +│ └── xxx_sorted.csv +└── data_preprocessing/ # 本模块 +``` + +## 使用方法 + +### 命令行 + +```bash +# 初始化目录结构 +python -m data_preprocessing.cli init + +# 合并 Excel 文件 +python -m data_preprocessing.cli merge --source raw_data/remotecontrol + +# 合并并按时间排序 +python -m data_preprocessing.cli merge --source raw_data/remotecontrol --sort-by SendTime + +# 指定输出路径 +python -m data_preprocessing.cli merge -s raw_data/remotecontrol -o cleaned_data/my_output.csv + +# 排序已有 CSV +python -m data_preprocessing.cli sort --input some_file.csv --time-col SendTime + +# 原地排序(覆盖原文件) +python -m data_preprocessing.cli sort --input data.csv --inplace +``` + +### Python API + +```python +from data_preprocessing import merge_files, sort_by_time, Config + +# 合并文件 +output_path = merge_files( + source_dir="raw_data/remotecontrol", + output_file="cleaned_data/merged.csv", + pattern="*.xlsx", + time_column="SendTime" # 可选:合并后排序 +) + +# 排序 CSV +sorted_path = sort_by_time( + input_path="data.csv", + output_path="sorted_data.csv", + time_column="CreateTime" +) + +# 自定义配置 +config = Config() +config.raw_data_dir = "/path/to/raw" +config.cleaned_data_dir = "/path/to/cleaned" +config.ensure_dirs() +``` + +## 配置项 + +| 配置项 | 默认值 | 说明 | +|--------|--------|------| +| `raw_data_dir` | `raw_data/` | 原始数据目录 | +| `cleaned_data_dir` | `cleaned_data/` | 清洗输出目录 | +| `default_time_column` | `SendTime` | 默认时间列名 | +| `csv_encoding` | `utf-8-sig` | CSV 编码格式 | + +## 注意事项 + +1. 本模块与 `DataAnalysisAgent` 完全独立,不会相互调用 +2. 合并时会自动添加 `_source_file` 列标记数据来源(可用 `--no-source-col` 禁用) +3. Excel 文件会自动合并所有 Sheet +4. 无效时间值在排序时会被放到最后 diff --git a/data_preprocessing/__init__.py b/data_preprocessing/__init__.py new file mode 100644 index 0000000..f83fa68 --- /dev/null +++ b/data_preprocessing/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +""" +数据预处理模块 + +提供独立的数据清洗功能: +- 按时间排序 +- 同类数据合并 +""" + +from .sorter import sort_by_time +from .merger import merge_files +from .config import Config + +__all__ = ["sort_by_time", "merge_files", "Config"] diff --git a/data_preprocessing/cli.py b/data_preprocessing/cli.py new file mode 100644 index 0000000..a56a145 --- /dev/null +++ b/data_preprocessing/cli.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +""" +数据预处理命令行接口 + +使用示例: + # 合并 Excel 文件 + python -m data_preprocessing.cli merge --source raw_data/remotecontrol --output cleaned_data/merged.csv + + # 合并并排序 + python -m data_preprocessing.cli merge --source raw_data/remotecontrol --sort-by SendTime + + # 排序已有 CSV + python -m data_preprocessing.cli sort --input data.csv --output sorted.csv --time-col SendTime + + # 初始化目录结构 + python -m data_preprocessing.cli init +""" + +import argparse +import sys +from .config import default_config +from .sorter import sort_by_time +from .merger import merge_files + + +def main(): + parser = argparse.ArgumentParser( + prog="data_preprocessing", + description="数据预处理工具:排序、合并", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +示例: + %(prog)s merge --source raw_data/remotecontrol --sort-by SendTime + %(prog)s sort --input data.csv --time-col CreateTime + %(prog)s init + """ + ) + + subparsers = parser.add_subparsers(dest="command", help="可用命令") + + # ========== merge 命令 ========== + merge_parser = subparsers.add_parser("merge", help="合并同类文件") + merge_parser.add_argument( + "--source", "-s", + required=True, + help="源数据目录路径" + ) + merge_parser.add_argument( + "--output", "-o", + default=None, + help="输出文件路径 (默认: cleaned_data/<目录名>_merged.csv)" + ) + merge_parser.add_argument( + "--pattern", "-p", + default="*.xlsx", + help="文件匹配模式 (默认: *.xlsx)" + ) + merge_parser.add_argument( + "--sort-by", + default=None, + dest="time_column", + help="合并后按此时间列排序" + ) + merge_parser.add_argument( + "--no-source-col", + action="store_true", + help="不添加来源文件列" + ) + + # ========== sort 命令 ========== + sort_parser = subparsers.add_parser("sort", help="按时间排序 CSV") + sort_parser.add_argument( + "--input", "-i", + required=True, + help="输入 CSV 文件路径" + ) + sort_parser.add_argument( + "--output", "-o", + default=None, + help="输出文件路径 (默认: cleaned_data/<文件名>_sorted.csv)" + ) + sort_parser.add_argument( + "--time-col", "-t", + default=None, + dest="time_column", + help=f"时间列名 (默认: {default_config.default_time_column})" + ) + sort_parser.add_argument( + "--inplace", + action="store_true", + help="原地覆盖输入文件" + ) + + # ========== init 命令 ========== + init_parser = subparsers.add_parser("init", help="初始化目录结构") + + # 解析参数 + args = parser.parse_args() + + if args.command is None: + parser.print_help() + sys.exit(0) + + try: + if args.command == "merge": + result = merge_files( + source_dir=args.source, + output_file=args.output, + pattern=args.pattern, + time_column=args.time_column, + add_source_column=not args.no_source_col + ) + print(f"\n✅ 合并成功: {result}") + + elif args.command == "sort": + result = sort_by_time( + input_path=args.input, + output_path=args.output, + time_column=args.time_column, + inplace=args.inplace + ) + print(f"\n✅ 排序成功: {result}") + + elif args.command == "init": + default_config.ensure_dirs() + print("\n✅ 目录初始化完成") + + except FileNotFoundError as e: + print(f"\n❌ 错误: {e}") + sys.exit(1) + except KeyError as e: + print(f"\n❌ 错误: {e}") + sys.exit(1) + except Exception as e: + print(f"\n❌ 未知错误: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/data_preprocessing/config.py b/data_preprocessing/config.py new file mode 100644 index 0000000..f41e107 --- /dev/null +++ b/data_preprocessing/config.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +""" +数据预处理模块配置 +""" + +import os +from dataclasses import dataclass + +# 获取项目根目录 +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + + +@dataclass +class Config: + """预处理模块配置""" + + # 原始数据存放目录 + raw_data_dir: str = os.path.join(PROJECT_ROOT, "raw_data") + + # 清洗后数据输出目录 + cleaned_data_dir: str = os.path.join(PROJECT_ROOT, "cleaned_data") + + # 默认时间列名 + default_time_column: str = "SendTime" + + # 支持的文件扩展名 + supported_extensions: tuple = (".csv", ".xlsx", ".xls") + + # CSV 编码 + csv_encoding: str = "utf-8-sig" + + def ensure_dirs(self): + """确保目录存在""" + os.makedirs(self.raw_data_dir, exist_ok=True) + os.makedirs(self.cleaned_data_dir, exist_ok=True) + print(f"[OK] 目录已就绪:") + print(f" 原始数据: {self.raw_data_dir}") + print(f" 清洗输出: {self.cleaned_data_dir}") + + +# 默认配置实例 +default_config = Config() diff --git a/merge_excel.py b/data_preprocessing/merge_excel.py similarity index 100% rename from merge_excel.py rename to data_preprocessing/merge_excel.py diff --git a/data_preprocessing/merger.py b/data_preprocessing/merger.py new file mode 100644 index 0000000..5e3d608 --- /dev/null +++ b/data_preprocessing/merger.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +""" +数据合并模块 + +合并同类 Excel/CSV 文件 +""" + +import os +import glob +import pandas as pd +from typing import Optional, List +from .config import default_config + + +def merge_files( + source_dir: str, + output_file: Optional[str] = None, + pattern: str = "*.xlsx", + time_column: Optional[str] = None, + add_source_column: bool = True +) -> str: + """ + 合并目录下的所有同类文件 + + Args: + source_dir: 源数据目录 + output_file: 输出 CSV 文件路径。如果为 None,则输出到 cleaned_data 目录 + pattern: 文件匹配模式 (e.g., "*.xlsx", "*.csv", "*.xls") + time_column: 可选,合并后按此列排序 + add_source_column: 是否添加来源文件列 + + Returns: + 输出文件的绝对路径 + + Raises: + FileNotFoundError: 目录不存在或未找到匹配文件 + """ + if not os.path.isdir(source_dir): + raise FileNotFoundError(f"目录不存在: {source_dir}") + + print(f"[SCAN] 正在扫描目录: {source_dir}") + print(f" 匹配模式: {pattern}") + + # 查找匹配文件 + files = glob.glob(os.path.join(source_dir, pattern)) + + # 如果是 xlsx,也尝试匹配 xls + if pattern == "*.xlsx": + files.extend(glob.glob(os.path.join(source_dir, "*.xls"))) + + if not files: + raise FileNotFoundError(f"未找到匹配 '{pattern}' 的文件") + + # 排序文件列表 + files = _sort_files(files) + print(f"[FOUND] 找到 {len(files)} 个文件") + + # 确定输出路径 + if output_file is None: + default_config.ensure_dirs() + dir_name = os.path.basename(os.path.normpath(source_dir)) + output_file = os.path.join( + default_config.cleaned_data_dir, + f"{dir_name}_merged.csv" + ) + + # 合并数据 + all_dfs = [] + for file in files: + try: + df = _read_file(file) + if df is not None and not df.empty: + if add_source_column: + df['_source_file'] = os.path.basename(file) + all_dfs.append(df) + except Exception as e: + print(f"[ERROR] 读取失败 {file}: {e}") + + if not all_dfs: + raise ValueError("没有成功读取到任何数据") + + print(f"[MERGE] 正在合并 {len(all_dfs)} 个数据源...") + merged_df = pd.concat(all_dfs, ignore_index=True) + print(f" 合并后总行数: {len(merged_df)}") + + # 可选:按时间排序 + if time_column and time_column in merged_df.columns: + print(f"[SORT] 正在按 '{time_column}' 排序...") + merged_df[time_column] = pd.to_datetime(merged_df[time_column], errors='coerce') + merged_df = merged_df.sort_values(by=time_column, na_position='last') + elif time_column: + print(f"[WARN] 未找到时间列 '{time_column}',跳过排序") + + # 保存结果 + print(f"[SAVE] 正在保存: {output_file}") + merged_df.to_csv(output_file, index=False, encoding=default_config.csv_encoding) + + abs_output = os.path.abspath(output_file) + print(f"[OK] 合并完成!") + print(f" 输出文件: {abs_output}") + print(f" 总行数: {len(merged_df)}") + + return abs_output + + +def _sort_files(files: List[str]) -> List[str]: + """对文件列表进行智能排序""" + try: + # 尝试按文件名中的数字排序 + files.sort(key=lambda x: int(os.path.basename(x).split('.')[0])) + print("[SORT] 已按文件名数字顺序排序") + except ValueError: + # 退回到字母排序 + files.sort() + print("[SORT] 已按文件名字母顺序排序") + return files + + +def _read_file(file_path: str) -> Optional[pd.DataFrame]: + """读取单个文件(支持 CSV 和 Excel)""" + ext = os.path.splitext(file_path)[1].lower() + + print(f"[READ] 读取: {os.path.basename(file_path)}") + + if ext == '.csv': + df = pd.read_csv(file_path, low_memory=False) + print(f" 行数: {len(df)}") + return df + + elif ext in ('.xlsx', '.xls'): + # 读取 Excel 所有 sheet 并合并 + xls = pd.ExcelFile(file_path) + print(f" Sheets: {xls.sheet_names}") + + sheet_dfs = [] + for sheet_name in xls.sheet_names: + df = pd.read_excel(xls, sheet_name=sheet_name) + if not df.empty: + print(f" - Sheet '{sheet_name}': {len(df)} 行") + sheet_dfs.append(df) + + if sheet_dfs: + return pd.concat(sheet_dfs, ignore_index=True) + return None + + else: + print(f"[WARN] 不支持的文件格式: {ext}") + return None diff --git a/sort_csv.py b/data_preprocessing/sort_csv.py similarity index 100% rename from sort_csv.py rename to data_preprocessing/sort_csv.py diff --git a/data_preprocessing/sorter.py b/data_preprocessing/sorter.py new file mode 100644 index 0000000..9dffe61 --- /dev/null +++ b/data_preprocessing/sorter.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +""" +数据排序模块 + +按时间列对 CSV 文件进行排序 +""" + +import os +import pandas as pd +from typing import Optional +from .config import default_config + + +def sort_by_time( + input_path: str, + output_path: Optional[str] = None, + time_column: str = None, + inplace: bool = False +) -> str: + """ + 按时间列对 CSV 文件排序 + + Args: + input_path: 输入 CSV 文件路径 + output_path: 输出路径。如果为 None 且 inplace=False,则输出到 cleaned_data 目录 + time_column: 时间列名,默认使用配置中的 default_time_column + inplace: 是否原地覆盖输入文件 + + Returns: + 输出文件的绝对路径 + + Raises: + FileNotFoundError: 输入文件不存在 + KeyError: 时间列不存在 + """ + # 参数处理 + time_column = time_column or default_config.default_time_column + + if not os.path.exists(input_path): + raise FileNotFoundError(f"文件不存在: {input_path}") + + # 确定输出路径 + if inplace: + output_path = input_path + elif output_path is None: + default_config.ensure_dirs() + basename = os.path.basename(input_path) + name, ext = os.path.splitext(basename) + output_path = os.path.join( + default_config.cleaned_data_dir, + f"{name}_sorted{ext}" + ) + + print(f"[READ] 正在读取: {input_path}") + df = pd.read_csv(input_path, low_memory=False) + print(f" 数据行数: {len(df)}") + + # 检查时间列是否存在 + if time_column not in df.columns: + available_cols = list(df.columns) + raise KeyError( + f"未找到时间列 '{time_column}'。可用列: {available_cols}" + ) + + print(f"[PARSE] 正在解析时间列 '{time_column}'...") + df[time_column] = pd.to_datetime(df[time_column], errors='coerce') + + # 统计无效时间 + nat_count = df[time_column].isna().sum() + if nat_count > 0: + print(f"[WARN] 发现 {nat_count} 行无效时间数据,排序时将排在最后") + + print("[SORT] 正在按时间排序...") + df_sorted = df.sort_values(by=time_column, na_position='last') + + print(f"[SAVE] 正在保存: {output_path}") + df_sorted.to_csv(output_path, index=False, encoding=default_config.csv_encoding) + + abs_output = os.path.abspath(output_path) + print(f"[OK] 排序完成!输出文件: {abs_output}") + + return abs_output diff --git a/main.py b/main.py index cab1102..b9ac2f2 100644 --- a/main.py +++ b/main.py @@ -43,7 +43,7 @@ def main(): import os # 自动查找当前目录及remotecontrol目录下的所有数据文件 data_extensions = ['*.csv', '*.xlsx', '*.xls'] - search_dirs = ['jetour'] + search_dirs = ['cleaned_data'] files = [] for search_dir in search_dirs: diff --git a/prompts.py b/prompts.py index 171171f..e783de4 100644 --- a/prompts.py +++ b/prompts.py @@ -1,19 +1,10 @@ data_analysis_system_prompt = """你是一个专业的数据分析助手,运行在Jupyter Notebook环境中,能够根据用户需求生成和执行Python数据分析代码。 - -<<<<<<< HEAD **核心使命**: - 接收自然语言需求,分阶段生成高效、安全的数据分析代码。 - 深度挖掘数据,不仅仅是绘图,更要发现数据背后的业务洞察。 - 输出高质量、可落地的业务分析报告。 **核心能力**: -======= -[TARGET] **核心使命**:+、安全的数据分析代码。 -- 深度挖掘数据,不仅仅是绘图,更要发现数据背后的业务洞察。 -- 输出高质量、可落地的业务分析报告。 - -[TOOL] **核心能力**: ->>>>>>> e9644360ce283742849fe67c38d05864513e2f96 1. **代码执行**:自动编写并执行Pandas/Matplotlib代码。 2. **多模态分析**:支持时序预测、文本挖掘(N-gram)、多维交叉分析。 3. **智能纠错**:遇到报错自动分析原因并修复代码。 @@ -32,11 +23,7 @@ jupyter notebook环境当前变量: --- -<<<<<<< HEAD **代码生成规则 (Code Generation Rules)**: -======= -[TOOL] **代码生成规则 (Code Generation Rules)**: ->>>>>>> e9644360ce283742849fe67c38d05864513e2f96 **1. 执行策略**: - **分步执行**:每次只专注一个分析阶段(如“清洗”或“可视化”),不要试图一次性写完所有代码。 @@ -66,11 +53,7 @@ jupyter notebook环境当前变量: --- -<<<<<<< HEAD **标准化分析SOP (Standard Operating Procedure)**: -======= -[START] **标准化分析SOP (Standard Operating Procedure)**: ->>>>>>> e9644360ce283742849fe67c38d05864513e2f96 **阶段1:数据探索与智能加载** - 检查文件扩展名与实际格式是否一致(CSV vs Excel)。 @@ -102,11 +85,7 @@ jupyter notebook环境当前变量: --- -<<<<<<< HEAD **动作选择指南 (Action Selection)**: -======= -[LIST] **动作选择指南 (Action Selection)**: ->>>>>>> e9644360ce283742849fe67c38d05864513e2f96 1. **generate_code** - 场景:需要执行代码(加载、分析、绘图)。 @@ -147,11 +126,7 @@ jupyter notebook环境当前变量: --- -<<<<<<< HEAD **特别提示**: -======= -[WARN] **特别提示**: ->>>>>>> e9644360ce283742849fe67c38d05864513e2f96 - **翻译要求**:报告中的英文专有名词(除了TSP, TBOX, HU等标准缩写)必须翻译成中文(Remote Control -> 远控)。 - **客观陈述**:不要使用"data shows", "plot indicates"等技术语言,直接陈述业务事实("X车型在Y模块故障率最高")。 - **鲁棒性**:如果代码报错,请深呼吸,分析错误日志,修改代码重试。不要重复无效代码。 @@ -362,11 +337,7 @@ jupyter notebook环境当前变量(已包含之前分析的数据df): --- -<<<<<<< HEAD **关键红线 (Critical Rules)**: -======= -[ALERT] **关键红线 (Critical Rules)**: ->>>>>>> e9644360ce283742849fe67c38d05864513e2f96 1. **进程保护**:严禁使用 `exit()`、`quit()` 或 `sys.exit()`。 2. **数据安全**:严禁伪造数据。严禁写入非结果文件。 3. **文件验证**:所有文件操作前必须 `os.path.exists()`。 @@ -375,23 +346,14 @@ jupyter notebook环境当前变量(已包含之前分析的数据df): --- -<<<<<<< HEAD **代码生成规则 (Reuse)**: -======= -[TOOL] **代码生成规则 (Reuse)**: ->>>>>>> e9644360ce283742849fe67c38d05864513e2f96 - **环境持久化**:直接使用已加载的 `df`,不要重复加载数据。 - **可视化规范**:中文字体配置、类别>5使用水平条形图、美学要求同上。 - **文本挖掘**:如需挖掘,继续遵守N-gram和停用词规则。 --- -<<<<<<< HEAD **动作选择指南**: -======= -[LIST] **动作选择指南**: - ->>>>>>> e9644360ce283742849fe67c38d05864513e2f96 1. **generate_code** - 场景:执行针对追问的代码。 - 格式:同标准模式。 diff --git a/raw_data/.gitkeep b/raw_data/.gitkeep new file mode 100644 index 0000000..e69de29