import json import time import random from pathlib import Path from typing import Dict, List, Optional, Tuple import logging class UserCacheManager: """用户缓存管理器,用于管理飞书用户信息和最近联系人""" def __init__(self, cache_file: str = "./data/user_cache.json"): """ 初始化用户缓存管理器 Args: cache_file: 缓存文件路径 """ self.cache_file = Path(cache_file) self.cache_file.parent.mkdir(parents=True, exist_ok=True) # 缓存数据结构 self.users: List[Dict] = [] # 用户列表 self.recent_contacts: List[str] = [] # 最近联系人名字列表 self.last_update_time: float = 0 # 最后更新时间 # 加载缓存 self._load_cache() def _load_cache(self): """从文件加载缓存""" try: if self.cache_file.exists(): with open(self.cache_file, 'r', encoding='utf-8') as f: data = json.load(f) self.users = data.get('users', []) self.recent_contacts = data.get('recent_contacts', []) self.last_update_time = data.get('last_update_time', 0) logging.info(f"已加载用户缓存,共 {len(self.users)} 个用户") else: logging.info("未找到用户缓存文件,将创建新的缓存") except Exception as e: logging.error(f"加载用户缓存失败: {str(e)}") self.users = [] self.recent_contacts = [] self.last_update_time = 0 def _save_cache(self): """保存缓存到文件""" try: data = { 'users': self.users, 'recent_contacts': self.recent_contacts, 'last_update_time': time.time() } with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) logging.info(f"已保存用户缓存到 {self.cache_file}") except Exception as e: logging.error(f"保存用户缓存失败: {str(e)}") def update_users(self, users: List[Dict]): """ 更新用户列表 Args: users: 飞书用户列表,每个用户包含 name, user_id 等字段 """ self.users = users self.last_update_time = time.time() self._save_cache() logging.info(f"已更新用户列表,共 {len(users)} 个用户") def add_recent_contact(self, name: str): """ 添加最近联系人 Args: name: 联系人姓名 """ if not name or name.strip() == "": return name = name.strip() # 如果已存在,先移除 if name in self.recent_contacts: self.recent_contacts.remove(name) # 添加到列表开头 self.recent_contacts.insert(0, name) # 限制列表长度(最多保留50个) if len(self.recent_contacts) > 50: self.recent_contacts = self.recent_contacts[:50] self._save_cache() logging.debug(f"已添加最近联系人: {name}") def match_user_by_name(self, name: str) -> Optional[Dict]: """ 根据名字匹配用户 Args: name: 要匹配的名字 Returns: 匹配的用户信息,如果未找到则返回None """ if not name or not self.users: return None name = name.strip() # 精确匹配 for user in self.users: if user.get('name') == name: return user # 模糊匹配(包含关系) for user in self.users: user_name = user.get('name', '') if user_name and (name in user_name or user_name in name): return user return None def get_random_recent_contact(self) -> Optional[str]: """ 从最近联系人中随机选择一个 Returns: 随机选择的联系人名字,如果没有则返回None """ if not self.recent_contacts: return None # 随机选择一个 return random.choice(self.recent_contacts) def get_user_suggestions(self, name: str) -> List[Dict]: """ 获取名字相似的用户建议 Args: name: 要匹配的名字 Returns: 相似的用户列表 """ if not name or not self.users: return [] name = name.strip().lower() suggestions = [] for user in self.users: user_name = user.get('name', '') user_name_lower = user_name.lower() # 计算相似度(简单的包含关系) if name in user_name_lower or user_name_lower in name: suggestions.append(user) elif name[0] == user_name_lower[0] if user_name_lower else False: suggestions.append(user) # 限制返回数量 return suggestions[:5] def is_cache_expired(self, max_age_hours: int = 24) -> bool: """ 检查缓存是否过期 Args: max_age_hours: 最大缓存时间(小时) Returns: 是否过期 """ if self.last_update_time == 0: return True age_hours = (time.time() - self.last_update_time) / 3600 return age_hours > max_age_hours def clear_cache(self): """清除缓存""" self.users = [] self.recent_contacts = [] self.last_update_time = 0 self._save_cache() logging.info("已清除用户缓存") def get_cache_stats(self) -> Dict: """ 获取缓存统计信息 Returns: 缓存统计信息 """ age_hours = 0 if self.last_update_time > 0: age_hours = (time.time() - self.last_update_time) / 3600 return { 'user_count': len(self.users), 'recent_contact_count': len(self.recent_contacts), 'last_update_time': self.last_update_time, 'cache_age_hours': round(age_hours, 2), 'is_expired': self.is_cache_expired() }