Files
feishu_screen/services/feishu_service.py

489 lines
20 KiB
Python
Raw Permalink Normal View History

2026-03-18 15:15:52 +08:00
import requests
import json
import time
import logging
from datetime import datetime
from typing import Dict, Optional, List
from pathlib import Path
from utils.user_cache import UserCacheManager
class FeishuService:
def __init__(self, config):
"""
初始化飞书服务
:param config: 包含飞书配置的字典 (来自 config.yaml)
"""
self.app_id = config.get('app_id')
self.app_secret = config.get('app_secret')
self.app_token = config.get('app_token') # 多维表格的 Base Token
self.table_id = config.get('table_id') # 数据表 ID
# 内部变量,用于缓存 Token
self._tenant_access_token = None
self._token_expire_time = 0
# 重试配置
self.max_retries = config.get('max_retries', 3)
self.retry_delay = config.get('retry_delay', 1.0)
# 用户匹配配置
user_matching_config = config.get('user_matching', {})
self.user_matching_enabled = user_matching_config.get('enabled', True)
self.fallback_to_random = user_matching_config.get('fallback_to_random', True)
self.cache_enabled = user_matching_config.get('cache_enabled', True)
self.cache_file = user_matching_config.get('cache_file', './data/user_cache.json')
# 用户缓存管理器
self.user_cache = UserCacheManager(self.cache_file) if self.cache_enabled else None
# 支持的字段类型映射
self.field_type_map = {
'text': 'text',
'number': 'number',
'date': 'date',
'select': 'single_select',
'multi_select': 'multiple_select',
'checkbox': 'checkbox',
'url': 'url',
'email': 'email',
'phone': 'phone',
'user': 'user' # 用户字段类型
}
# 验证必需的配置
self._validate_config()
def _validate_config(self):
"""验证飞书配置"""
if not self.app_id:
raise ValueError("飞书配置缺少 app_id")
if not self.app_secret:
raise ValueError("飞书配置缺少 app_secret")
if not self.app_token:
raise ValueError("飞书配置缺少 app_token")
if not self.table_id:
raise ValueError("飞书配置缺少 table_id")
# 验证用户匹配配置
if self.user_matching_enabled and not self.cache_enabled:
logging.warning("用户匹配已启用但缓存未启用,可能影响性能")
def _get_token(self, retry_count: int = 0) -> Optional[str]:
"""
获取 tenant_access_token (带缓存机制和重试)
如果 Token 还有效直接返回否则重新请求
"""
now = time.time()
# 如果 token 存在且离过期还有60秒以上直接复用
if self._tenant_access_token and now < self._token_expire_time - 60:
return self._tenant_access_token
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
headers = {"Content-Type": "application/json; charset=utf-8"}
payload = {
"app_id": self.app_id,
"app_secret": self.app_secret
}
try:
resp = requests.post(url, headers=headers, json=payload, timeout=10)
resp_data = resp.json()
if resp_data.get("code") == 0:
self._tenant_access_token = resp_data.get("tenant_access_token")
# expire_in 通常是 7200 秒,我们记录下过期的绝对时间戳
self._token_expire_time = now + resp_data.get("expire", 7200)
logging.info("🔄 [Feishu] Token 已刷新")
return self._tenant_access_token
else:
error_msg = resp_data.get("msg", "未知错误")
raise Exception(f"获取飞书 Token 失败: {error_msg}")
except requests.exceptions.RequestException as e:
if retry_count < self.max_retries:
wait_time = self.retry_delay * (2 ** retry_count)
logging.warning(f"网络错误,{wait_time}秒后重试: {str(e)}")
time.sleep(wait_time)
return self._get_token(retry_count + 1)
logging.error(f" [Feishu] 网络错误,已重试 {self.max_retries} 次: {str(e)}")
return None
except Exception as e:
logging.error(f" [Feishu] 认证错误: {str(e)}")
return None
def _date_to_timestamp(self, date_str: str) -> Optional[int]:
"""
YYYY-MM-DD 字符串转换为飞书需要的毫秒级时间戳
"""
if not date_str or date_str == "null":
return None
try:
# 假设 AI 返回的是 YYYY-MM-DD
dt = datetime.strptime(date_str, "%Y-%m-%d")
# 转换为毫秒时间戳
return int(dt.timestamp() * 1000)
except ValueError:
logging.warning(f"⚠️ [Feishu] 日期格式无法解析: {date_str},将留空。")
return None
def _validate_ai_data(self, ai_data: Dict) -> bool:
"""验证AI返回的数据"""
required_fields = ['task_description', 'priority', 'status']
for field in required_fields:
if field not in ai_data:
logging.error(f" [Feishu] AI数据缺少必需字段: {field}")
return False
# 验证选项值
valid_priorities = ['紧急', '较紧急', '一般', '普通']
valid_statuses = [' 已停滞', '待开始', '进行中', ',已完成']
if ai_data.get('priority') not in valid_priorities:
logging.error(f" [Feishu] 无效的优先级: {ai_data.get('priority')}")
return False
if ai_data.get('status') not in valid_statuses:
logging.error(f" [Feishu] 无效的状态: {ai_data.get('status')}")
return False
return True
def _build_fields(self, ai_data: Dict) -> Dict:
"""构建飞书字段数据"""
# --- 字段映射区域 (Key 必须与飞书表格列名完全一致) ---
fields = {
"任务描述": ai_data.get("task_description", ""),
"重要紧急程度": ai_data.get("priority", "普通"), # 默认值防止报错
"进展": ai_data.get("status", "待开始"),
"最新进展记录": ai_data.get("latest_progress", "进行中,请及时更新"),
# 任务发起方.部门是多选字段,需要列表格式
"任务发起方.部门": self._build_multi_select_field(ai_data.get("department", "")) # 修复字段名:添加点号
}
# 处理任务发起方
initiator_name = ai_data.get("initiator", "")
if initiator_name and self.user_matching_enabled:
# 尝试匹配用户
user_match = self._match_initiator(initiator_name)
if user_match:
if user_match.get('matched'):
# 成功匹配到用户ID
fields["任务发起方"] = self._build_user_field(user_match['user_id'])
logging.info(f"成功匹配任务发起方: {initiator_name} -> {user_match['user_name']}")
else:
# 未匹配到用户,添加待确认标记到任务描述
fields["任务描述"] = f"[待确认发起人] {fields['任务描述']}"
# 在任务发起方.部门字段中添加待确认信息(作为提示)
existing_dept = ai_data.get("department", "")
if existing_dept:
fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (待确认: {initiator_name})")
else:
fields["任务发起方.部门"] = self._build_multi_select_field(f"待确认: {initiator_name}")
logging.warning(f"未匹配到用户: {initiator_name},标记为待确认")
# 添加到最近联系人
if self.user_cache:
self.user_cache.add_recent_contact(initiator_name)
else:
# 匹配失败,使用随机选择或留空
if self.fallback_to_random and self.user_cache:
random_contact = self.user_cache.get_random_recent_contact()
if random_contact:
fields["任务描述"] = f"[随机匹配] {fields['任务描述']}"
# 在任务发起方.部门字段中添加随机匹配信息
existing_dept = ai_data.get("department", "")
if existing_dept:
fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (随机: {random_contact})")
else:
fields["任务发起方.部门"] = self._build_multi_select_field(f"随机: {random_contact}")
logging.info(f"随机匹配任务发起方: {random_contact}")
else:
fields["任务描述"] = f"[待确认发起人] {fields['任务描述']}"
# 在任务发起方.部门字段中添加待确认信息
existing_dept = ai_data.get("department", "")
if existing_dept:
fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (待确认: {initiator_name})")
else:
fields["任务发起方.部门"] = self._build_multi_select_field(f"待确认: {initiator_name}")
logging.warning(f"无最近联系人可用,标记为待确认: {initiator_name}")
else:
fields["任务描述"] = f"[待确认发起人] {fields['任务描述']}"
# 在任务发起方.部门字段中添加待确认信息
existing_dept = ai_data.get("department", "")
if existing_dept:
fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (待确认: {initiator_name})")
else:
fields["任务发起方.部门"] = self._build_multi_select_field(f"待确认: {initiator_name}")
logging.warning(f"未匹配到用户且未启用随机匹配: {initiator_name}")
elif initiator_name:
# 用户匹配未启用,直接使用名字
# 在任务发起方.部门字段中添加发起人信息
existing_dept = ai_data.get("department", "")
if existing_dept:
fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (发起人: {initiator_name})")
else:
fields["任务发起方.部门"] = self._build_multi_select_field(f"发起人: {initiator_name}")
# 移除用户字段避免UserFieldConvFail错误如果匹配失败
if "任务发起方" not in fields:
fields.pop("任务发起方", None)
# 处理日期字段 (AI 返回的是字符串,飞书写入最好用时间戳)
start_date = ai_data.get("start_date")
if start_date and isinstance(start_date, str):
start_ts = self._date_to_timestamp(start_date)
if start_ts:
fields["开始日期"] = start_ts
due_date = ai_data.get("due_date")
if due_date and isinstance(due_date, str):
due_ts = self._date_to_timestamp(due_date)
if due_ts:
fields["预计完成日期"] = due_ts
return fields
def _build_multi_select_field(self, value: str) -> Optional[List]:
"""构建飞书多选字段"""
if not value:
return None
# 多选字段需要以列表格式提供
# 格式:["选项1", "选项2"]
return [value]
def _call_feishu_api(self, url: str, headers: Dict, payload: Dict, retry_count: int = 0) -> Optional[Dict]:
"""调用飞书API带重试机制"""
try:
response = requests.post(url, headers=headers, json=payload, timeout=30)
res_json = response.json()
if res_json.get("code") == 0:
return res_json
else:
error_msg = res_json.get("msg", "未知错误")
error_code = res_json.get("code", "未知错误码")
raise Exception(f"飞书API错误 {error_code}: {error_msg}")
except requests.exceptions.RequestException as e:
if retry_count < self.max_retries:
wait_time = self.retry_delay * (2 ** retry_count)
logging.warning(f"网络错误,{wait_time}秒后重试: {str(e)}")
time.sleep(wait_time)
return self._call_feishu_api(url, headers, payload, retry_count + 1)
logging.error(f" [Feishu] 网络错误,已重试 {self.max_retries} 次: {str(e)}")
return None
except Exception as e:
logging.error(f" [Feishu] API调用失败: {str(e)}")
return None
def add_task(self, ai_data: Optional[Dict]) -> bool:
"""
AI 识别的数据写入多维表格
:param ai_data: AI 返回的 JSON 字典
:return: 是否成功
"""
# 验证AI数据
if not ai_data or not self._validate_ai_data(ai_data):
return False
token = self._get_token()
if not token:
logging.error(" [Feishu] 无法获取 Token跳过写入。")
return False
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{self.app_token}/tables/{self.table_id}/records"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json; charset=utf-8"
}
# 构建字段数据
fields = self._build_fields(ai_data)
payload = {"fields": fields}
# 调用API
res_json = self._call_feishu_api(url, headers, payload)
if res_json and res_json.get("code") == 0:
record_id = res_json['data']['record']['record_id']
logging.info(f" [Feishu] 成功写入一条记录!(Record ID: {record_id})")
return True
else:
logging.error(f" [Feishu] 写入失败: {json.dumps(res_json, ensure_ascii=False) if res_json else '无响应'}")
return False
def add_task_batch(self, ai_data_list: List[Dict]) -> Dict[str, bool]:
"""
批量写入任务到飞书
:param ai_data_list: AI数据列表
:return: 字典键为数据索引值为是否成功
"""
results = {}
for index, ai_data in enumerate(ai_data_list):
try:
success = self.add_task(ai_data)
results[str(index)] = success
except Exception as e:
logging.error(f" [Feishu] 批量写入失败 (索引 {index}): {str(e)}")
results[str(index)] = False
return results
def _match_initiator(self, initiator_name: str) -> Optional[Dict]:
"""
匹配任务发起人
Args:
initiator_name: AI识别的发起人名字
Returns:
匹配结果字典包含matched, user_id, user_name等字段
"""
if not initiator_name or not self.user_matching_enabled:
return None
# 确保用户缓存已初始化
if not self.user_cache:
return None
# 检查缓存是否过期,如果过期则重新获取用户列表
if self.user_cache.is_cache_expired(max_age_hours=24):
logging.info("用户缓存已过期,重新获取用户列表")
users = self._get_user_list()
if users:
self.user_cache.update_users(users)
# 尝试匹配用户
matched_user = self.user_cache.match_user_by_name(initiator_name)
if matched_user:
return {
'matched': True,
'user_id': matched_user.get('user_id'),
'user_name': matched_user.get('name')
}
else:
return {
'matched': False,
'user_name': initiator_name
}
def _get_user_list(self) -> Optional[List[Dict]]:
"""
从飞书API获取用户列表
Returns:
用户列表每个用户包含 name, user_id 等字段
"""
token = self._get_token()
if not token:
logging.error(" [Feishu] 无法获取 Token跳过用户列表获取。")
return None
url = "https://open.feishu.cn/open-apis/contact/v3/users"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json; charset=utf-8"
}
try:
# 分页获取用户列表
all_users = []
page_token = None
while True:
params = {"page_size": 100}
if page_token:
params["page_token"] = page_token
response = requests.get(url, headers=headers, params=params, timeout=30)
res_json = response.json()
if res_json.get("code") == 0:
users = res_json.get("data", {}).get("items", [])
for user in users:
# 提取用户信息
user_info = {
'user_id': user.get('user_id'),
'name': user.get('name'),
'email': user.get('email'),
'mobile': user.get('mobile')
}
all_users.append(user_info)
# 检查是否有更多页面
page_token = res_json.get("data", {}).get("page_token")
if not page_token:
break
else:
error_msg = res_json.get("msg", "未知错误")
logging.error(f"获取用户列表失败: {error_msg}")
break
logging.info(f"成功获取 {len(all_users)} 个用户")
return all_users
except Exception as e:
logging.error(f"获取用户列表时出错: {str(e)}")
return None
def _build_user_field(self, user_id: str) -> Optional[Dict]:
"""
构建飞书用户字段
Args:
user_id: 用户ID
Returns:
用户字段字典
"""
if not user_id:
return None
return {
"id": user_id
}
# ================= 单元测试代码 =================
if __name__ == "__main__":
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 如果你直接运行这个文件,会执行下面的测试代码
# 这里填入真实的配置进行测试
test_config = {
"app_id": "cli_xxxxxxxxxxxx", # 换成你的 App ID
"app_secret": "xxxxxxxxxxxxxx", # 换成你的 App Secret
"app_token": "basxxxxxxxxxxxxx", # 换成你的 Base Token
"table_id": "tblxxxxxxxxxxxxx" # 换成你的 Table ID
}
# 模拟 AI 返回的数据
mock_ai_data = {
"task_description": "测试任务:编写 Feishu Service 模块",
"priority": "普通", # 确保飞书表格里有这个选项
"status": "进行中", # 确保飞书表格里有这个选项
"latest_progress": "代码框架已完成,正在调试 API",
"initiator": "Python脚本",
"department": "研发部",
"start_date": "2023-10-27",
"due_date": "2023-10-30"
}
try:
fs = FeishuService(test_config)
success = fs.add_task(mock_ai_data)
import sys
if success:
sys.stdout.write("测试成功!\n")
else:
sys.stdout.write("测试失败!\n")
sys.stdout.flush()
except Exception as e:
import sys
sys.stdout.write(f"测试出错: {str(e)}\n")
sys.stdout.flush()