""" 将工单 CSV 数据导入 SQLite 数据库 —— 增强版 - 自动检测列名映射(兼容中英文) - 空值/异常数据容错 - 数据类型自动推断 - 导入前完整性校验 """ import csv import sqlite3 import os import sys import re from typing import Any, Optional # ── 列名别名映射(兼容不同版本 CSV)───────────────── COLUMN_ALIASES = { "工单号": ["工单号", "ticket_id", "ticket_no", "id", "工单编号"], "来源": ["来源", "source", "渠道"], "创建日期": ["创建日期", "created_date", "create_date"], "问题类型": ["问题类型", "issue_type", "type", "问题分类"], "问题描述": ["问题描述", "description", "描述"], "处理过程": ["处理过程", "process", "处理流程"], "跟踪记录": ["跟踪记录", "tracking", "跟踪"], "严重程度": ["严重程度", "severity", "priority", "优先级"], "工单状态": ["工单状态", "status", "状态"], "模块": ["模块", "module", "功能模块"], "责任人": ["责任人", "assignee", "负责人"], "关闭日期": ["关闭日期", "closed_date", "close_date"], "车型": ["车型", "vehicle_model", "car_model"], "VIN": ["VIN", "vin", "车架号"], "SIM": ["SIM", "sim", "sim卡号"], "Notes": ["Notes", "notes", "备注"], "Attachment": ["Attachment", "attachment", "附件"], "创建人": ["创建人", "creator", "创建者"], "关闭时长_天": ["关闭时长(天)", "关闭时长_天", "close_duration", "duration_days"], "创建日期_解析": ["创建日期_解析", "created_date_parsed"], "关闭日期_解析": ["关闭日期_解析", "closed_date_parsed"], } def detect_column_mapping(headers: list[str]) -> dict[str, Optional[str]]: """ 自动检测 CSV 列名到标准列名的映射。 返回 {标准列名: CSV实际列名},找不到的值为 None。 """ # 标准化:去空格、小写 header_map = {h.strip().lower(): h for h in headers} mapping = {} for std_name, aliases in COLUMN_ALIASES.items(): found = None for alias in aliases: key = alias.strip().lower() if key in header_map: found = header_map[key] break mapping[std_name] = found return mapping def safe_float(val: Any) -> Optional[float]: """安全转 float""" if val is None or str(val).strip() == "": return None try: return float(str(val).strip()) except (ValueError, TypeError): return None def safe_str(val: Any) -> str: """安全转 string,None → 空串""" if val is None: return "" return str(val).strip() def validate_row(row: dict, mapping: dict) -> tuple[bool, list[str]]: """校验单行数据,返回 (是否通过, 问题列表)""" issues = [] ticket_id = safe_str(row.get(mapping.get("工单号", ""), "")) if not ticket_id: issues.append("缺少工单号") return len(issues) == 0, issues def import_csv(csv_path: str, db_path: str, dry_run: bool = False) -> dict: """ 导入 CSV 到 SQLite。 返回统计信息 dict。 """ stats = { "total": 0, "imported": 0, "skipped": 0, "warnings": [], "columns_detected": {}, "columns_missing": [], } if not os.path.isfile(csv_path): print(f"❌ CSV 文件不存在: {csv_path}") return stats # ── 读取 CSV ────────────────────────────── with open(csv_path, "r", encoding="utf-8-sig") as f: reader = csv.DictReader(f) headers = reader.fieldnames or [] rows = list(reader) stats["total"] = len(rows) print(f"📄 读取 CSV: {csv_path}") print(f" 列: {headers}") print(f" 行数: {len(rows)}") # ── 检测列映射 ──────────────────────────── mapping = detect_column_mapping(headers) detected = {k: v for k, v in mapping.items() if v is not None} missing = [k for k, v in mapping.items() if v is None] stats["columns_detected"] = detected stats["columns_missing"] = missing print(f"\n🔍 列名映射:") for std, actual in detected.items(): print(f" ✅ {std} ← {actual}") for m in missing: print(f" ⚠️ {m} ← 未找到(将使用空值)") if "工单号" not in detected: print(f"\n❌ 至少需要「工单号」列,无法继续!") return stats # ── 数据预处理 + 校验 ────────────────────── processed = [] for i, row in enumerate(rows): valid, issues = validate_row(row, mapping) if not valid: stats["skipped"] += 1 if len(stats["warnings"]) < 10: stats["warnings"].append(f"行 {i+2}: {', '.join(issues)}") continue def get_col(std_name: str, default: str = "") -> str: actual = mapping.get(std_name) return safe_str(row.get(actual, default)) if actual else default def get_float(std_name: str) -> Optional[float]: actual = mapping.get(std_name) if not actual: return None return safe_float(row.get(actual)) processed.append(( get_col("工单号"), get_col("来源"), get_col("创建日期"), get_col("问题类型"), get_col("问题描述"), get_col("处理过程"), get_col("跟踪记录"), get_col("严重程度"), get_col("工单状态"), get_col("模块"), get_col("责任人"), get_col("关闭日期"), get_col("车型"), get_col("VIN"), get_col("SIM"), get_col("Notes"), get_col("Attachment"), get_col("创建人"), get_float("关闭时长_天"), get_col("创建日期_解析"), get_col("关闭日期_解析"), )) stats["imported"] = len(processed) print(f"\n✅ 预处理完成: {len(processed)} 条有效, {stats['skipped']} 条跳过") if stats["warnings"]: print(f" 警告:") for w in stats["warnings"][:5]: print(f" ⚠️ {w}") if dry_run: print(" (dry_run 模式,未写入数据库)") return stats # ── 写入数据库 ───────────────────────────── if os.path.exists(db_path): os.remove(db_path) print(f"\n🗑️ 已删除旧数据库: {db_path}") conn = sqlite3.connect(db_path) cur = conn.cursor() cur.execute(""" CREATE TABLE tickets ( 工单号 TEXT PRIMARY KEY, 来源 TEXT, 创建日期 TEXT, 问题类型 TEXT, 问题描述 TEXT, 处理过程 TEXT, 跟踪记录 TEXT, 严重程度 TEXT, 工单状态 TEXT, 模块 TEXT, 责任人 TEXT, 关闭日期 TEXT, 车型 TEXT, VIN TEXT, SIM TEXT, Notes TEXT, Attachment TEXT, 创建人 TEXT, 关闭时长_天 REAL, 创建日期_解析 TEXT, 关闭日期_解析 TEXT ) """) cur.executemany( "INSERT INTO tickets VALUES (" + ",".join(["?"] * 21) + ")", processed, ) conn.commit() # ── 验证 ────────────────────────────────── cur.execute("SELECT COUNT(*) FROM tickets") db_count = cur.fetchone()[0] print(f"\n✅ 写入完成: 数据库中 {db_count} 条记录") # 打印维度信息 for col in ("问题类型", "工单状态", "车型", "模块", "来源"): actual = mapping.get(col) if actual: cur.execute(f'SELECT DISTINCT "{col}" FROM tickets WHERE "{col}" != ""') vals = [r[0] for r in cur.fetchall()] if vals: print(f" {col}: {', '.join(vals[:10])}{'...' if len(vals) > 10 else ''}") conn.close() return stats if __name__ == "__main__": csv_path = sys.argv[1] if len(sys.argv) > 1 else "cleaned_data.csv" db_path = os.path.join(os.path.dirname(__file__), "demo.db") dry_run = "--dry-run" in sys.argv stats = import_csv(csv_path, db_path, dry_run=dry_run) if stats["columns_missing"]: print(f"\n💡 提示: 以下列在 CSV 中未找到,已用空值填充:") for m in stats["columns_missing"]: print(f" - {m}")