Files
iov_ana/import_csv.py

258 lines
8.7 KiB
Python
Raw Permalink Normal View History

"""
将工单 CSV 数据导入 SQLite 数据库 增强版
- 自动检测列名映射兼容中英文
- 空值/异常数据容错
- 数据类型自动推断
- 导入前完整性校验
"""
import csv
import sqlite3
import os
import sys
import re
from typing import Any, Optional
# ── 列名别名映射(兼容不同版本 CSV─────────────────
COLUMN_ALIASES = {
"工单号": ["工单号", "ticket_id", "ticket_no", "id", "工单编号"],
"来源": ["来源", "source", "渠道"],
"创建日期": ["创建日期", "created_date", "create_date"],
"问题类型": ["问题类型", "issue_type", "type", "问题分类"],
"问题描述": ["问题描述", "description", "描述"],
"处理过程": ["处理过程", "process", "处理流程"],
"跟踪记录": ["跟踪记录", "tracking", "跟踪"],
"严重程度": ["严重程度", "severity", "priority", "优先级"],
"工单状态": ["工单状态", "status", "状态"],
"模块": ["模块", "module", "功能模块"],
"责任人": ["责任人", "assignee", "负责人"],
"关闭日期": ["关闭日期", "closed_date", "close_date"],
"车型": ["车型", "vehicle_model", "car_model"],
"VIN": ["VIN", "vin", "车架号"],
"SIM": ["SIM", "sim", "sim卡号"],
"Notes": ["Notes", "notes", "备注"],
"Attachment": ["Attachment", "attachment", "附件"],
"创建人": ["创建人", "creator", "创建者"],
"关闭时长_天": ["关闭时长(天)", "关闭时长_天", "close_duration", "duration_days"],
"创建日期_解析": ["创建日期_解析", "created_date_parsed"],
"关闭日期_解析": ["关闭日期_解析", "closed_date_parsed"],
}
def detect_column_mapping(headers: list[str]) -> dict[str, Optional[str]]:
"""
自动检测 CSV 列名到标准列名的映射
返回 {标准列名: CSV实际列名}找不到的值为 None
"""
# 标准化:去空格、小写
header_map = {h.strip().lower(): h for h in headers}
mapping = {}
for std_name, aliases in COLUMN_ALIASES.items():
found = None
for alias in aliases:
key = alias.strip().lower()
if key in header_map:
found = header_map[key]
break
mapping[std_name] = found
return mapping
def safe_float(val: Any) -> Optional[float]:
"""安全转 float"""
if val is None or str(val).strip() == "":
return None
try:
return float(str(val).strip())
except (ValueError, TypeError):
return None
def safe_str(val: Any) -> str:
"""安全转 stringNone → 空串"""
if val is None:
return ""
return str(val).strip()
def validate_row(row: dict, mapping: dict) -> tuple[bool, list[str]]:
"""校验单行数据,返回 (是否通过, 问题列表)"""
issues = []
ticket_id = safe_str(row.get(mapping.get("工单号", ""), ""))
if not ticket_id:
issues.append("缺少工单号")
return len(issues) == 0, issues
def import_csv(csv_path: str, db_path: str, dry_run: bool = False) -> dict:
"""
导入 CSV SQLite
返回统计信息 dict
"""
stats = {
"total": 0, "imported": 0, "skipped": 0,
"warnings": [], "columns_detected": {}, "columns_missing": [],
}
if not os.path.isfile(csv_path):
print(f"❌ CSV 文件不存在: {csv_path}")
return stats
# ── 读取 CSV ──────────────────────────────
with open(csv_path, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
headers = reader.fieldnames or []
rows = list(reader)
stats["total"] = len(rows)
print(f"📄 读取 CSV: {csv_path}")
print(f" 列: {headers}")
print(f" 行数: {len(rows)}")
# ── 检测列映射 ────────────────────────────
mapping = detect_column_mapping(headers)
detected = {k: v for k, v in mapping.items() if v is not None}
missing = [k for k, v in mapping.items() if v is None]
stats["columns_detected"] = detected
stats["columns_missing"] = missing
print(f"\n🔍 列名映射:")
for std, actual in detected.items():
print(f"{std}{actual}")
for m in missing:
print(f" ⚠️ {m} ← 未找到(将使用空值)")
if "工单号" not in detected:
print(f"\n❌ 至少需要「工单号」列,无法继续!")
return stats
# ── 数据预处理 + 校验 ──────────────────────
processed = []
for i, row in enumerate(rows):
valid, issues = validate_row(row, mapping)
if not valid:
stats["skipped"] += 1
if len(stats["warnings"]) < 10:
stats["warnings"].append(f"{i+2}: {', '.join(issues)}")
continue
def get_col(std_name: str, default: str = "") -> str:
actual = mapping.get(std_name)
return safe_str(row.get(actual, default)) if actual else default
def get_float(std_name: str) -> Optional[float]:
actual = mapping.get(std_name)
if not actual:
return None
return safe_float(row.get(actual))
processed.append((
get_col("工单号"),
get_col("来源"),
get_col("创建日期"),
get_col("问题类型"),
get_col("问题描述"),
get_col("处理过程"),
get_col("跟踪记录"),
get_col("严重程度"),
get_col("工单状态"),
get_col("模块"),
get_col("责任人"),
get_col("关闭日期"),
get_col("车型"),
get_col("VIN"),
get_col("SIM"),
get_col("Notes"),
get_col("Attachment"),
get_col("创建人"),
get_float("关闭时长_天"),
get_col("创建日期_解析"),
get_col("关闭日期_解析"),
))
stats["imported"] = len(processed)
print(f"\n✅ 预处理完成: {len(processed)} 条有效, {stats['skipped']} 条跳过")
if stats["warnings"]:
print(f" 警告:")
for w in stats["warnings"][:5]:
print(f" ⚠️ {w}")
if dry_run:
print(" (dry_run 模式,未写入数据库)")
return stats
# ── 写入数据库 ─────────────────────────────
if os.path.exists(db_path):
os.remove(db_path)
print(f"\n🗑️ 已删除旧数据库: {db_path}")
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute("""
CREATE TABLE tickets (
工单号 TEXT PRIMARY KEY,
来源 TEXT,
创建日期 TEXT,
问题类型 TEXT,
问题描述 TEXT,
处理过程 TEXT,
跟踪记录 TEXT,
严重程度 TEXT,
工单状态 TEXT,
模块 TEXT,
责任人 TEXT,
关闭日期 TEXT,
车型 TEXT,
VIN TEXT,
SIM TEXT,
Notes TEXT,
Attachment TEXT,
创建人 TEXT,
关闭时长_天 REAL,
创建日期_解析 TEXT,
关闭日期_解析 TEXT
)
""")
cur.executemany(
"INSERT INTO tickets VALUES (" + ",".join(["?"] * 21) + ")",
processed,
)
conn.commit()
# ── 验证 ──────────────────────────────────
cur.execute("SELECT COUNT(*) FROM tickets")
db_count = cur.fetchone()[0]
print(f"\n✅ 写入完成: 数据库中 {db_count} 条记录")
# 打印维度信息
for col in ("问题类型", "工单状态", "车型", "模块", "来源"):
actual = mapping.get(col)
if actual:
cur.execute(f'SELECT DISTINCT "{col}" FROM tickets WHERE "{col}" != ""')
vals = [r[0] for r in cur.fetchall()]
if vals:
print(f" {col}: {', '.join(vals[:10])}{'...' if len(vals) > 10 else ''}")
conn.close()
return stats
if __name__ == "__main__":
csv_path = sys.argv[1] if len(sys.argv) > 1 else "cleaned_data.csv"
db_path = os.path.join(os.path.dirname(__file__), "demo.db")
dry_run = "--dry-run" in sys.argv
stats = import_csv(csv_path, db_path, dry_run=dry_run)
if stats["columns_missing"]:
print(f"\n💡 提示: 以下列在 CSV 中未找到,已用空值填充:")
for m in stats["columns_missing"]:
print(f" - {m}")