工单导入及下载

This commit is contained in:
赵杰 Jie Zhao (雄狮汽车科技)
2025-09-19 19:58:54 +01:00
parent 45beca7f94
commit a884eafc64

View File

@@ -7,6 +7,8 @@
import os import os
import pandas as pd import pandas as pd
import logging import logging
import uuid
import time
from datetime import datetime from datetime import datetime
from flask import Blueprint, request, jsonify, send_file from flask import Blueprint, request, jsonify, send_file
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
@@ -52,29 +54,48 @@ def get_assistant():
def _ensure_workorder_template_file() -> str: def _ensure_workorder_template_file() -> str:
"""返回已有的模板xlsx路径不做动态生成避免运行时依赖问题""" """返回已有的模板xlsx路径不做动态生成避免运行时依赖问题"""
template_path = os.path.join('uploads', 'workorder_template.xlsx') # 获取项目根目录
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, '..', '..', '..'))
# 模板文件路径项目根目录下的uploads
template_path = os.path.join(project_root, 'uploads', 'workorder_template.xlsx')
# 确保目录存在 # 确保目录存在
os.makedirs('uploads', exist_ok=True) uploads_dir = os.path.join(project_root, 'uploads')
os.makedirs(uploads_dir, exist_ok=True)
if not os.path.exists(template_path): if not os.path.exists(template_path):
# 优先从项目根目录的 uploads 拷贝(仓库自带模板 # 尝试从其他可能的位置复制模板
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) possible_locations = [
repo_template = os.path.join(project_root, 'uploads', 'workorder_template.xlsx') os.path.join(project_root, 'uploads', 'workorder_template.xlsx'),
try: os.path.join(current_dir, 'uploads', 'workorder_template.xlsx'),
if os.path.exists(repo_template): os.path.join(os.getcwd(), 'uploads', 'workorder_template.xlsx')
import shutil ]
shutil.copyfile(repo_template, template_path)
else: source_found = False
# 仓库模板不存在时,自动生成一个最小可用模板 for source_path in possible_locations:
if os.path.exists(source_path):
try: try:
import pandas as pd import shutil
from pandas import DataFrame shutil.copyfile(source_path, template_path)
columns = ['标题', '描述', '分类', '优先级', '状态', '解决方案', '满意度'] source_found = True
df: DataFrame = pd.DataFrame(columns=columns) break
df.to_excel(template_path, index=False) except Exception as e:
except Exception as gen_err: logger.warning(f"复制模板文件失败: {e}")
raise FileNotFoundError('模板文件缺失且自动生成失败请检查依赖openpyxl/pandas') from gen_err
except Exception as copy_err: if not source_found:
raise copy_err # 自动生成一个最小可用模板
try:
import pandas as pd
from pandas import DataFrame
columns = ['标题', '描述', '分类', '优先级', '状态', '解决方案', '满意度']
df: DataFrame = pd.DataFrame(columns=columns)
df.to_excel(template_path, index=False)
logger.info(f"自动生成模板文件: {template_path}")
except Exception as gen_err:
raise FileNotFoundError('模板文件缺失且自动生成失败请检查依赖openpyxl/pandas') from gen_err
return template_path return template_path
@workorders_bp.route('') @workorders_bp.route('')
@@ -411,45 +432,58 @@ def import_workorders():
if not title or title.strip() == '': if not title or title.strip() == '':
continue continue
# 生成唯一的工单ID
timestamp = int(time.time())
unique_id = str(uuid.uuid4())[:8]
order_id = f"IMP_{timestamp}_{unique_id}"
# 创建工单到数据库 # 创建工单到数据库
with db_manager.get_session() as session: try:
workorder = WorkOrder( with db_manager.get_session() as session:
title=title, workorder = WorkOrder(
description=description, order_id=order_id,
category=category, title=title,
priority=priority, description=description,
status=status, category=category,
created_at=datetime.now(), priority=priority,
updated_at=datetime.now() status=status,
) created_at=datetime.now(),
updated_at=datetime.now()
# 处理可选字段 )
if pd.notna(row.get('解决方案', row.get('resolution'))):
workorder.resolution = str(row.get('解决方案', row.get('resolution'))) # 处理可选字段
if pd.notna(row.get('解决方案', row.get('resolution'))):
if pd.notna(row.get('满意度', row.get('satisfaction_score'))): workorder.resolution = str(row.get('解决方案', row.get('resolution')))
try:
workorder.satisfaction_score = int(row.get('满意度', row.get('satisfaction_score'))) if pd.notna(row.get('满意度', row.get('satisfaction_score'))):
except (ValueError, TypeError): try:
workorder.satisfaction_score = None workorder.satisfaction_score = int(row.get('满意度', row.get('satisfaction_score')))
except (ValueError, TypeError):
session.add(workorder) workorder.satisfaction_score = None
session.commit()
session.add(workorder)
# 添加到返回列表 session.commit()
imported_workorders.append({
"id": workorder.id, logger.info(f"成功导入工单: {order_id} - {title}")
"order_id": workorder.order_id,
"title": workorder.title, except Exception as db_error:
"description": workorder.description, logger.error(f"导入工单到数据库失败: {db_error}")
"category": workorder.category, continue
"priority": workorder.priority,
"status": workorder.status, # 添加到返回列表
"created_at": workorder.created_at.isoformat() if workorder.created_at else None, imported_workorders.append({
"updated_at": workorder.updated_at.isoformat() if workorder.updated_at else None, "id": workorder.id,
"resolution": workorder.resolution, "order_id": workorder.order_id,
"satisfaction_score": workorder.satisfaction_score "title": workorder.title,
}) "description": workorder.description,
"category": workorder.category,
"priority": workorder.priority,
"status": workorder.status,
"created_at": workorder.created_at.isoformat() if workorder.created_at else None,
"updated_at": workorder.updated_at.isoformat() if workorder.updated_at else None,
"resolution": workorder.resolution,
"satisfaction_score": workorder.satisfaction_score
})
# 清理上传的文件 # 清理上传的文件
os.remove(upload_path) os.remove(upload_path)
@@ -488,11 +522,27 @@ def download_import_template_file():
"""直接返回工单导入模板文件(下载)""" """直接返回工单导入模板文件(下载)"""
try: try:
template_path = _ensure_workorder_template_file() template_path = _ensure_workorder_template_file()
# 检查文件是否存在
if not os.path.exists(template_path):
logger.error(f"模板文件不存在: {template_path}")
return jsonify({"error": "模板文件不存在"}), 404
# 检查文件大小
file_size = os.path.getsize(template_path)
if file_size == 0:
logger.error(f"模板文件为空: {template_path}")
return jsonify({"error": "模板文件为空"}), 500
logger.info(f"准备下载模板文件: {template_path}, 大小: {file_size} bytes")
try: try:
# Flask>=2 使用 download_name # Flask>=2 使用 download_name
return send_file(template_path, as_attachment=True, download_name='工单导入模板.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') return send_file(template_path, as_attachment=True, download_name='工单导入模板.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
except TypeError: except TypeError:
# 兼容 Flask<2 的 attachment_filename # 兼容 Flask<2 的 attachment_filename
return send_file(template_path, as_attachment=True, attachment_filename='工单导入模板.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') return send_file(template_path, as_attachment=True, attachment_filename='工单导入模板.xlsx', mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
except Exception as e: except Exception as e:
return jsonify({"error": str(e)}), 500 logger.error(f"下载模板文件失败: {e}")
return jsonify({"error": f"下载失败: {str(e)}"}), 500