修复AI建议逻辑和字段映射问题
- 修复AI建议基于问题描述而不是处理过程生成 - 修复工单详情页面显示逻辑 - 修复飞书时间字段处理(毫秒时间戳转换) - 优化字段映射和转换逻辑 - 添加飞书集成功能 - 改进对话历史合并功能 - 优化系统优化反馈机制
This commit is contained in:
5
src/integrations/__init__.py
Normal file
5
src/integrations/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
集成模块
|
||||
处理与外部系统的集成,如飞书、钉钉等
|
||||
"""
|
||||
171
src/integrations/ai_suggestion_service.py
Normal file
171
src/integrations/ai_suggestion_service.py
Normal file
@@ -0,0 +1,171 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
AI建议服务
|
||||
基于TR描述、知识库和VIN查询生成AI建议
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Any
|
||||
from src.knowledge_base.knowledge_manager import KnowledgeManager
|
||||
from src.vehicle.vehicle_data_manager import VehicleDataManager
|
||||
from src.agent.llm_client import LLMManager, LLMConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class AISuggestionService:
|
||||
"""AI建议服务"""
|
||||
|
||||
def __init__(self):
|
||||
self.knowledge_manager = KnowledgeManager()
|
||||
self.vehicle_manager = VehicleDataManager()
|
||||
|
||||
# 初始化LLM客户端
|
||||
try:
|
||||
llm_config = LLMConfig(
|
||||
provider="openai",
|
||||
api_key="your-api-key", # 这里需要从配置文件读取
|
||||
model="gpt-3.5-turbo",
|
||||
temperature=0.7,
|
||||
max_tokens=1000
|
||||
)
|
||||
self.llm_manager = LLMManager(llm_config)
|
||||
except Exception as e:
|
||||
logger.warning(f"LLM客户端初始化失败: {e}")
|
||||
self.llm_manager = None
|
||||
|
||||
def generate_suggestion(self, tr_description: str, vin: Optional[str] = None) -> str:
|
||||
"""
|
||||
生成AI建议
|
||||
|
||||
Args:
|
||||
tr_description: TR描述
|
||||
vin: 车架号(可选)
|
||||
|
||||
Returns:
|
||||
AI建议文本
|
||||
"""
|
||||
try:
|
||||
# 1. 从知识库搜索相关信息
|
||||
knowledge_results = self.knowledge_manager.search_knowledge(
|
||||
query=tr_description,
|
||||
top_k=5
|
||||
)
|
||||
|
||||
# 2. 如果有VIN,查询车辆信息
|
||||
vehicle_info = ""
|
||||
if vin:
|
||||
try:
|
||||
vehicle_data = self.vehicle_manager.get_latest_vehicle_data_by_vin(vin)
|
||||
if vehicle_data:
|
||||
vehicle_info = f"车辆信息:{vehicle_data.get('model', '未知车型')},里程:{vehicle_data.get('mileage', '未知')}km"
|
||||
except Exception as e:
|
||||
logger.warning(f"查询车辆信息失败: {e}")
|
||||
|
||||
# 3. 构建提示词
|
||||
context_parts = []
|
||||
|
||||
# 添加知识库信息
|
||||
if knowledge_results:
|
||||
knowledge_text = "\n".join([
|
||||
f"- {item.get('question', '')}: {item.get('answer', '')}"
|
||||
for item in knowledge_results
|
||||
])
|
||||
context_parts.append(f"相关知识库信息:\n{knowledge_text}")
|
||||
|
||||
# 添加车辆信息
|
||||
if vehicle_info:
|
||||
context_parts.append(vehicle_info)
|
||||
|
||||
context = "\n\n".join(context_parts) if context_parts else "无相关背景信息"
|
||||
|
||||
# 4. 生成AI建议
|
||||
prompt = f"""
|
||||
作为技术支持专家,请基于以下问题描述为工单提供专业的处理建议:
|
||||
|
||||
问题描述:{tr_description}
|
||||
|
||||
相关背景信息:
|
||||
{context}
|
||||
|
||||
请提供:
|
||||
1. 问题分析
|
||||
2. 建议的解决步骤
|
||||
3. 注意事项
|
||||
4. 如果问题无法解决,建议的后续行动
|
||||
|
||||
请用中文回答,简洁明了。
|
||||
"""
|
||||
|
||||
if self.llm_manager:
|
||||
import asyncio
|
||||
response = asyncio.run(self.llm_manager.generate(prompt))
|
||||
return response
|
||||
else:
|
||||
return "AI建议生成失败,LLM客户端未初始化。"
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"生成AI建议失败: {e}")
|
||||
return f"AI建议生成失败:{str(e)}"
|
||||
|
||||
def batch_generate_suggestions(self, records: List[Dict[str, Any]], limit: int = 10) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
批量生成AI建议
|
||||
|
||||
Args:
|
||||
records: 记录列表
|
||||
limit: 处理数量限制
|
||||
|
||||
Returns:
|
||||
处理后的记录列表
|
||||
"""
|
||||
processed_records = []
|
||||
|
||||
for i, record in enumerate(records[:limit]):
|
||||
try:
|
||||
tr_description = record.get("fields", {}).get("TR Description", "")
|
||||
vin = self._extract_vin_from_description(tr_description)
|
||||
|
||||
if tr_description:
|
||||
ai_suggestion = self.generate_suggestion(tr_description, vin)
|
||||
record["ai_suggestion"] = ai_suggestion
|
||||
logger.info(f"为记录 {record.get('record_id', i)} 生成AI建议")
|
||||
else:
|
||||
record["ai_suggestion"] = "无TR描述,无法生成建议"
|
||||
|
||||
processed_records.append(record)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"处理记录 {record.get('record_id', i)} 失败: {e}")
|
||||
record["ai_suggestion"] = f"处理失败:{str(e)}"
|
||||
processed_records.append(record)
|
||||
|
||||
return processed_records
|
||||
|
||||
def _extract_vin_from_description(self, description: str) -> Optional[str]:
|
||||
"""
|
||||
从描述中提取VIN
|
||||
|
||||
Args:
|
||||
description: TR描述
|
||||
|
||||
Returns:
|
||||
提取的VIN或None
|
||||
"""
|
||||
import re
|
||||
|
||||
# VIN通常是17位字符,包含数字和大写字母
|
||||
vin_pattern = r'\b[A-HJ-NPR-Z0-9]{17}\b'
|
||||
matches = re.findall(vin_pattern, description.upper())
|
||||
|
||||
if matches:
|
||||
return matches[0]
|
||||
|
||||
# 也尝试查找"VIN:"或"车架号:"后的内容
|
||||
vin_keywords = [r'VIN[:\s]+([A-HJ-NPR-Z0-9]{17})', r'车架号[:\s]+([A-HJ-NPR-Z0-9]{17})']
|
||||
|
||||
for pattern in vin_keywords:
|
||||
match = re.search(pattern, description.upper())
|
||||
if match:
|
||||
return match.group(1)
|
||||
|
||||
return None
|
||||
227
src/integrations/config_manager.py
Normal file
227
src/integrations/config_manager.py
Normal file
@@ -0,0 +1,227 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
配置管理器
|
||||
管理飞书等外部系统的配置信息,支持持久化存储和并发访问
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import logging
|
||||
from typing import Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ConfigManager:
|
||||
"""配置管理器"""
|
||||
|
||||
_instance = None
|
||||
_lock = threading.Lock()
|
||||
|
||||
def __new__(cls):
|
||||
"""单例模式"""
|
||||
if cls._instance is None:
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super(ConfigManager, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
self._config_lock = threading.RLock()
|
||||
self.config_file = Path("config/integrations_config.json")
|
||||
self.config_file.parent.mkdir(exist_ok=True)
|
||||
|
||||
# 默认配置
|
||||
self.default_config = {
|
||||
"feishu": {
|
||||
"app_id": "",
|
||||
"app_secret": "",
|
||||
"app_token": "",
|
||||
"table_id": "",
|
||||
"last_updated": None,
|
||||
"status": "inactive"
|
||||
},
|
||||
"system": {
|
||||
"sync_limit": 10,
|
||||
"ai_suggestions_enabled": True,
|
||||
"auto_sync_interval": 0, # 0表示不自动同步
|
||||
"last_sync_time": None
|
||||
}
|
||||
}
|
||||
|
||||
self._load_config()
|
||||
self._initialized = True
|
||||
|
||||
def _load_config(self):
|
||||
"""加载配置文件"""
|
||||
try:
|
||||
if self.config_file.exists():
|
||||
with open(self.config_file, 'r', encoding='utf-8') as f:
|
||||
loaded_config = json.load(f)
|
||||
# 合并默认配置和加载的配置
|
||||
self.config = self._merge_configs(self.default_config, loaded_config)
|
||||
else:
|
||||
self.config = self.default_config.copy()
|
||||
self._save_config()
|
||||
logger.info("配置加载成功")
|
||||
except Exception as e:
|
||||
logger.error(f"配置加载失败: {e}")
|
||||
self.config = self.default_config.copy()
|
||||
|
||||
def _save_config(self):
|
||||
"""保存配置文件"""
|
||||
try:
|
||||
with self._config_lock:
|
||||
with open(self.config_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(self.config, f, ensure_ascii=False, indent=2)
|
||||
logger.info("配置保存成功")
|
||||
except Exception as e:
|
||||
logger.error(f"配置保存失败: {e}")
|
||||
|
||||
def _merge_configs(self, default: Dict, loaded: Dict) -> Dict:
|
||||
"""合并配置,确保所有必要的键都存在"""
|
||||
result = default.copy()
|
||||
for key, value in loaded.items():
|
||||
if isinstance(value, dict) and key in result:
|
||||
result[key] = self._merge_configs(result[key], value)
|
||||
else:
|
||||
result[key] = value
|
||||
return result
|
||||
|
||||
def get_feishu_config(self) -> Dict[str, Any]:
|
||||
"""获取飞书配置"""
|
||||
with self._config_lock:
|
||||
return self.config.get("feishu", {}).copy()
|
||||
|
||||
def update_feishu_config(self, **kwargs) -> bool:
|
||||
"""更新飞书配置"""
|
||||
try:
|
||||
with self._config_lock:
|
||||
feishu_config = self.config.setdefault("feishu", {})
|
||||
|
||||
# 更新配置项
|
||||
for key, value in kwargs.items():
|
||||
if key in ["app_id", "app_secret", "app_token", "table_id"]:
|
||||
feishu_config[key] = value
|
||||
|
||||
# 更新状态和时间戳
|
||||
feishu_config["last_updated"] = datetime.now().isoformat()
|
||||
feishu_config["status"] = "active" if all([
|
||||
feishu_config.get("app_id"),
|
||||
feishu_config.get("app_secret"),
|
||||
feishu_config.get("app_token"),
|
||||
feishu_config.get("table_id")
|
||||
]) else "inactive"
|
||||
|
||||
self._save_config()
|
||||
logger.info("飞书配置更新成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"飞书配置更新失败: {e}")
|
||||
return False
|
||||
|
||||
def get_system_config(self) -> Dict[str, Any]:
|
||||
"""获取系统配置"""
|
||||
with self._config_lock:
|
||||
return self.config.get("system", {}).copy()
|
||||
|
||||
def update_system_config(self, **kwargs) -> bool:
|
||||
"""更新系统配置"""
|
||||
try:
|
||||
with self._config_lock:
|
||||
system_config = self.config.setdefault("system", {})
|
||||
|
||||
for key, value in kwargs.items():
|
||||
if key in ["sync_limit", "ai_suggestions_enabled", "auto_sync_interval"]:
|
||||
system_config[key] = value
|
||||
|
||||
self._save_config()
|
||||
logger.info("系统配置更新成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"系统配置更新失败: {e}")
|
||||
return False
|
||||
|
||||
def test_feishu_connection(self) -> Dict[str, Any]:
|
||||
"""测试飞书连接"""
|
||||
try:
|
||||
from .feishu_client import FeishuClient
|
||||
|
||||
feishu_config = self.get_feishu_config()
|
||||
if not all([feishu_config.get("app_id"), feishu_config.get("app_secret")]):
|
||||
return {"success": False, "error": "飞书配置不完整"}
|
||||
|
||||
client = FeishuClient(feishu_config["app_id"], feishu_config["app_secret"])
|
||||
|
||||
# 测试获取访问令牌
|
||||
token = client._get_access_token()
|
||||
if token:
|
||||
return {"success": True, "message": "飞书连接正常"}
|
||||
else:
|
||||
return {"success": False, "error": "无法获取访问令牌"}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"飞书连接测试失败: {e}")
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
def get_config_summary(self) -> Dict[str, Any]:
|
||||
"""获取配置摘要"""
|
||||
with self._config_lock:
|
||||
feishu_config = self.config.get("feishu", {})
|
||||
system_config = self.config.get("system", {})
|
||||
|
||||
return {
|
||||
"feishu": {
|
||||
"app_id": feishu_config.get("app_id", ""),
|
||||
"app_token": feishu_config.get("app_token", ""),
|
||||
"table_id": feishu_config.get("table_id", ""),
|
||||
"status": feishu_config.get("status", "inactive"),
|
||||
"last_updated": feishu_config.get("last_updated"),
|
||||
"app_secret": "***" if feishu_config.get("app_secret") else ""
|
||||
},
|
||||
"system": {
|
||||
"sync_limit": system_config.get("sync_limit", 10),
|
||||
"ai_suggestions_enabled": system_config.get("ai_suggestions_enabled", True),
|
||||
"auto_sync_interval": system_config.get("auto_sync_interval", 0),
|
||||
"last_sync_time": system_config.get("last_sync_time")
|
||||
}
|
||||
}
|
||||
|
||||
def reset_config(self) -> bool:
|
||||
"""重置配置为默认值"""
|
||||
try:
|
||||
with self._config_lock:
|
||||
self.config = self.default_config.copy()
|
||||
self._save_config()
|
||||
logger.info("配置重置成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"配置重置失败: {e}")
|
||||
return False
|
||||
|
||||
def export_config(self) -> str:
|
||||
"""导出配置(用于备份)"""
|
||||
with self._config_lock:
|
||||
return json.dumps(self.config, ensure_ascii=False, indent=2)
|
||||
|
||||
def import_config(self, config_json: str) -> bool:
|
||||
"""导入配置(用于恢复)"""
|
||||
try:
|
||||
imported_config = json.loads(config_json)
|
||||
with self._config_lock:
|
||||
self.config = self._merge_configs(self.default_config, imported_config)
|
||||
self._save_config()
|
||||
logger.info("配置导入成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"配置导入失败: {e}")
|
||||
return False
|
||||
|
||||
# 全局配置管理器实例
|
||||
config_manager = ConfigManager()
|
||||
293
src/integrations/feishu_client.py
Normal file
293
src/integrations/feishu_client.py
Normal file
@@ -0,0 +1,293 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
飞书API客户端
|
||||
支持多维表格数据读取和更新
|
||||
"""
|
||||
|
||||
import requests
|
||||
import json
|
||||
import time
|
||||
from typing import Dict, List, Optional, Any
|
||||
from datetime import datetime
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class FeishuClient:
|
||||
"""飞书API客户端"""
|
||||
|
||||
def __init__(self, app_id: str, app_secret: str):
|
||||
"""
|
||||
初始化飞书客户端
|
||||
|
||||
Args:
|
||||
app_id: 飞书应用ID
|
||||
app_secret: 飞书应用密钥
|
||||
"""
|
||||
self.app_id = app_id
|
||||
self.app_secret = app_secret
|
||||
self.base_url = "https://open.feishu.cn/open-apis"
|
||||
self.access_token = None
|
||||
self.token_expires_at = 0
|
||||
|
||||
def _get_access_token(self) -> str:
|
||||
"""获取访问令牌 - 使用tenant_access_token"""
|
||||
# 检查当前token是否还有效(提前5分钟刷新)
|
||||
if self.access_token and time.time() < (self.token_expires_at - 300):
|
||||
logger.debug(f"使用缓存的访问令牌: {self.access_token[:20]}...")
|
||||
return self.access_token
|
||||
|
||||
url = f"{self.base_url}/auth/v3/tenant_access_token/internal/"
|
||||
data = {
|
||||
"app_id": self.app_id,
|
||||
"app_secret": self.app_secret
|
||||
}
|
||||
|
||||
try:
|
||||
logger.info(f"正在获取飞书tenant_access_token,应用ID: {self.app_id}")
|
||||
response = requests.post(url, json=data, timeout=10)
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
|
||||
logger.info(f"飞书API响应: {result}")
|
||||
|
||||
if result.get("code") == 0:
|
||||
self.access_token = result["tenant_access_token"]
|
||||
# 设置过期时间,提前5分钟刷新
|
||||
expire_time = result.get("expire", 7200) # 默认2小时
|
||||
self.token_expires_at = time.time() + expire_time
|
||||
|
||||
logger.info(f"tenant_access_token获取成功: {self.access_token[:20]}...")
|
||||
logger.info(f"令牌有效期: {expire_time}秒,过期时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.token_expires_at))}")
|
||||
return self.access_token
|
||||
else:
|
||||
error_msg = f"获取tenant_access_token失败: {result.get('msg', '未知错误')}"
|
||||
logger.error(error_msg)
|
||||
raise Exception(error_msg)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取飞书访问令牌失败: {e}")
|
||||
raise
|
||||
|
||||
def _make_request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
|
||||
"""发送API请求"""
|
||||
headers = kwargs.get('headers', {})
|
||||
token = self._get_access_token()
|
||||
|
||||
# 确保Authorization头格式正确:Bearer <token>
|
||||
headers.update({
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Content-Type": "application/json; charset=utf-8"
|
||||
})
|
||||
kwargs['headers'] = headers
|
||||
|
||||
try:
|
||||
logger.info(f"发送飞书API请求: {method} {url}")
|
||||
logger.info(f"请求头: Authorization: Bearer {token[:20]}...")
|
||||
|
||||
response = requests.request(method, url, timeout=30, **kwargs)
|
||||
logger.info(f"飞书API响应状态码: {response.status_code}")
|
||||
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
logger.info(f"飞书API响应内容: {result}")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"飞书API请求失败: {e}")
|
||||
logger.error(f"请求URL: {url}")
|
||||
logger.error(f"请求方法: {method}")
|
||||
logger.error(f"请求头: {headers}")
|
||||
raise
|
||||
|
||||
def get_table_records(self, app_token: str, table_id: str, view_id: Optional[str] = None,
|
||||
page_size: int = 500, page_token: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
获取多维表格记录
|
||||
|
||||
Args:
|
||||
app_token: 多维表格应用token
|
||||
table_id: 表格ID
|
||||
view_id: 视图ID(可选)
|
||||
page_size: 每页记录数
|
||||
page_token: 分页令牌
|
||||
|
||||
Returns:
|
||||
包含记录数据的字典
|
||||
"""
|
||||
url = f"{self.base_url}/bitable/v1/apps/{app_token}/tables/{table_id}/records"
|
||||
|
||||
params = {
|
||||
"page_size": page_size
|
||||
}
|
||||
if view_id:
|
||||
params["view_id"] = view_id
|
||||
if page_token:
|
||||
params["page_token"] = page_token
|
||||
|
||||
return self._make_request("GET", url, params=params)
|
||||
|
||||
def get_all_table_records(self, app_token: str, table_id: str, view_id: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取表格所有记录(自动分页)
|
||||
|
||||
Args:
|
||||
app_token: 多维表格应用token
|
||||
table_id: 表格ID
|
||||
view_id: 视图ID(可选)
|
||||
|
||||
Returns:
|
||||
所有记录的列表
|
||||
"""
|
||||
all_records = []
|
||||
page_token = None
|
||||
|
||||
while True:
|
||||
result = self.get_table_records(app_token, table_id, view_id, page_token=page_token)
|
||||
|
||||
if result.get("code") != 0:
|
||||
raise Exception(f"获取表格记录失败: {result.get('msg', '未知错误')}")
|
||||
|
||||
records = result.get("data", {}).get("items", [])
|
||||
all_records.extend(records)
|
||||
|
||||
# 检查是否有下一页
|
||||
page_token = result.get("data", {}).get("page_token")
|
||||
if not page_token:
|
||||
break
|
||||
|
||||
return all_records
|
||||
|
||||
def update_table_record(self, app_token: str, table_id: str, record_id: str,
|
||||
fields: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
更新表格记录
|
||||
|
||||
Args:
|
||||
app_token: 多维表格应用token
|
||||
table_id: 表格ID
|
||||
record_id: 记录ID
|
||||
fields: 要更新的字段
|
||||
|
||||
Returns:
|
||||
更新结果
|
||||
"""
|
||||
url = f"{self.base_url}/bitable/v1/apps/{app_token}/tables/{table_id}/records/{record_id}"
|
||||
|
||||
data = {
|
||||
"fields": fields
|
||||
}
|
||||
|
||||
return self._make_request("PUT", url, json=data)
|
||||
|
||||
def test_connection(self) -> Dict[str, Any]:
|
||||
"""
|
||||
测试飞书连接
|
||||
|
||||
Returns:
|
||||
连接测试结果
|
||||
"""
|
||||
try:
|
||||
# 尝试获取访问令牌
|
||||
token = self._get_access_token()
|
||||
|
||||
# 验证token格式(应该以t-开头)
|
||||
if not token.startswith('t-'):
|
||||
logger.warning(f"获取的token格式异常,应该以't-'开头: {token[:20]}...")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "飞书连接测试成功",
|
||||
"token_prefix": token[:20] + "...",
|
||||
"token_expires_at": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.token_expires_at))
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"飞书连接测试失败: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"飞书连接测试失败: {str(e)}"
|
||||
}
|
||||
|
||||
def create_table_record(self, app_token: str, table_id: str,
|
||||
fields: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
创建表格记录
|
||||
|
||||
Args:
|
||||
app_token: 多维表格应用token
|
||||
table_id: 表格ID
|
||||
fields: 记录字段
|
||||
|
||||
Returns:
|
||||
创建结果
|
||||
"""
|
||||
url = f"{self.base_url}/bitable/v1/apps/{app_token}/tables/{table_id}/records"
|
||||
|
||||
data = {
|
||||
"fields": fields
|
||||
}
|
||||
|
||||
return self._make_request("POST", url, json=data)
|
||||
|
||||
def get_table_record(self, app_token: str, table_id: str, record_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
获取单条多维表格记录
|
||||
|
||||
Args:
|
||||
app_token: 应用token
|
||||
table_id: 表格ID
|
||||
record_id: 记录ID
|
||||
|
||||
Returns:
|
||||
记录数据
|
||||
"""
|
||||
url = f"{self.base_url}/bitable/v1/apps/{app_token}/tables/{table_id}/records/{record_id}"
|
||||
|
||||
return self._make_request("GET", url)
|
||||
|
||||
def get_table_fields(self, app_token: str, table_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
获取表格字段信息
|
||||
|
||||
Args:
|
||||
app_token: 多维表格应用token
|
||||
table_id: 表格ID
|
||||
|
||||
Returns:
|
||||
字段信息
|
||||
"""
|
||||
url = f"{self.base_url}/bitable/v1/apps/{app_token}/tables/{table_id}/fields"
|
||||
|
||||
return self._make_request("GET", url)
|
||||
|
||||
def parse_record_fields(self, record: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
解析记录字段,将飞书格式转换为标准格式
|
||||
|
||||
Args:
|
||||
record: 飞书记录
|
||||
|
||||
Returns:
|
||||
解析后的字段字典
|
||||
"""
|
||||
fields = record.get("fields", {})
|
||||
parsed = {}
|
||||
|
||||
for key, value in fields.items():
|
||||
if isinstance(value, dict):
|
||||
# 处理复杂字段类型
|
||||
if "text" in value:
|
||||
parsed[key] = value["text"]
|
||||
elif "number" in value:
|
||||
parsed[key] = value["number"]
|
||||
elif "date" in value:
|
||||
parsed[key] = value["date"]
|
||||
elif "select" in value:
|
||||
parsed[key] = value["select"]["name"] if isinstance(value["select"], dict) else value["select"]
|
||||
elif "multi_select" in value:
|
||||
parsed[key] = [item["name"] if isinstance(item, dict) else item for item in value["multi_select"]]
|
||||
else:
|
||||
parsed[key] = str(value)
|
||||
else:
|
||||
parsed[key] = value
|
||||
|
||||
return parsed
|
||||
461
src/integrations/workorder_sync.py
Normal file
461
src/integrations/workorder_sync.py
Normal file
@@ -0,0 +1,461 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
工单同步服务
|
||||
实现飞书多维表格与本地工单系统的双向同步
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Any
|
||||
from datetime import datetime
|
||||
from src.integrations.feishu_client import FeishuClient
|
||||
from src.integrations.ai_suggestion_service import AISuggestionService
|
||||
from src.core.database import db_manager
|
||||
from src.core.models import WorkOrder
|
||||
# 工单状态和优先级枚举
|
||||
class WorkOrderStatus:
|
||||
PENDING = "pending"
|
||||
IN_PROGRESS = "in_progress"
|
||||
COMPLETED = "completed"
|
||||
CLOSED = "closed"
|
||||
|
||||
class WorkOrderPriority:
|
||||
LOW = "low"
|
||||
MEDIUM = "medium"
|
||||
HIGH = "high"
|
||||
URGENT = "urgent"
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class WorkOrderSyncService:
|
||||
"""工单同步服务"""
|
||||
|
||||
def __init__(self, feishu_client: FeishuClient, app_token: str, table_id: str):
|
||||
"""
|
||||
初始化同步服务
|
||||
|
||||
Args:
|
||||
feishu_client: 飞书客户端
|
||||
app_token: 多维表格应用token
|
||||
table_id: 表格ID
|
||||
"""
|
||||
self.feishu_client = feishu_client
|
||||
self.app_token = app_token
|
||||
self.table_id = table_id
|
||||
self.ai_service = AISuggestionService()
|
||||
|
||||
# 字段映射配置 - 根据实际飞书表格结构
|
||||
self.field_mapping = {
|
||||
# 飞书字段名 -> 本地字段名
|
||||
"TR Number": "order_id", # TR编号映射到工单号
|
||||
"TR Description": "title", # TR描述作为标题(问题描述)
|
||||
"Type of problem": "category", # 问题类型作为分类
|
||||
"TR Level": "priority", # TR Level作为优先级
|
||||
"TR Status": "status", # TR Status作为状态(修正字段名)
|
||||
"Source": "assignee", # 来源信息
|
||||
"Date creation": "created_at", # 创建日期
|
||||
"处理过程": "description", # 处理过程作为描述
|
||||
"TR tracking": "solution", # TR跟踪作为解决方案
|
||||
"AI建议": "ai_suggestion", # AI建议字段
|
||||
"Issue Start Time": "updated_at" # 问题开始时间作为更新时间
|
||||
}
|
||||
|
||||
# 状态映射 - 根据飞书表格中的实际值
|
||||
self.status_mapping = {
|
||||
"close": WorkOrderStatus.CLOSED, # 已关闭
|
||||
"temporary close": WorkOrderStatus.IN_PROGRESS, # 临时关闭对应处理中
|
||||
"OTA": WorkOrderStatus.IN_PROGRESS, # OTA状态对应处理中
|
||||
"open": WorkOrderStatus.PENDING, # 开放状态对应待处理
|
||||
"pending": WorkOrderStatus.PENDING, # 待处理
|
||||
"completed": WorkOrderStatus.COMPLETED # 已完成
|
||||
}
|
||||
|
||||
# 优先级映射 - 根据飞书表格中的实际值
|
||||
self.priority_mapping = {
|
||||
"Low": WorkOrderPriority.LOW,
|
||||
"Medium": WorkOrderPriority.MEDIUM,
|
||||
"High": WorkOrderPriority.HIGH,
|
||||
"Urgent": WorkOrderPriority.URGENT
|
||||
}
|
||||
|
||||
def sync_from_feishu(self, generate_ai_suggestions: bool = True, limit: int = 10) -> Dict[str, Any]:
|
||||
"""
|
||||
从飞书同步数据到本地系统
|
||||
|
||||
Args:
|
||||
generate_ai_suggestions: 是否生成AI建议
|
||||
limit: 处理记录数量限制
|
||||
|
||||
Returns:
|
||||
同步结果统计
|
||||
"""
|
||||
try:
|
||||
logger.info("开始从飞书同步工单数据...")
|
||||
|
||||
# 获取飞书表格记录(限制数量)
|
||||
records = self.feishu_client.get_table_records(self.app_token, self.table_id, page_size=limit)
|
||||
|
||||
if records.get("code") != 0:
|
||||
raise Exception(f"获取飞书记录失败: {records.get('msg', '未知错误')}")
|
||||
|
||||
items = records.get("data", {}).get("items", [])
|
||||
logger.info(f"从飞书获取到 {len(items)} 条记录")
|
||||
|
||||
# 生成AI建议
|
||||
if generate_ai_suggestions:
|
||||
logger.info("开始生成AI建议...")
|
||||
items = self.ai_service.batch_generate_suggestions(items, limit)
|
||||
|
||||
# 将AI建议更新回飞书表格
|
||||
for item in items:
|
||||
if "ai_suggestion" in item:
|
||||
try:
|
||||
self.feishu_client.update_table_record(
|
||||
self.app_token,
|
||||
self.table_id,
|
||||
item["record_id"],
|
||||
{"AI建议": item["ai_suggestion"]}
|
||||
)
|
||||
logger.info(f"更新飞书记录 {item['record_id']} 的AI建议")
|
||||
except Exception as e:
|
||||
logger.error(f"更新飞书AI建议失败: {e}")
|
||||
|
||||
synced_count = 0
|
||||
updated_count = 0
|
||||
created_count = 0
|
||||
errors = []
|
||||
|
||||
with db_manager.get_session() as session:
|
||||
for record in items:
|
||||
try:
|
||||
# 解析飞书记录
|
||||
parsed_fields = self.feishu_client.parse_record_fields(record)
|
||||
feishu_id = record.get("record_id")
|
||||
|
||||
# 查找本地是否存在对应记录
|
||||
existing_workorder = session.query(WorkOrder).filter(
|
||||
WorkOrder.feishu_record_id == feishu_id
|
||||
).first()
|
||||
|
||||
# 转换为本地工单格式
|
||||
workorder_data = self._convert_feishu_to_local(parsed_fields)
|
||||
workorder_data["feishu_record_id"] = feishu_id
|
||||
|
||||
if existing_workorder:
|
||||
# 更新现有记录
|
||||
for key, value in workorder_data.items():
|
||||
if key != "feishu_record_id":
|
||||
setattr(existing_workorder, key, value)
|
||||
existing_workorder.updated_at = datetime.now()
|
||||
updated_count += 1
|
||||
else:
|
||||
# 创建新记录
|
||||
workorder_data["created_at"] = datetime.now()
|
||||
workorder_data["updated_at"] = datetime.now()
|
||||
new_workorder = WorkOrder(**workorder_data)
|
||||
session.add(new_workorder)
|
||||
created_count += 1
|
||||
|
||||
synced_count += 1
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"处理记录 {record.get('record_id', 'unknown')} 失败: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
errors.append(error_msg)
|
||||
|
||||
session.commit()
|
||||
|
||||
result = {
|
||||
"success": True,
|
||||
"total_records": len(items),
|
||||
"synced_count": synced_count,
|
||||
"created_count": created_count,
|
||||
"updated_count": updated_count,
|
||||
"ai_suggestions_generated": generate_ai_suggestions,
|
||||
"errors": errors
|
||||
}
|
||||
|
||||
logger.info(f"飞书同步完成: {result}")
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"飞书同步失败: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
def sync_to_feishu(self, workorder_id: int) -> Dict[str, Any]:
|
||||
"""
|
||||
将本地工单同步到飞书
|
||||
|
||||
Args:
|
||||
workorder_id: 工单ID
|
||||
|
||||
Returns:
|
||||
同步结果
|
||||
"""
|
||||
try:
|
||||
with db_manager.get_session() as session:
|
||||
workorder = session.query(WorkOrder).filter(WorkOrder.id == workorder_id).first()
|
||||
if not workorder:
|
||||
return {"success": False, "error": "工单不存在"}
|
||||
|
||||
# 转换为飞书格式
|
||||
feishu_fields = self._convert_local_to_feishu(workorder)
|
||||
|
||||
if workorder.feishu_record_id:
|
||||
# 更新飞书记录
|
||||
result = self.feishu_client.update_table_record(
|
||||
self.app_token, self.table_id, workorder.feishu_record_id, feishu_fields
|
||||
)
|
||||
else:
|
||||
# 创建新飞书记录
|
||||
result = self.feishu_client.create_table_record(
|
||||
self.app_token, self.table_id, feishu_fields
|
||||
)
|
||||
|
||||
if result.get("code") == 0:
|
||||
# 保存飞书记录ID到本地
|
||||
workorder.feishu_record_id = result["data"]["record"]["record_id"]
|
||||
session.commit()
|
||||
|
||||
if result.get("code") == 0:
|
||||
return {"success": True, "message": "同步成功"}
|
||||
else:
|
||||
return {"success": False, "error": result.get("msg", "同步失败")}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"同步到飞书失败: {e}")
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
def create_workorder_from_feishu_record(self, record_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
从飞书单条记录创建工单
|
||||
|
||||
Args:
|
||||
record_id: 飞书记录ID
|
||||
|
||||
Returns:
|
||||
创建结果
|
||||
"""
|
||||
try:
|
||||
logger.info(f"从飞书记录 {record_id} 创建工单")
|
||||
|
||||
# 获取单条飞书记录
|
||||
feishu_data = self.feishu_client.get_table_record(
|
||||
self.app_token,
|
||||
self.table_id,
|
||||
record_id
|
||||
)
|
||||
|
||||
if feishu_data.get("code") != 0:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"获取飞书记录失败: {feishu_data.get('msg', '未知错误')}"
|
||||
}
|
||||
|
||||
record = feishu_data.get("data", {}).get("record")
|
||||
if not record:
|
||||
return {
|
||||
"success": False,
|
||||
"message": "飞书记录不存在"
|
||||
}
|
||||
|
||||
fields = record.get("fields", {})
|
||||
|
||||
# 转换为本地工单格式
|
||||
local_data = self._convert_feishu_to_local(fields)
|
||||
local_data["feishu_record_id"] = record_id
|
||||
|
||||
# 检查是否已存在
|
||||
existing_workorder = self._find_existing_workorder(record_id)
|
||||
|
||||
if existing_workorder:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"工单已存在: {existing_workorder.order_id}"
|
||||
}
|
||||
|
||||
# 创建新工单
|
||||
workorder = self._create_workorder(local_data)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"工单创建成功: {local_data.get('order_id')}",
|
||||
"workorder_id": workorder.id,
|
||||
"order_id": local_data.get('order_id')
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"从飞书记录创建工单失败: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"创建工单失败: {str(e)}"
|
||||
}
|
||||
|
||||
def _find_existing_workorder(self, feishu_record_id: str) -> Optional[WorkOrder]:
|
||||
"""查找已存在的工单"""
|
||||
try:
|
||||
with db_manager.get_session() as session:
|
||||
return session.query(WorkOrder).filter(
|
||||
WorkOrder.feishu_record_id == feishu_record_id
|
||||
).first()
|
||||
except Exception as e:
|
||||
logger.error(f"查找现有工单失败: {e}")
|
||||
return None
|
||||
|
||||
def _create_workorder(self, local_data: Dict[str, Any]) -> WorkOrder:
|
||||
"""创建新工单"""
|
||||
try:
|
||||
with db_manager.get_session() as session:
|
||||
workorder = WorkOrder(
|
||||
order_id=local_data.get("order_id"),
|
||||
title=local_data.get("title"),
|
||||
description=local_data.get("description"),
|
||||
category=local_data.get("category"),
|
||||
priority=local_data.get("priority"),
|
||||
status=local_data.get("status"),
|
||||
created_at=local_data.get("created_at"),
|
||||
updated_at=local_data.get("updated_at"),
|
||||
resolution=local_data.get("solution"),
|
||||
feishu_record_id=local_data.get("feishu_record_id"),
|
||||
assignee=local_data.get("assignee"),
|
||||
solution=local_data.get("solution"),
|
||||
ai_suggestion=local_data.get("ai_suggestion")
|
||||
)
|
||||
session.add(workorder)
|
||||
session.commit()
|
||||
session.refresh(workorder)
|
||||
logger.info(f"创建工单成功: {workorder.order_id}")
|
||||
return workorder
|
||||
except Exception as e:
|
||||
logger.error(f"创建工单失败: {e}")
|
||||
raise
|
||||
|
||||
def _update_workorder(self, workorder: WorkOrder, local_data: Dict[str, Any]) -> WorkOrder:
|
||||
"""更新现有工单"""
|
||||
try:
|
||||
with db_manager.get_session() as session:
|
||||
workorder.title = local_data.get("title", workorder.title)
|
||||
workorder.description = local_data.get("description", workorder.description)
|
||||
workorder.category = local_data.get("category", workorder.category)
|
||||
workorder.priority = local_data.get("priority", workorder.priority)
|
||||
workorder.status = local_data.get("status", workorder.status)
|
||||
workorder.updated_at = local_data.get("updated_at", workorder.updated_at)
|
||||
workorder.resolution = local_data.get("solution", workorder.resolution)
|
||||
workorder.assignee = local_data.get("assignee", workorder.assignee)
|
||||
workorder.solution = local_data.get("solution", workorder.solution)
|
||||
workorder.ai_suggestion = local_data.get("ai_suggestion", workorder.ai_suggestion)
|
||||
|
||||
session.commit()
|
||||
session.refresh(workorder)
|
||||
logger.info(f"更新工单成功: {workorder.order_id}")
|
||||
return workorder
|
||||
except Exception as e:
|
||||
logger.error(f"更新工单失败: {e}")
|
||||
raise
|
||||
|
||||
def _update_feishu_ai_suggestion(self, record_id: str, ai_suggestion: str) -> bool:
|
||||
"""更新飞书表格中的AI建议"""
|
||||
try:
|
||||
result = self.feishu_client.update_record(
|
||||
self.app_token,
|
||||
self.table_id,
|
||||
record_id,
|
||||
{"AI建议": ai_suggestion}
|
||||
)
|
||||
return result.get("code") == 0
|
||||
except Exception as e:
|
||||
logger.error(f"更新飞书AI建议失败: {e}")
|
||||
return False
|
||||
|
||||
def _convert_feishu_to_local(self, feishu_fields: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""将飞书字段转换为本地工单字段"""
|
||||
local_data = {}
|
||||
|
||||
logger.info(f"开始转换飞书字段: {feishu_fields}")
|
||||
logger.info(f"字段映射配置: {self.field_mapping}")
|
||||
|
||||
for feishu_field, local_field in self.field_mapping.items():
|
||||
if feishu_field in feishu_fields:
|
||||
value = feishu_fields[feishu_field]
|
||||
logger.info(f"映射字段 {feishu_field} -> {local_field}: {value}")
|
||||
|
||||
# 特殊字段处理
|
||||
if local_field == "status" and value in self.status_mapping:
|
||||
value = self.status_mapping[value]
|
||||
elif local_field == "priority" and value in self.priority_mapping:
|
||||
value = self.priority_mapping[value]
|
||||
elif local_field in ["created_at", "updated_at"] and value:
|
||||
try:
|
||||
# 处理飞书时间戳(毫秒)
|
||||
if isinstance(value, (int, float)):
|
||||
# 飞书时间戳是毫秒,需要转换为秒
|
||||
value = datetime.fromtimestamp(value / 1000)
|
||||
else:
|
||||
# 处理ISO格式时间字符串
|
||||
value = datetime.fromisoformat(value.replace('Z', '+00:00'))
|
||||
except Exception as e:
|
||||
logger.warning(f"时间字段转换失败: {e}, 使用当前时间")
|
||||
value = datetime.now()
|
||||
|
||||
local_data[local_field] = value
|
||||
else:
|
||||
logger.info(f"飞书字段 {feishu_field} 不存在于数据中")
|
||||
|
||||
# 设置默认值
|
||||
if "status" not in local_data:
|
||||
local_data["status"] = WorkOrderStatus.PENDING
|
||||
if "priority" not in local_data:
|
||||
local_data["priority"] = WorkOrderPriority.MEDIUM
|
||||
if "category" not in local_data:
|
||||
local_data["category"] = "Remote control" # 根据表格中最常见的问题类型
|
||||
if "title" not in local_data or not local_data["title"]:
|
||||
local_data["title"] = "TR工单" # 默认标题
|
||||
|
||||
return local_data
|
||||
|
||||
def _convert_local_to_feishu(self, workorder: WorkOrder) -> Dict[str, Any]:
|
||||
"""将本地工单字段转换为飞书字段"""
|
||||
feishu_fields = {}
|
||||
|
||||
# 反向映射
|
||||
reverse_mapping = {v: k for k, v in self.field_mapping.items()}
|
||||
|
||||
for local_field, feishu_field in reverse_mapping.items():
|
||||
value = getattr(workorder, local_field, None)
|
||||
if value is not None:
|
||||
# 特殊字段处理
|
||||
if local_field == "status":
|
||||
# 反向状态映射
|
||||
reverse_status = {v: k for k, v in self.status_mapping.items()}
|
||||
value = reverse_status.get(value, str(value))
|
||||
elif local_field == "priority":
|
||||
# 反向优先级映射
|
||||
reverse_priority = {v: k for k, v in self.priority_mapping.items()}
|
||||
value = reverse_priority.get(value, str(value))
|
||||
elif local_field in ["created_at", "updated_at"] and isinstance(value, datetime):
|
||||
value = value.isoformat()
|
||||
|
||||
feishu_fields[feishu_field] = value
|
||||
|
||||
return feishu_fields
|
||||
|
||||
def get_sync_status(self) -> Dict[str, Any]:
|
||||
"""获取同步状态统计"""
|
||||
try:
|
||||
with db_manager.get_session() as session:
|
||||
total_local = session.query(WorkOrder).count()
|
||||
synced_count = session.query(WorkOrder).filter(
|
||||
WorkOrder.feishu_record_id.isnot(None)
|
||||
).count()
|
||||
|
||||
return {
|
||||
"total_local_workorders": total_local,
|
||||
"synced_workorders": synced_count,
|
||||
"unsynced_workorders": total_local - synced_count
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"获取同步状态失败: {e}")
|
||||
return {"error": str(e)}
|
||||
Reference in New Issue
Block a user