211 lines
6.5 KiB
Python
211 lines
6.5 KiB
Python
|
|
import json
|
|||
|
|
import time
|
|||
|
|
import random
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Dict, List, Optional, Tuple
|
|||
|
|
import logging
|
|||
|
|
|
|||
|
|
|
|||
|
|
class UserCacheManager:
|
|||
|
|
"""用户缓存管理器,用于管理飞书用户信息和最近联系人"""
|
|||
|
|
|
|||
|
|
def __init__(self, cache_file: str = "./data/user_cache.json"):
|
|||
|
|
"""
|
|||
|
|
初始化用户缓存管理器
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
cache_file: 缓存文件路径
|
|||
|
|
"""
|
|||
|
|
self.cache_file = Path(cache_file)
|
|||
|
|
self.cache_file.parent.mkdir(parents=True, exist_ok=True)
|
|||
|
|
|
|||
|
|
# 缓存数据结构
|
|||
|
|
self.users: List[Dict] = [] # 用户列表
|
|||
|
|
self.recent_contacts: List[str] = [] # 最近联系人名字列表
|
|||
|
|
self.last_update_time: float = 0 # 最后更新时间
|
|||
|
|
|
|||
|
|
# 加载缓存
|
|||
|
|
self._load_cache()
|
|||
|
|
|
|||
|
|
def _load_cache(self):
|
|||
|
|
"""从文件加载缓存"""
|
|||
|
|
try:
|
|||
|
|
if self.cache_file.exists():
|
|||
|
|
with open(self.cache_file, 'r', encoding='utf-8') as f:
|
|||
|
|
data = json.load(f)
|
|||
|
|
self.users = data.get('users', [])
|
|||
|
|
self.recent_contacts = data.get('recent_contacts', [])
|
|||
|
|
self.last_update_time = data.get('last_update_time', 0)
|
|||
|
|
logging.info(f"已加载用户缓存,共 {len(self.users)} 个用户")
|
|||
|
|
else:
|
|||
|
|
logging.info("未找到用户缓存文件,将创建新的缓存")
|
|||
|
|
except Exception as e:
|
|||
|
|
logging.error(f"加载用户缓存失败: {str(e)}")
|
|||
|
|
self.users = []
|
|||
|
|
self.recent_contacts = []
|
|||
|
|
self.last_update_time = 0
|
|||
|
|
|
|||
|
|
def _save_cache(self):
|
|||
|
|
"""保存缓存到文件"""
|
|||
|
|
try:
|
|||
|
|
data = {
|
|||
|
|
'users': self.users,
|
|||
|
|
'recent_contacts': self.recent_contacts,
|
|||
|
|
'last_update_time': time.time()
|
|||
|
|
}
|
|||
|
|
with open(self.cache_file, 'w', encoding='utf-8') as f:
|
|||
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|||
|
|
logging.info(f"已保存用户缓存到 {self.cache_file}")
|
|||
|
|
except Exception as e:
|
|||
|
|
logging.error(f"保存用户缓存失败: {str(e)}")
|
|||
|
|
|
|||
|
|
def update_users(self, users: List[Dict]):
|
|||
|
|
"""
|
|||
|
|
更新用户列表
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
users: 飞书用户列表,每个用户包含 name, user_id 等字段
|
|||
|
|
"""
|
|||
|
|
self.users = users
|
|||
|
|
self.last_update_time = time.time()
|
|||
|
|
self._save_cache()
|
|||
|
|
logging.info(f"已更新用户列表,共 {len(users)} 个用户")
|
|||
|
|
|
|||
|
|
def add_recent_contact(self, name: str):
|
|||
|
|
"""
|
|||
|
|
添加最近联系人
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
name: 联系人姓名
|
|||
|
|
"""
|
|||
|
|
if not name or name.strip() == "":
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
name = name.strip()
|
|||
|
|
|
|||
|
|
# 如果已存在,先移除
|
|||
|
|
if name in self.recent_contacts:
|
|||
|
|
self.recent_contacts.remove(name)
|
|||
|
|
|
|||
|
|
# 添加到列表开头
|
|||
|
|
self.recent_contacts.insert(0, name)
|
|||
|
|
|
|||
|
|
# 限制列表长度(最多保留50个)
|
|||
|
|
if len(self.recent_contacts) > 50:
|
|||
|
|
self.recent_contacts = self.recent_contacts[:50]
|
|||
|
|
|
|||
|
|
self._save_cache()
|
|||
|
|
logging.debug(f"已添加最近联系人: {name}")
|
|||
|
|
|
|||
|
|
def match_user_by_name(self, name: str) -> Optional[Dict]:
|
|||
|
|
"""
|
|||
|
|
根据名字匹配用户
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
name: 要匹配的名字
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
匹配的用户信息,如果未找到则返回None
|
|||
|
|
"""
|
|||
|
|
if not name or not self.users:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
name = name.strip()
|
|||
|
|
|
|||
|
|
# 精确匹配
|
|||
|
|
for user in self.users:
|
|||
|
|
if user.get('name') == name:
|
|||
|
|
return user
|
|||
|
|
|
|||
|
|
# 模糊匹配(包含关系)
|
|||
|
|
for user in self.users:
|
|||
|
|
user_name = user.get('name', '')
|
|||
|
|
if user_name and (name in user_name or user_name in name):
|
|||
|
|
return user
|
|||
|
|
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def get_random_recent_contact(self) -> Optional[str]:
|
|||
|
|
"""
|
|||
|
|
从最近联系人中随机选择一个
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
随机选择的联系人名字,如果没有则返回None
|
|||
|
|
"""
|
|||
|
|
if not self.recent_contacts:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# 随机选择一个
|
|||
|
|
return random.choice(self.recent_contacts)
|
|||
|
|
|
|||
|
|
def get_user_suggestions(self, name: str) -> List[Dict]:
|
|||
|
|
"""
|
|||
|
|
获取名字相似的用户建议
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
name: 要匹配的名字
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
相似的用户列表
|
|||
|
|
"""
|
|||
|
|
if not name or not self.users:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
name = name.strip().lower()
|
|||
|
|
suggestions = []
|
|||
|
|
|
|||
|
|
for user in self.users:
|
|||
|
|
user_name = user.get('name', '')
|
|||
|
|
user_name_lower = user_name.lower()
|
|||
|
|
|
|||
|
|
# 计算相似度(简单的包含关系)
|
|||
|
|
if name in user_name_lower or user_name_lower in name:
|
|||
|
|
suggestions.append(user)
|
|||
|
|
elif name[0] == user_name_lower[0] if user_name_lower else False:
|
|||
|
|
suggestions.append(user)
|
|||
|
|
|
|||
|
|
# 限制返回数量
|
|||
|
|
return suggestions[:5]
|
|||
|
|
|
|||
|
|
def is_cache_expired(self, max_age_hours: int = 24) -> bool:
|
|||
|
|
"""
|
|||
|
|
检查缓存是否过期
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
max_age_hours: 最大缓存时间(小时)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
是否过期
|
|||
|
|
"""
|
|||
|
|
if self.last_update_time == 0:
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
age_hours = (time.time() - self.last_update_time) / 3600
|
|||
|
|
return age_hours > max_age_hours
|
|||
|
|
|
|||
|
|
def clear_cache(self):
|
|||
|
|
"""清除缓存"""
|
|||
|
|
self.users = []
|
|||
|
|
self.recent_contacts = []
|
|||
|
|
self.last_update_time = 0
|
|||
|
|
self._save_cache()
|
|||
|
|
logging.info("已清除用户缓存")
|
|||
|
|
|
|||
|
|
def get_cache_stats(self) -> Dict:
|
|||
|
|
"""
|
|||
|
|
获取缓存统计信息
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
缓存统计信息
|
|||
|
|
"""
|
|||
|
|
age_hours = 0
|
|||
|
|
if self.last_update_time > 0:
|
|||
|
|
age_hours = (time.time() - self.last_update_time) / 3600
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
'user_count': len(self.users),
|
|||
|
|
'recent_contact_count': len(self.recent_contacts),
|
|||
|
|
'last_update_time': self.last_update_time,
|
|||
|
|
'cache_age_hours': round(age_hours, 2),
|
|||
|
|
'is_expired': self.is_cache_expired()
|
|||
|
|
}
|