"""
将工单 CSV 数据导入 SQLite 数据库 —— 增强版

- 自动检测列名映射(兼容中英文)
- 空值/异常数据容错
- 数据类型自动推断
- 导入前完整性校验
"""
|
|
|
|
|
|
import csv
|
|
|
|
|
|
import sqlite3
|
|
|
|
|
|
import os
|
|
|
|
|
|
import sys
|
2026-03-31 14:39:17 +08:00
|
|
|
|
import re
|
|
|
|
|
|
from typing import Any, Optional
|
2026-03-20 13:20:31 +08:00
|
|
|
|
|
2026-03-31 14:39:17 +08:00
|
|
|
|
|
|
|
|
|
|
# ── Column-name alias map (tolerates different CSV versions) ─────────────────
# Maps each standard column name to the list of header spellings (Chinese and
# English) that may appear in a CSV; matching is case-insensitive after strip.
COLUMN_ALIASES = {
    "工单号": ["工单号", "ticket_id", "ticket_no", "id", "工单编号"],
    "来源": ["来源", "source", "渠道"],
    "创建日期": ["创建日期", "created_date", "create_date"],
    "问题类型": ["问题类型", "issue_type", "type", "问题分类"],
    "问题描述": ["问题描述", "description", "描述"],
    "处理过程": ["处理过程", "process", "处理流程"],
    "跟踪记录": ["跟踪记录", "tracking", "跟踪"],
    "严重程度": ["严重程度", "severity", "priority", "优先级"],
    "工单状态": ["工单状态", "status", "状态"],
    "模块": ["模块", "module", "功能模块"],
    "责任人": ["责任人", "assignee", "负责人"],
    "关闭日期": ["关闭日期", "closed_date", "close_date"],
    "车型": ["车型", "vehicle_model", "car_model"],
    "VIN": ["VIN", "vin", "车架号"],
    "SIM": ["SIM", "sim", "sim卡号"],
    "Notes": ["Notes", "notes", "备注"],
    "Attachment": ["Attachment", "attachment", "附件"],
    "创建人": ["创建人", "creator", "创建者"],
    "关闭时长_天": ["关闭时长(天)", "关闭时长_天", "close_duration", "duration_days"],
    "创建日期_解析": ["创建日期_解析", "created_date_parsed"],
    "关闭日期_解析": ["关闭日期_解析", "closed_date_parsed"],
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_column_mapping(headers: list[str]) -> dict[str, Optional[str]]:
    """
    Auto-detect the mapping from CSV headers to standard column names.

    Matching is case-insensitive and ignores surrounding whitespace.
    Returns {standard_name: actual_csv_header}; the value is None when
    no alias from COLUMN_ALIASES matches any header.
    """
    # Normalized (stripped, lowercased) header → original header text.
    normalized = {h.strip().lower(): h for h in headers}

    return {
        std_name: next(
            (
                normalized[alias.strip().lower()]
                for alias in aliases
                if alias.strip().lower() in normalized
            ),
            None,
        )
        for std_name, aliases in COLUMN_ALIASES.items()
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_float(val: Any) -> Optional[float]:
    """Convert *val* to float; blank, None, or unparsable input yields None."""
    text = "" if val is None else str(val).strip()
    if not text:
        return None
    try:
        return float(text)
    except (ValueError, TypeError):
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_str(val: Any) -> str:
    """Convert *val* to a whitespace-stripped string; None becomes ""."""
    return "" if val is None else str(val).strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_row(row: dict, mapping: dict) -> tuple[bool, list[str]]:
    """Validate one CSV row; return (passed, issue_messages)."""
    problems: list[str] = []

    # A row must at least carry a non-empty ticket id to be importable.
    ticket_col = mapping.get("工单号", "")
    if not safe_str(row.get(ticket_col, "")):
        problems.append("缺少工单号")

    return not problems, problems
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def import_csv(csv_path: str, db_path: str, dry_run: bool = False) -> dict:
    """
    Import ticket rows from a CSV file into a fresh SQLite database.

    Pipeline: read the CSV (utf-8-sig tolerates a BOM), auto-detect the
    header-to-standard-column mapping, validate and normalize each row,
    then — unless dry_run — DELETE any existing db_path and bulk-insert
    the rows into a new `tickets` table.

    Args:
        csv_path: path to the source CSV file.
        db_path: target SQLite file; an existing file is removed and rebuilt.
        dry_run: when True, stop after validation without touching db_path.

    Returns:
        Statistics dict with keys: total, imported, skipped, warnings,
        columns_detected, columns_missing.
    """
    stats: dict = {
        "total": 0, "imported": 0, "skipped": 0,
        "warnings": [], "columns_detected": {}, "columns_missing": [],
    }

    if not os.path.isfile(csv_path):
        print(f"❌ CSV 文件不存在: {csv_path}")
        return stats

    # ── Read CSV ──────────────────────────────
    with open(csv_path, "r", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        headers = reader.fieldnames or []
        rows = list(reader)

    stats["total"] = len(rows)
    print(f"📄 读取 CSV: {csv_path}")
    print(f"   列: {headers}")
    print(f"   行数: {len(rows)}")

    # ── Detect column mapping ─────────────────
    mapping = detect_column_mapping(headers)
    detected = {k: v for k, v in mapping.items() if v is not None}
    missing = [k for k, v in mapping.items() if v is None]
    stats["columns_detected"] = detected
    stats["columns_missing"] = missing

    print("\n🔍 列名映射:")
    for std, actual in detected.items():
        print(f"  ✅ {std} ← {actual}")
    for m in missing:
        print(f"  ⚠️ {m} ← 未找到(将使用空值)")

    if "工单号" not in detected:
        print("\n❌ 至少需要「工单号」列,无法继续!")
        return stats

    # Helpers hoisted out of the row loop; the original re-defined them on
    # every iteration, paying the function-creation cost per row.
    def get_col(row: dict, std_name: str, default: str = "") -> str:
        # Read a string column through the detected mapping; missing → default.
        actual = mapping.get(std_name)
        return safe_str(row.get(actual, default)) if actual else default

    def get_float(row: dict, std_name: str) -> Optional[float]:
        # Read a numeric column through the detected mapping; missing/bad → None.
        actual = mapping.get(std_name)
        return safe_float(row.get(actual)) if actual else None

    # ── Preprocess + validate ─────────────────
    processed = []
    for i, row in enumerate(rows):
        valid, issues = validate_row(row, mapping)
        if not valid:
            stats["skipped"] += 1
            if len(stats["warnings"]) < 10:
                # i+2: 1-based numbering plus the header line of the CSV.
                stats["warnings"].append(f"行 {i+2}: {', '.join(issues)}")
            continue

        processed.append((
            get_col(row, "工单号"),
            get_col(row, "来源"),
            get_col(row, "创建日期"),
            get_col(row, "问题类型"),
            get_col(row, "问题描述"),
            get_col(row, "处理过程"),
            get_col(row, "跟踪记录"),
            get_col(row, "严重程度"),
            get_col(row, "工单状态"),
            get_col(row, "模块"),
            get_col(row, "责任人"),
            get_col(row, "关闭日期"),
            get_col(row, "车型"),
            get_col(row, "VIN"),
            get_col(row, "SIM"),
            get_col(row, "Notes"),
            get_col(row, "Attachment"),
            get_col(row, "创建人"),
            get_float(row, "关闭时长_天"),
            get_col(row, "创建日期_解析"),
            get_col(row, "关闭日期_解析"),
        ))

    stats["imported"] = len(processed)
    print(f"\n✅ 预处理完成: {len(processed)} 条有效, {stats['skipped']} 条跳过")

    if stats["warnings"]:
        print("   警告:")
        for w in stats["warnings"][:5]:
            print(f"    ⚠️ {w}")

    if dry_run:
        print("   (dry_run 模式,未写入数据库)")
        return stats

    # ── Write database ────────────────────────
    # Full rebuild by design: any existing database file is removed first.
    if os.path.exists(db_path):
        os.remove(db_path)
        print(f"\n🗑️ 已删除旧数据库: {db_path}")

    conn = sqlite3.connect(db_path)
    try:  # ensure the connection is closed even if a statement raises
        cur = conn.cursor()

        cur.execute("""
            CREATE TABLE tickets (
                工单号 TEXT PRIMARY KEY,
                来源 TEXT,
                创建日期 TEXT,
                问题类型 TEXT,
                问题描述 TEXT,
                处理过程 TEXT,
                跟踪记录 TEXT,
                严重程度 TEXT,
                工单状态 TEXT,
                模块 TEXT,
                责任人 TEXT,
                关闭日期 TEXT,
                车型 TEXT,
                VIN TEXT,
                SIM TEXT,
                Notes TEXT,
                Attachment TEXT,
                创建人 TEXT,
                关闭时长_天 REAL,
                创建日期_解析 TEXT,
                关闭日期_解析 TEXT
            )
        """)

        # 21 columns — must stay in sync with the tuple built above.
        cur.executemany(
            "INSERT INTO tickets VALUES (" + ",".join(["?"] * 21) + ")",
            processed,
        )
        conn.commit()

        # ── Verify ────────────────────────────
        cur.execute("SELECT COUNT(*) FROM tickets")
        db_count = cur.fetchone()[0]
        print(f"\n✅ 写入完成: 数据库中 {db_count} 条记录")

        # Print distinct values for the key dimension columns.
        for col in ("问题类型", "工单状态", "车型", "模块", "来源"):
            if mapping.get(col):
                cur.execute(f'SELECT DISTINCT "{col}" FROM tickets WHERE "{col}" != ""')
                vals = [r[0] for r in cur.fetchall()]
                if vals:
                    print(f"   {col}: {', '.join(vals[:10])}{'...' if len(vals) > 10 else ''}")
    finally:
        conn.close()

    return stats
|
2026-03-20 13:20:31 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # CLI: import_csv.py [csv_path] [--dry-run]
    args = sys.argv
    csv_path = args[1] if len(args) > 1 else "cleaned_data.csv"
    db_path = os.path.join(os.path.dirname(__file__), "demo.db")

    stats = import_csv(csv_path, db_path, dry_run="--dry-run" in args)

    # Remind the user which standard columns were absent from the CSV.
    missing_cols = stats["columns_missing"]
    if missing_cols:
        print("\n💡 提示: 以下列在 CSV 中未找到,已用空值填充:")
        for name in missing_cols:
            print(f"  - {name}")
|