Files
feishu_screen/utils/user_cache.py

211 lines
6.5 KiB
Python
Raw Normal View History

2026-03-18 15:15:52 +08:00
import json
import time
import random
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import logging
class UserCacheManager:
"""用户缓存管理器,用于管理飞书用户信息和最近联系人"""
def __init__(self, cache_file: str = "./data/user_cache.json"):
"""
初始化用户缓存管理器
Args:
cache_file: 缓存文件路径
"""
self.cache_file = Path(cache_file)
self.cache_file.parent.mkdir(parents=True, exist_ok=True)
# 缓存数据结构
self.users: List[Dict] = [] # 用户列表
self.recent_contacts: List[str] = [] # 最近联系人名字列表
self.last_update_time: float = 0 # 最后更新时间
# 加载缓存
self._load_cache()
def _load_cache(self):
"""从文件加载缓存"""
try:
if self.cache_file.exists():
with open(self.cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.users = data.get('users', [])
self.recent_contacts = data.get('recent_contacts', [])
self.last_update_time = data.get('last_update_time', 0)
logging.info(f"已加载用户缓存,共 {len(self.users)} 个用户")
else:
logging.info("未找到用户缓存文件,将创建新的缓存")
except Exception as e:
logging.error(f"加载用户缓存失败: {str(e)}")
self.users = []
self.recent_contacts = []
self.last_update_time = 0
def _save_cache(self):
"""保存缓存到文件"""
try:
data = {
'users': self.users,
'recent_contacts': self.recent_contacts,
'last_update_time': time.time()
}
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logging.info(f"已保存用户缓存到 {self.cache_file}")
except Exception as e:
logging.error(f"保存用户缓存失败: {str(e)}")
def update_users(self, users: List[Dict]):
"""
更新用户列表
Args:
users: 飞书用户列表每个用户包含 name, user_id 等字段
"""
self.users = users
self.last_update_time = time.time()
self._save_cache()
logging.info(f"已更新用户列表,共 {len(users)} 个用户")
def add_recent_contact(self, name: str):
"""
添加最近联系人
Args:
name: 联系人姓名
"""
if not name or name.strip() == "":
return
name = name.strip()
# 如果已存在,先移除
if name in self.recent_contacts:
self.recent_contacts.remove(name)
# 添加到列表开头
self.recent_contacts.insert(0, name)
# 限制列表长度最多保留50个
if len(self.recent_contacts) > 50:
self.recent_contacts = self.recent_contacts[:50]
self._save_cache()
logging.debug(f"已添加最近联系人: {name}")
def match_user_by_name(self, name: str) -> Optional[Dict]:
"""
根据名字匹配用户
Args:
name: 要匹配的名字
Returns:
匹配的用户信息如果未找到则返回None
"""
if not name or not self.users:
return None
name = name.strip()
# 精确匹配
for user in self.users:
if user.get('name') == name:
return user
# 模糊匹配(包含关系)
for user in self.users:
user_name = user.get('name', '')
if user_name and (name in user_name or user_name in name):
return user
return None
def get_random_recent_contact(self) -> Optional[str]:
"""
从最近联系人中随机选择一个
Returns:
随机选择的联系人名字如果没有则返回None
"""
if not self.recent_contacts:
return None
# 随机选择一个
return random.choice(self.recent_contacts)
def get_user_suggestions(self, name: str) -> List[Dict]:
"""
获取名字相似的用户建议
Args:
name: 要匹配的名字
Returns:
相似的用户列表
"""
if not name or not self.users:
return []
name = name.strip().lower()
suggestions = []
for user in self.users:
user_name = user.get('name', '')
user_name_lower = user_name.lower()
# 计算相似度(简单的包含关系)
if name in user_name_lower or user_name_lower in name:
suggestions.append(user)
elif name[0] == user_name_lower[0] if user_name_lower else False:
suggestions.append(user)
# 限制返回数量
return suggestions[:5]
def is_cache_expired(self, max_age_hours: int = 24) -> bool:
"""
检查缓存是否过期
Args:
max_age_hours: 最大缓存时间小时
Returns:
是否过期
"""
if self.last_update_time == 0:
return True
age_hours = (time.time() - self.last_update_time) / 3600
return age_hours > max_age_hours
def clear_cache(self):
"""清除缓存"""
self.users = []
self.recent_contacts = []
self.last_update_time = 0
self._save_cache()
logging.info("已清除用户缓存")
def get_cache_stats(self) -> Dict:
"""
获取缓存统计信息
Returns:
缓存统计信息
"""
age_hours = 0
if self.last_update_time > 0:
age_hours = (time.time() - self.last_update_time) / 3600
return {
'user_count': len(self.users),
'recent_contact_count': len(self.recent_contacts),
'last_update_time': self.last_update_time,
'cache_age_hours': round(age_hours, 2),
'is_expired': self.is_cache_expired()
}