From 69f92205e98d71f48e1308f4d5e51f2a29e25920 Mon Sep 17 00:00:00 2001
From: Jeason <1710884619@qq.com>
Date: Wed, 18 Mar 2026 15:15:52 +0800
Subject: [PATCH] =?UTF-8?q?=E6=88=AA=E5=9B=BE=E8=AF=86=E5=88=AB=E9=A3=9E?=
=?UTF-8?q?=E4=B9=A6=E4=B8=AA=E4=BA=BA=E4=BB=BB=E5=8A=A1=E6=B8=85=E5=8D=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.gitignore | 70 +++
Web应用使用说明.md | 307 +++++++++++
code (4).html | 1028 ++++++++++++++++++++++++++++++++++++
config.example.yaml | 45 ++
data/test_user_cache.json | 5 +
data/user_cache.json | 71 +++
main.py | 317 +++++++++++
requirements.txt | 18 +
services/__init__.py | 11 +
services/ai_service.py | 624 ++++++++++++++++++++++
services/feishu_service.py | 489 +++++++++++++++++
setup.py | 193 +++++++
start_web_app.py | 71 +++
templates/index.html | 829 +++++++++++++++++++++++++++++
tests/__init__.py | 5 +
utils/__init__.py | 54 ++
utils/config_loader.py | 200 +++++++
utils/logger.py | 185 +++++++
utils/notifier.py | 306 +++++++++++
utils/user_cache.py | 211 ++++++++
web_app.py | 452 ++++++++++++++++
优先级修改说明.md | 56 ++
项目结构说明.md | 288 ++++++++++
23 files changed, 5835 insertions(+)
create mode 100644 .gitignore
create mode 100644 Web应用使用说明.md
create mode 100644 code (4).html
create mode 100644 config.example.yaml
create mode 100644 data/test_user_cache.json
create mode 100644 data/user_cache.json
create mode 100644 main.py
create mode 100644 requirements.txt
create mode 100644 services/__init__.py
create mode 100644 services/ai_service.py
create mode 100644 services/feishu_service.py
create mode 100644 setup.py
create mode 100644 start_web_app.py
create mode 100644 templates/index.html
create mode 100644 tests/__init__.py
create mode 100644 utils/__init__.py
create mode 100644 utils/config_loader.py
create mode 100644 utils/logger.py
create mode 100644 utils/notifier.py
create mode 100644 utils/user_cache.py
create mode 100644 web_app.py
create mode 100644 优先级修改说明.md
create mode 100644 项目结构说明.md
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..cdc890c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,70 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# 虚拟环境
+venv/
+env/
+ENV/
+.env
+.env.local
+
+# 配置文件(包含敏感信息)
+config.yaml
+*.secret
+*.key
+
+# 日志文件
+*.log
+logs/
+
+# 测试相关
+.pytest_cache/
+.coverage
+htmlcov/
+.tox/
+.nox/
+.coverage.*
+*.lcov
+
+# IDE
+.vscode/
+.idea/
+*.swp
+*.swo
+*~
+
+# 操作系统
+.DS_Store
+Thumbs.db
+
+# 项目特定
+monitor_images/
+processed_images/
+test_*.py
+test_img.jpg
+test_img.png
+
+# 临时文件
+temp/
+tmp/
+*.tmp
+*.temp
\ No newline at end of file
diff --git a/Web应用使用说明.md b/Web应用使用说明.md
new file mode 100644
index 0000000..62310d1
--- /dev/null
+++ b/Web应用使用说明.md
@@ -0,0 +1,307 @@
+# Screen2Feishu Web应用使用说明
+
+## 概述
+
+Screen2Feishu Web应用是一个结合了前端界面和后端功能的完整解决方案。它提供了一个用户友好的界面,可以:
+
+1. **手动添加任务**:通过表单添加任务到飞书多维表格
+2. **图片上传分析**:上传截图,AI自动分析并生成任务
+3. **实时预览**:查看AI分析结果和任务信息
+4. **用户匹配**:自动匹配飞书用户,处理重名情况
+
+## 启动应用
+
+### 1. 安装依赖
+
+```bash
+pip install flask flask-cors
+```
+
+### 2. 配置文件
+
+确保 `config.yaml` 文件已正确配置:
+
+```yaml
+ai:
+ api_key: "your_api_key"
+ base_url: "https://api.openai.com/v1"
+ model: "gpt-4o"
+
+feishu:
+ app_id: "your_app_id"
+ app_secret: "your_app_secret"
+ app_token: "your_app_token"
+ table_id: "your_table_id"
+
+system:
+ watch_folder: "./monitor_images"
+ post_process: "keep"
+ processed_folder: "./processed_images"
+ log_level: "INFO"
+ log_file: "app.log"
+ memory_processing: false
+
+ user_matching:
+ enabled: true
+ fallback_to_random: true
+ cache_enabled: true
+ cache_file: "./data/user_cache.json"
+```
+
+### 3. 启动应用
+
+```bash
+python start_web_app.py
+```
+
+或者直接运行:
+
+```bash
+python web_app.py
+```
+
+### 4. 访问界面
+
+打开浏览器,访问:`http://localhost:5000`
+
+## 功能说明
+
+### 1. 手动添加任务
+
+1. 在"添加新任务..."输入框中输入任务描述
+2. 选择截止时间(可选)
+3. 选择分类(工作/学习/生活/其他)
+4. 选择优先级(高/中/低)
+5. 点击"添加"按钮
+
+**AI分析过程**:
+- 系统会调用AI服务分析任务描述
+- 自动识别任务发起人和部门
+- 匹配飞书用户(如果启用)
+- 将任务写入飞书多维表格
+
+### 2. 图片上传分析
+
+1. 点击"上传分析"按钮或拖拽图片到文件选择框
+2. 选择截图文件(支持JPG、PNG等格式)
+3. 系统会自动:
+ - 读取图片到内存
+ - 调用AI分析图片内容
+ - 识别任务信息(描述、优先级、发起人等)
+ - 匹配飞书用户
+ - 写入飞书表格
+ - 在界面显示分析结果
+
+**AI分析结果**:
+- 任务描述
+- 优先级(紧急/较紧急/一般/普通)
+- 状态(未开始/进行中/已完成)
+- 发起人姓名
+- 部门信息
+- 开始日期
+- 截止日期
+
+### 3. 任务管理
+
+- **筛选**:按分类、状态、优先级筛选任务
+- **编辑**:点击编辑按钮修改任务
+- **删除**:点击删除按钮删除任务
+- **完成**:勾选复选框标记任务完成
+
+### 4. 通知功能
+
+- **浏览器通知**:启用后可在任务到期时收到通知
+- **桌面通知**:显示操作结果和AI分析结果
+
+## API接口
+
+### 1. 上传图片分析
+
+**接口**:`POST /api/upload`
+
+**请求**:
+```json
+{
+ "image": "图片文件"
+}
+```
+
+**响应**:
+```json
+{
+ "success": true,
+ "message": "任务已成功添加到飞书",
+ "ai_result": {
+ "task_description": "任务描述",
+ "priority": "紧急",
+ "status": "未开始",
+ "initiator": "张三",
+ "department": "研发部",
+ "start_date": "2026-03-03",
+ "due_date": "2026-03-10"
+ }
+}
+```
+
+### 2. 获取用户缓存信息
+
+**接口**:`GET /api/user_cache`
+
+**响应**:
+```json
+{
+ "success": true,
+ "stats": {
+ "user_count": 10,
+ "recent_contact_count": 5,
+ "cache_age_hours": 0.5,
+ "is_expired": false
+ }
+}
+```
+
+### 3. 获取配置信息
+
+**接口**:`GET /api/config`
+
+**响应**:
+```json
+{
+ "success": true,
+ "config": {
+ "memory_processing": false,
+ "user_matching_enabled": true,
+ "fallback_to_random": true,
+ "cache_enabled": true,
+ "post_process": "keep"
+ }
+}
+```
+
+## 配置说明
+
+### 内存处理模式
+
+```yaml
+system:
+ memory_processing: true # 启用内存处理,不保存图片文件
+```
+
+### 用户匹配配置
+
+```yaml
+system:
+ user_matching:
+ enabled: true # 启用用户匹配
+ fallback_to_random: true # 启用随机匹配回退
+ cache_enabled: true # 启用缓存
+ cache_file: "./data/user_cache.json" # 缓存文件路径
+```
+
+### 文件处理策略
+
+```yaml
+system:
+ post_process: "keep" # 保留原始文件
+ # post_process: "delete" # 删除原始文件
+ # post_process: "move" # 移动到processed_images目录
+```
+
+## 使用场景
+
+### 场景1:截图转任务
+
+1. 从聊天记录、邮件或文档中截图
+2. 上传截图到Web界面
+3. AI自动分析截图内容
+4. 生成任务并写入飞书表格
+
+### 场景2:手动创建任务
+
+1. 在Web界面手动输入任务描述
+2. AI分析任务内容,提供优化建议
+3. 自动匹配任务发起人
+4. 写入飞书表格
+
+### 场景3:批量处理
+
+1. 准备多个截图文件
+2. 逐个上传分析
+3. 系统自动处理并写入飞书
+
+## 注意事项
+
+1. **飞书API权限**:确保飞书应用有读取用户列表和写入表格的权限
+2. **图片大小**:建议图片大小不超过10MB
+3. **网络连接**:需要稳定的网络连接调用AI和飞书API
+4. **浏览器通知**:需要用户授权浏览器通知权限
+5. **缓存管理**:用户缓存默认24小时过期,可手动清理
+
+## 故障排除
+
+### 1. 无法启动应用
+
+- 检查依赖是否安装完整
+- 检查配置文件是否正确
+- 检查端口5000是否被占用
+
+### 2. AI分析失败
+
+- 检查API密钥是否正确
+- 检查网络连接
+- 查看日志文件 `app.log`
+
+### 3. 飞书写入失败
+
+- 检查飞书API配置
+- 检查表格ID和Base Token
+- 确保飞书应用有写入权限
+
+### 4. 用户匹配失败
+
+- 检查飞书API权限(用户列表读取)
+- 检查缓存文件路径
+- 查看日志文件了解详细错误
+
+## 技术架构
+
+### 前端
+- HTML5 + CSS3 + JavaScript
+- 响应式设计,支持移动端
+- 实时通知和状态反馈
+
+### 后端
+- Flask Web框架
+- AI服务集成(OpenAI格式API)
+- 飞书API集成
+- 用户缓存管理
+
+### 数据流
+1. 用户上传图片/输入任务
+2. 前端调用后端API
+3. 后端调用AI服务分析
+4. 后端匹配飞书用户
+5. 后端写入飞书表格
+6. 返回结果给前端
+7. 前端显示结果和通知
+
+## 扩展功能
+
+### 1. 批量处理
+可以扩展支持批量上传和处理多个图片文件
+
+### 2. 实时预览
+可以添加图片预览功能,显示上传的截图
+
+### 3. 任务模板
+可以添加任务模板功能,快速创建常见任务
+
+### 4. 统计分析
+可以添加任务统计和分析功能
+
+### 5. 多语言支持
+可以添加多语言界面支持
+
+## 总结
+
+Screen2Feishu Web应用提供了一个完整的解决方案,将前端界面与后端功能完美结合。用户可以通过友好的界面轻松创建任务,AI自动分析并写入飞书表格,大大提高了工作效率。
\ No newline at end of file
diff --git a/code (4).html b/code (4).html
new file mode 100644
index 0000000..84ef4e6
--- /dev/null
+++ b/code (4).html
@@ -0,0 +1,1028 @@
+
+
+
+
+
+ 高级任务清单
+
+
+
+
+
+
+ 高级任务清单
+ 高效管理你的日常任务,支持截止时间提醒
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
暂无任务
+
添加您的第一个任务开始吧!
+
+
+
+
+
+
+
+
+
diff --git a/config.example.yaml b/config.example.yaml
new file mode 100644
index 0000000..9e3d028
--- /dev/null
+++ b/config.example.yaml
@@ -0,0 +1,45 @@
+# ================= AI 配置 =================
+ai:
+ # 推荐使用以下任一服务:
+ # 1. Gemini (Google): https://ai.google.dev/
+ # 2. SiliconFlow (硅基流动): https://cloud.siliconflow.cn/
+ # 3. DeepSeek: https://platform.deepseek.com/
+ # 4. Moonshot (月之暗面): https://platform.moonshot.cn/
+ api_key: "sk-xxxxxxxxxxxxxxxxxxxxxxxx"
+ base_url: "https://api.siliconflow.cn/v1" # 根据服务商修改
+ model: "gemini-2.5-pro" # 模型名称,根据服务商调整
+
+ # 重试配置(可选)
+ max_retries: 3 # 最大重试次数
+ retry_delay: 1.0 # 重试延迟(秒)
+
+# ================= 飞书配置 =================
+feishu:
+ # 飞书开放平台 -> 凭证与基础信息
+ app_id: "cli_xxxxxxxxxxxx"
+ app_secret: "xxxxxxxxxxxxxxxxxxxxxxxx"
+
+ # 多维表格的 token (浏览器地址栏中 app 开头的那串字符)
+ app_token: "bascnxxxxxxxxxxxxxxx"
+
+ # 数据表的 ID (在多维表格界面 -> 重命名数据表 -> 复制表ID)
+ table_id: "tblxxxxxxxxxxxx"
+
+ # 重试配置(可选)
+ max_retries: 3 # 最大重试次数
+ retry_delay: 1.0 # 重试延迟(秒)
+
+# ================= 系统配置 =================
+system:
+ # 监控的文件夹路径 (Windows 路径注意使用双斜杠 \\ 或反斜杠 /)
+ watch_folder: "./monitor_images"
+
+ # 处理完成后的文件处理方式: "delete" (删除) / "move" (移动) / "keep" (保留)
+ post_process: "move"
+
+ # 文件移动的目标文件夹 (当 post_process 为 "move" 时生效)
+ processed_folder: "./processed_images"
+
+ # 日志配置(可选)
+ log_level: "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL
+ log_file: "app.log" # 日志文件路径,留空则只输出到控制台
\ No newline at end of file
diff --git a/data/test_user_cache.json b/data/test_user_cache.json
new file mode 100644
index 0000000..112f09f
--- /dev/null
+++ b/data/test_user_cache.json
@@ -0,0 +1,5 @@
+{
+ "users": [],
+ "recent_contacts": [],
+ "last_update_time": 1772430132.497222
+}
\ No newline at end of file
diff --git a/data/user_cache.json b/data/user_cache.json
new file mode 100644
index 0000000..ceac180
--- /dev/null
+++ b/data/user_cache.json
@@ -0,0 +1,71 @@
+{
+ "users": [
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ },
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ },
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ },
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ },
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ },
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ },
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ },
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ },
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ },
+ {
+ "user_id": null,
+ "name": null,
+ "email": null,
+ "mobile": null
+ }
+ ],
+ "recent_contacts": [
+ "马老师",
+ "周刘路",
+ "周刘路 (Lucy)",
+ "张睿思"
+ ],
+ "last_update_time": 1773119191.9583383
+}
\ No newline at end of file
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..758af58
--- /dev/null
+++ b/main.py
@@ -0,0 +1,317 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Screen2Feishu - AI飞书多维表格自动录入助手
+监控文件夹,自动处理图片并写入飞书多维表格
+"""
+
+import os
+import sys
+import time
+import argparse
+from pathlib import Path
+from watchdog.observers import Observer
+from watchdog.events import FileSystemEventHandler
+
+# 添加项目根目录到Python路径
+project_root = Path(__file__).parent
+sys.path.insert(0, str(project_root))
+
+from services.ai_service import AIService
+from services.feishu_service import FeishuService
+from utils.logger import setup_logger
+from utils.notifier import send_notification
+from utils.config_loader import load_config, validate_config
+
+
+class ImageFileHandler(FileSystemEventHandler):
+ """文件系统事件处理器"""
+
+ def __init__(self, config, logger):
+ self.config = config
+ self.logger = logger
+ self.ai_service = AIService(config['ai'])
+ self.feishu_service = FeishuService(config['feishu'])
+ self.processed_files = set() # 已处理文件记录
+ self.supported_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.gif'}
+
+ # 创建必要的文件夹
+ self._create_directories()
+
+ def _create_directories(self):
+ """创建必要的文件夹"""
+ watch_folder = Path(self.config['system']['watch_folder'])
+ watch_folder.mkdir(exist_ok=True)
+
+ if self.config['system'].get('post_process') == 'move':
+ processed_folder = Path(self.config['system']['processed_folder'])
+ processed_folder.mkdir(exist_ok=True)
+
+ def _is_supported_image(self, file_path):
+ """检查文件是否为支持的图片格式"""
+ return Path(file_path).suffix.lower() in self.supported_extensions
+
+ def _is_file_ready(self, file_path):
+ """检查文件是否已完全写入(通过文件大小稳定判断)"""
+ try:
+ size1 = os.path.getsize(file_path)
+ time.sleep(0.5) # 等待0.5秒
+ size2 = os.path.getsize(file_path)
+ return size1 == size2 and size1 > 0
+ except (OSError, IOError):
+ return False
+
+ def _process_image(self, file_path):
+ """处理单个图片文件"""
+ file_path = Path(file_path)
+
+ # 检查是否已处理
+ if str(file_path) in self.processed_files:
+ return
+
+ self.logger.info(f"开始处理图片: {file_path.name}")
+
+ ai_result = None # 初始化变量
+
+ try:
+ # 检查是否启用内存处理
+ memory_processing = self.config['system'].get('memory_processing', False)
+
+ if memory_processing:
+ # 内存处理模式:直接读取图片到内存,不保存到processed_images
+ try:
+ with open(file_path, 'rb') as f:
+ image_bytes = f.read()
+
+ # 1. AI分析图片(内存模式)
+ ai_result = self.ai_service.analyze_image_from_bytes(image_bytes, file_path.name)
+ if not ai_result:
+ self.logger.error(f"AI分析失败: {file_path.name}")
+ return
+ except Exception as e:
+ self.logger.error(f"内存处理图片失败: {str(e)}")
+ # 回退到文件处理模式
+ memory_processing = False
+
+ if not memory_processing:
+ # 文件处理模式:传统方式
+ # 1. AI分析图片
+ ai_result = self.ai_service.analyze_image(str(file_path))
+ if not ai_result:
+ self.logger.error(f"AI分析失败: {file_path.name}")
+ return
+
+ # 2. 写入飞书
+ success = self.feishu_service.add_task(ai_result)
+ if success:
+ self.logger.info(f"成功写入飞书: {file_path.name}")
+ self.processed_files.add(str(file_path))
+
+ # 3. 后处理
+ self._post_process_file(file_path)
+
+ # 4. 发送通知
+ send_notification(
+ "任务处理成功",
+ f"已成功处理 {file_path.name} 并写入飞书表格"
+ )
+ else:
+ self.logger.error(f"写入飞书失败: {file_path.name}")
+
+ except Exception as e:
+ self.logger.error(f"处理图片 {file_path.name} 时出错: {str(e)}")
+
+ def _post_process_file(self, file_path):
+ """文件后处理"""
+ post_process = self.config['system'].get('post_process', 'keep')
+
+ if post_process == 'delete':
+ try:
+ file_path.unlink()
+ self.logger.info(f"已删除文件: {file_path.name}")
+ except Exception as e:
+ self.logger.error(f"删除文件失败: {str(e)}")
+
+ elif post_process == 'move':
+ try:
+ processed_folder = Path(self.config['system']['processed_folder'])
+ target_path = processed_folder / file_path.name
+
+ # 如果目标文件已存在,添加时间戳
+ if target_path.exists():
+ timestamp = int(time.time())
+ target_path = processed_folder / f"{file_path.stem}_{timestamp}{file_path.suffix}"
+
+ file_path.rename(target_path)
+ self.logger.info(f"已移动文件到: {target_path}")
+ except Exception as e:
+ self.logger.error(f"移动文件失败: {str(e)}")
+
+ def on_created(self, event):
+ """文件创建事件处理"""
+ if event.is_directory:
+ return
+
+ file_path = event.src_path
+ if not self._is_supported_image(file_path):
+ return
+
+ self.logger.info(f"检测到新文件: {file_path}")
+
+ # 等待文件完全写入
+ if self._is_file_ready(file_path):
+ self._process_image(file_path)
+ else:
+ self.logger.warning(f"文件未完全写入,跳过处理: {file_path}")
+
+ def on_modified(self, event):
+ """文件修改事件处理"""
+ if event.is_directory:
+ return
+
+ file_path = event.src_path
+ if not self._is_supported_image(file_path):
+ return
+
+ # 对于修改事件,也检查文件是否已完全写入
+ if self._is_file_ready(file_path):
+ self._process_image(file_path)
+
+
+def parse_arguments():
+ """解析命令行参数"""
+ parser = argparse.ArgumentParser(
+ description="AI飞书多维表格自动录入助手",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+示例用法:
+ python main.py # 使用默认配置启动
+ python main.py --config custom.yaml # 使用自定义配置文件
+ python main.py --test # 测试模式(不监控文件夹)
+ """
+ )
+
+ parser.add_argument(
+ '--config', '-c',
+ default='config.yaml',
+ help='配置文件路径 (默认: config.yaml)'
+ )
+
+ parser.add_argument(
+ '--test', '-t',
+ action='store_true',
+ help='测试模式:处理现有文件后退出,不监控文件夹'
+ )
+
+ parser.add_argument(
+ '--verbose', '-v',
+ action='store_true',
+ help='显示详细日志'
+ )
+
+ return parser.parse_args()
+
+
+def main():
+ """主函数"""
+ args = parse_arguments()
+
+ # 加载配置
+ try:
+ config = load_config(args.config)
+ except Exception as e:
+ print(f" 配置加载失败: {e}")
+ sys.exit(1)
+
+ # 验证配置
+ try:
+ validate_config(config)
+ except Exception as e:
+ print(f" 配置验证失败: {e}")
+ sys.exit(1)
+
+ # 设置日志
+ log_level = 'DEBUG' if args.verbose else 'INFO'
+ logger = setup_logger(
+ name='screen2feishu',
+ log_file='app.log',
+ level=log_level
+ )
+
+ logger.info("=" * 50)
+ logger.info("Screen2Feishu 启动")
+ logger.info(f"配置文件: {args.config}")
+ logger.info(f"监控文件夹: {config['system']['watch_folder']}")
+ logger.info("=" * 50)
+
+ # 创建文件处理器
+ handler = ImageFileHandler(config, logger)
+
+ if args.test:
+ # 测试模式:处理现有文件
+ watch_folder = Path(config['system']['watch_folder'])
+ image_files = [
+ f for f in watch_folder.iterdir()
+ if f.is_file() and handler._is_supported_image(f)
+ ]
+
+ if not image_files:
+ logger.info("没有找到待处理的图片文件")
+ return
+
+ logger.info(f"找到 {len(image_files)} 个图片文件,开始处理...")
+ for image_file in image_files:
+ handler._process_image(image_file)
+
+ logger.info("测试模式处理完成")
+ return
+
+ # 监控模式
+ observer = Observer()
+ observer.schedule(
+ handler,
+ path=config['system']['watch_folder'],
+ recursive=False
+ )
+
+ try:
+ observer.start()
+ logger.info("文件监控已启动,按 Ctrl+C 停止程序")
+
+ # 发送启动通知
+ send_notification(
+ "Screen2Feishu 已启动",
+ f"正在监控文件夹: {config['system']['watch_folder']}"
+ )
+
+ while True:
+ time.sleep(1)
+
+ except KeyboardInterrupt:
+ logger.info("收到停止信号,正在关闭...")
+ observer.stop()
+
+ # 发送停止通知
+ send_notification(
+ "Screen2Feishu 已停止",
+ "程序已正常关闭"
+ )
+
+ except Exception as e:
+ logger.error(f"程序运行出错: {str(e)}")
+ observer.stop()
+
+ # 发送错误通知
+ send_notification(
+ "Screen2Feishu 错误",
+ f"程序运行出错: {str(e)}"
+ )
+ sys.exit(1)
+
+ finally:
+ observer.join()
+ logger.info("程序已完全停止")
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..77738c0
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,18 @@
+# 核心依赖
+openai>=1.0.0
+watchdog>=3.0.0
+requests>=2.31.0
+pyyaml>=6.0.1
+
+# Web应用依赖
+flask>=2.3.0
+flask-cors>=4.0.0
+
+# 可选依赖
+# pillow>=10.0.0 # 如果需要本地图片处理
+# torch>=2.0.0 # 如果需要GPU加速
+
+# 开发依赖(可选)
+# pytest>=7.0.0 # 用于单元测试
+# black>=23.0.0 # 代码格式化
+# flake8>=6.0.0 # 代码检查
\ No newline at end of file
diff --git a/services/__init__.py b/services/__init__.py
new file mode 100644
index 0000000..56c425f
--- /dev/null
+++ b/services/__init__.py
@@ -0,0 +1,11 @@
+"""
+服务模块
+"""
+
+from .ai_service import AIService
+from .feishu_service import FeishuService
+
+__all__ = [
+ 'AIService',
+ 'FeishuService'
+]
\ No newline at end of file
diff --git a/services/ai_service.py b/services/ai_service.py
new file mode 100644
index 0000000..fe4c177
--- /dev/null
+++ b/services/ai_service.py
@@ -0,0 +1,624 @@
+import base64
+import json
+import datetime
+import time
+import re
+from typing import Dict, Optional, List
+from openai import OpenAI
+from openai import APIError, RateLimitError, AuthenticationError
+
+
+class AIService:
+ def __init__(self, config):
+ """
+ 初始化 AI 服务
+ :param config: 包含 ai 配置的字典 (来自 config.yaml)
+ """
+ self.api_key = config.get('api_key')
+ self.base_url = config.get('base_url')
+ self.model = config.get('model', 'gpt-4o')
+ self.max_retries = config.get('max_retries', 3)
+ self.retry_delay = config.get('retry_delay', 1.0)
+
+ # 初始化 OpenAI 客户端 (兼容所有支持 OpenAI 格式的 API)
+ self.client = OpenAI(
+ api_key=self.api_key,
+ base_url=self.base_url
+ )
+
+ # 支持的图片格式
+ self.supported_formats = {'.png', '.jpg', '.jpeg', '.bmp', '.gif'}
+
+ # Prompt模板缓存
+ self.prompt_templates = {}
+
+ def _encode_image(self, image_path):
+ """将本地图片转换为 Base64 编码"""
+ try:
+ with open(image_path, "rb") as image_file:
+ return base64.b64encode(image_file.read()).decode('utf-8')
+ except Exception as e:
+ raise Exception(f"图片编码失败: {str(e)}")
+
+ def _encode_image_from_bytes(self, image_bytes: bytes) -> str:
+ """将图片字节数据转换为 Base64 编码"""
+ try:
+ return base64.b64encode(image_bytes).decode('utf-8')
+ except Exception as e:
+ raise Exception(f"图片字节数据编码失败: {str(e)}")
+
+ def _validate_image_file(self, image_path):
+ """验证图片文件"""
+ from pathlib import Path
+
+ file_path = Path(image_path)
+
+ # 检查文件是否存在
+ if not file_path.exists():
+ raise FileNotFoundError(f"图片文件不存在: {image_path}")
+
+ # 检查文件大小(限制为10MB)
+ file_size = file_path.stat().st_size
+ if file_size > 10 * 1024 * 1024: # 10MB
+ raise ValueError(f"图片文件过大: {file_size / 1024 / 1024:.2f}MB (最大10MB)")
+
+ # 检查文件格式
+ if file_path.suffix.lower() not in self.supported_formats:
+ raise ValueError(f"不支持的图片格式: {file_path.suffix}")
+
+ return True
+
+ def _build_prompt(self, current_date_str: str) -> str:
+ """构建AI提示词"""
+ prompt = f"""
+ 你是一个专业的项目管理助手。当前系统日期是:{current_date_str}。
+ 你的任务是从用户上传的图片(可能是聊天记录、邮件、文档截图)中提取任务信息。
+
+ 请严格按照以下 JSON 格式返回结果:
+ {{
+ "task_description": "任务的具体描述,简练概括",
+ "priority": "必须从以下选项中选择一个: ['紧急', '较紧急', '一般', '普通']",
+ "status": "必须从以下选项中选择一个: ['已停滞','待开始', '进行中', '已完成']",
+ "latest_progress": "图片中提到的最新进展,如果没有则留空字符串",
+ "initiator": "任务发起人姓名。请仔细识别图片中的发件人/发送者名字。如果是邮件截图,请识别发件人;如果是聊天记录,请识别发送者。如果没有明确的发起人,则留空字符串。",
+ "department": "发起人部门,如果没有则留空字符串",
+ "start_date": "YYYY-MM-DD 格式的日期字符串。如果提到'今天'就是当前日期,'下周一'请根据当前日期计算。如果没提到则返回 null",
+ "due_date": "YYYY-MM-DD 格式的截止日期字符串。逻辑同上,如果没提到则返回 null"
+ }}
+
+ 注意:
+ 1. 如果图片中包含多个任务,请只提取最核心的一个。
+ 2. 请特别关注图片中的发件人/发送者信息,准确提取姓名。
+ 3. 如果识别到的名字可能存在重名,请在任务描述中添加提示信息。
+ 4. 不要返回 Markdown 代码块标记(如 ```json),直接返回纯 JSON 字符串。
+ 5. 确保返回的 JSON 格式正确,可以被 Python 的 json.loads() 解析。
+ """
+ return prompt
+
+ def _parse_ai_response(self, content: Optional[str]) -> Optional[Dict]:
+ """解析AI响应"""
+ if not content:
+ raise ValueError("AI响应内容为空")
+
+ try:
+ # 清理可能的 markdown 标记
+ content = content.replace("```json", "").replace("```", "").strip()
+
+ # 尝试修复常见的JSON格式问题
+ # 1. 处理未闭合的引号
+ content = self._fix_json_string(content)
+
+ # 2. 处理转义字符问题
+ content = self._fix_json_escapes(content)
+
+ # 解析JSON
+ result_dict = json.loads(content)
+
+ # 验证必需的字段
+ required_fields = ['task_description', 'priority', 'status']
+ for field in required_fields:
+ if field not in result_dict:
+ raise ValueError(f"AI响应缺少必需字段: {field}")
+
+ # 验证选项值
+ valid_priorities = ['紧急', '较紧急', '一般', '普通']
+ valid_statuses = ['已停滞','待开始', '进行中', '已完成']
+
+ if result_dict.get('priority') not in valid_priorities:
+ raise ValueError(f"无效的优先级: {result_dict.get('priority')}")
+
+ if result_dict.get('status') not in valid_statuses:
+ raise ValueError(f"无效的状态: {result_dict.get('status')}")
+
+ return result_dict
+
+ except json.JSONDecodeError as e:
+ # 尝试更详细的错误修复
+ try:
+ # 如果标准解析失败,尝试使用更宽松的解析
+ content = self._aggressive_json_fix(content)
+ result_dict = json.loads(content)
+
+ # 重新验证字段
+ required_fields = ['task_description', 'priority', 'status']
+ for field in required_fields:
+ if field not in result_dict:
+ raise ValueError(f"AI响应缺少必需字段: {field}")
+
+ # 重新验证选项值
+ valid_priorities = ['紧急', '较紧急', '一般', '普通']
+ valid_statuses = ['已停滞','待开始', '进行中', '已完成']
+
+ if result_dict.get('priority') not in valid_priorities:
+ raise ValueError(f"无效的优先级: {result_dict.get('priority')}")
+
+ if result_dict.get('status') not in valid_statuses:
+ raise ValueError(f"无效的状态: {result_dict.get('status')}")
+
+ return result_dict
+ except Exception as retry_error:
+ raise ValueError(f"AI响应不是有效的JSON: {str(e)} (修复后错误: {str(retry_error)})")
+ except Exception as e:
+ raise ValueError(f"解析AI响应失败: {str(e)}")
+
+ def _fix_json_string(self, content: str) -> str:
+ """修复JSON字符串中的未闭合引号问题"""
+ import re
+
+ # 查找可能未闭合的字符串
+ # 匹配模式:从引号开始,但没有对应的闭合引号
+ lines = content.split('\n')
+ fixed_lines = []
+
+ for line in lines:
+ # 检查行中是否有未闭合的引号
+ in_string = False
+ escaped = False
+ fixed_line = []
+
+ for char in line:
+ if escaped:
+ fixed_line.append(char)
+ escaped = False
+ continue
+
+ if char == '\\':
+ escaped = True
+ fixed_line.append(char)
+ continue
+
+ if char == '"':
+ if in_string:
+ in_string = False
+ else:
+ in_string = True
+ fixed_line.append(char)
+ else:
+ fixed_line.append(char)
+
+ # 如果行结束时仍在字符串中,添加闭合引号
+ if in_string:
+ fixed_line.append('"')
+
+ fixed_lines.append(''.join(fixed_line))
+
+ return '\n'.join(fixed_lines)
+
+ def _fix_json_escapes(self, content: str) -> str:
+ """修复JSON转义字符问题"""
+ # 注意:我们不应该转义JSON结构中的引号,只转义字符串内容中的引号
+ # 这个函数暂时不处理换行符,因为JSON中的换行符是有效的
+ # 更复杂的转义修复应该在JSON解析后进行
+
+ return content
+
+ def _aggressive_json_fix(self, content: str) -> str:
+ """更激进的JSON修复策略"""
+ # 1. 移除可能的非JSON内容
+ content = re.sub(r'^[^{]*', '', content) # 移除JSON前的非JSON内容
+ content = re.sub(r'[^}]*$', '', content) # 移除JSON后的非JSON内容
+
+ # 2. 确保JSON对象闭合
+ if not content.strip().endswith('}'):
+ content = content.strip() + '}'
+
+ # 3. 确保JSON对象开始
+ if not content.strip().startswith('{'):
+ content = '{' + content.strip()
+
+ # 4. 处理常见的AI响应格式问题
+ # 移除可能的Markdown代码块标记
+ content = content.replace('```json', '').replace('```', '')
+
+ # 5. 处理可能的多余空格和换行
+ content = ' '.join(content.split())
+
+ return content
+
+ def _call_ai_with_retry(self, image_path: str, prompt: str) -> Optional[str]:
+ """调用AI API,带重试机制"""
+ base64_image = self._encode_image(image_path)
+
+ for attempt in range(self.max_retries):
+ try:
+ # 尝试使用response_format参数(适用于OpenAI格式的API)
+ try:
+ response = self.client.chat.completions.create(
+ model=self.model,
+ messages=[
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{base64_image}"
+ }
+ }
+ ]
+ }
+ ],
+ response_format={"type": "json_object"},
+ max_tokens=2000
+ )
+ except Exception as e:
+ # 如果response_format参数不支持,尝试不使用该参数
+ print(f"⚠️ response_format参数不支持,尝试不使用该参数: {str(e)}")
+ response = self.client.chat.completions.create(
+ model=self.model,
+ messages=[
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{base64_image}"
+ }
+ }
+ ]
+ }
+ ],
+ max_tokens=2000
+ )
+
+ if not response.choices:
+ return None
+
+ content = response.choices[0].message.content
+ return content if content else None
+
+ except RateLimitError as e:
+ if attempt < self.max_retries - 1:
+ wait_time = self.retry_delay * (2 ** attempt) # 指数退避
+ time.sleep(wait_time)
+ continue
+ raise Exception(f"API速率限制: {str(e)}")
+
+ except AuthenticationError as e:
+ raise Exception(f"API认证失败: {str(e)}")
+
+ except APIError as e:
+ if attempt < self.max_retries - 1:
+ wait_time = self.retry_delay * (2 ** attempt)
+ time.sleep(wait_time)
+ continue
+ raise Exception(f"API调用失败: {str(e)}")
+
+ except Exception as e:
+ if attempt < self.max_retries - 1:
+ wait_time = self.retry_delay * (2 ** attempt)
+ time.sleep(wait_time)
+ continue
+ raise Exception(f"未知错误: {str(e)}")
+
+ raise Exception(f"AI调用失败,已重试 {self.max_retries} 次")
+
+ def _call_ai_with_retry_from_bytes(self, image_bytes: bytes, prompt: str, image_name: str = "memory_image") -> Optional[str]:
+ """调用AI API,带重试机制(从内存字节数据)"""
+ base64_image = self._encode_image_from_bytes(image_bytes)
+
+ for attempt in range(self.max_retries):
+ try:
+ # 尝试使用response_format参数(适用于OpenAI格式的API)
+ try:
+ response = self.client.chat.completions.create(
+ model=self.model,
+ messages=[
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{base64_image}"
+ }
+ }
+ ]
+ }
+ ],
+ response_format={"type": "json_object"},
+ max_tokens=2000
+ )
+ except Exception as e:
+ # 如果response_format参数不支持,尝试不使用该参数
+ print(f"⚠️ response_format参数不支持,尝试不使用该参数: {str(e)}")
+ response = self.client.chat.completions.create(
+ model=self.model,
+ messages=[
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{base64_image}"
+ }
+ }
+ ]
+ }
+ ],
+ max_tokens=2000
+ )
+
+ if not response.choices:
+ return None
+
+ content = response.choices[0].message.content
+ return content if content else None
+
+ except RateLimitError as e:
+ if attempt < self.max_retries - 1:
+ wait_time = self.retry_delay * (2 ** attempt) # 指数退避
+ time.sleep(wait_time)
+ continue
+ raise Exception(f"API速率限制: {str(e)}")
+
+ except AuthenticationError as e:
+ raise Exception(f"API认证失败: {str(e)}")
+
+ except APIError as e:
+ if attempt < self.max_retries - 1:
+ wait_time = self.retry_delay * (2 ** attempt)
+ time.sleep(wait_time)
+ continue
+ raise Exception(f"API调用失败: {str(e)}")
+
+ except Exception as e:
+ if attempt < self.max_retries - 1:
+ wait_time = self.retry_delay * (2 ** attempt)
+ time.sleep(wait_time)
+ continue
+ raise Exception(f"未知错误: {str(e)}")
+
+ raise Exception(f"AI调用失败,已重试 {self.max_retries} 次")
+
+ for attempt in range(self.max_retries):
+ try:
+ # 尝试使用response_format参数(适用于OpenAI格式的API)
+ try:
+ response = self.client.chat.completions.create(
+ model=self.model,
+ messages=[
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{base64_image}"
+ }
+ }
+ ]
+ }
+ ],
+ response_format={"type": "json_object"},
+ max_tokens=2000
+ )
+ except Exception as e:
+ # 如果response_format参数不支持,尝试不使用该参数
+ print(f"⚠️ response_format参数不支持,尝试不使用该参数: {str(e)}")
+ response = self.client.chat.completions.create(
+ model=self.model,
+ messages=[
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{base64_image}"
+ }
+ }
+ ]
+ }
+ ],
+ max_tokens=2000
+ )
+
+ if not response.choices:
+ return None
+
+ content = response.choices[0].message.content
+ return content if content else None
+
+ except RateLimitError as e:
+ if attempt < self.max_retries - 1:
+ wait_time = self.retry_delay * (2 ** attempt) # 指数退避
+ time.sleep(wait_time)
+ continue
+ raise Exception(f"API速率限制: {str(e)}")
+
+ except AuthenticationError as e:
+ raise Exception(f"API认证失败: {str(e)}")
+
+ except APIError as e:
+ if attempt < self.max_retries - 1:
+ wait_time = self.retry_delay * (2 ** attempt)
+ time.sleep(wait_time)
+ continue
+ raise Exception(f"API调用失败: {str(e)}")
+
+ except Exception as e:
+ if attempt < self.max_retries - 1:
+ wait_time = self.retry_delay * (2 ** attempt)
+ time.sleep(wait_time)
+ continue
+ raise Exception(f"未知错误: {str(e)}")
+
+ raise Exception(f"AI调用失败,已重试 {self.max_retries} 次")
+
+ def analyze_image(self, image_path):
+ """
+ 核心方法:发送图片到 AI 并获取结构化数据
+ :param image_path: 图片文件的路径
+ :return: 解析后的字典 (Dict)
+ """
+ from pathlib import Path
+
+ file_path = Path(image_path)
+ # 使用sys.stdout.write替代print,避免编码问题
+ import sys
+ sys.stdout.write(f" [AI] 正在分析图片: {file_path.name} ...\n")
+ sys.stdout.flush()
+
+ try:
+ # 1. 验证图片文件
+ self._validate_image_file(image_path)
+
+ # 2. 获取当前日期 (用于辅助 AI 推断相对时间)
+ now = datetime.datetime.now()
+ current_date_str = now.strftime("%Y-%m-%d %A") # 例如: 2023-10-27 Sunday
+
+ # 3. 构建 Prompt
+ prompt = self._build_prompt(current_date_str)
+
+ # 4. 调用AI API(带重试机制)
+ content = self._call_ai_with_retry(image_path, prompt)
+
+ # 5. 解析结果
+ if not content:
+ import sys
+ sys.stdout.write(f" [AI] AI返回空内容\n")
+ sys.stdout.flush()
+ return None
+
+ result_dict = self._parse_ai_response(content)
+
+ # 记录成功日志
+ if result_dict:
+ task_desc = result_dict.get('task_description', '')
+ if task_desc and len(task_desc) > 30:
+ task_desc = task_desc[:30] + "..."
+ import sys
+ sys.stdout.write(f" [AI] 识别成功: {task_desc}\n")
+ sys.stdout.flush()
+
+ return result_dict
+
+ except Exception as e:
+ import sys
+ sys.stdout.write(f" [AI] 分析失败: {str(e)}\n")
+ sys.stdout.flush()
+ return None
+
+ def analyze_image_from_bytes(self, image_bytes: bytes, image_name: str = "memory_image"):
+ """
+ 核心方法:从内存中的图片字节数据发送到 AI 并获取结构化数据
+ :param image_bytes: 图片的字节数据
+ :param image_name: 图片名称(用于日志)
+ :return: 解析后的字典 (Dict)
+ """
+ # 使用sys.stdout.write替代print,避免编码问题
+ import sys
+ sys.stdout.write(f" [AI] 正在分析内存图片: {image_name} ...\n")
+ sys.stdout.flush()
+
+ try:
+ # 1. 验证图片数据大小
+ if len(image_bytes) > 10 * 1024 * 1024: # 10MB
+ raise ValueError(f"图片数据过大: {len(image_bytes) / 1024 / 1024:.2f}MB (最大10MB)")
+
+ # 2. 获取当前日期 (用于辅助 AI 推断相对时间)
+ now = datetime.datetime.now()
+ current_date_str = now.strftime("%Y-%m-%d %A") # 例如: 2023-10-27 Sunday
+
+ # 3. 构建 Prompt
+ prompt = self._build_prompt(current_date_str)
+
+ # 4. 调用AI API(带重试机制)
+ content = self._call_ai_with_retry_from_bytes(image_bytes, prompt, image_name)
+
+ # 5. 解析结果
+ if not content:
+ import sys
+ sys.stdout.write(f" [AI] AI返回空内容\n")
+ sys.stdout.flush()
+ return None
+
+ result_dict = self._parse_ai_response(content)
+
+ # 记录成功日志
+ if result_dict:
+ task_desc = result_dict.get('task_description', '')
+ if task_desc and len(task_desc) > 30:
+ task_desc = task_desc[:30] + "..."
+ import sys
+ sys.stdout.write(f" [AI] 识别成功: {task_desc}\n")
+ sys.stdout.flush()
+
+ return result_dict
+
+ except Exception as e:
+ import sys
+ sys.stdout.write(f" [AI] 分析失败: {str(e)}\n")
+ sys.stdout.flush()
+ return None
+
+ def analyze_image_batch(self, image_paths: List[str]) -> Dict[str, Optional[Dict]]:
+ """
+ 批量分析图片
+ :param image_paths: 图片文件路径列表
+ :return: 字典,键为图片路径,值为分析结果
+ """
+ results = {}
+
+ for image_path in image_paths:
+ try:
+ result = self.analyze_image(image_path)
+ results[image_path] = result
+ except Exception as e:
+ import sys
+ sys.stdout.write(f" [AI] 批量处理失败 {image_path}: {str(e)}\n")
+ sys.stdout.flush()
+ results[image_path] = None
+
+ return results
+
+# ================= 单元测试 =================
+if __name__ == "__main__":
+ # 在这里填入你的配置进行测试
+ test_config = {
+ "api_key": "sk-xxxxxxxxxxxxxxxxxxxxxxxx",
+ "base_url": "https://api.openai.com/v1",
+ "model": "gpt-4o"
+ }
+
+ # 确保目录下有一张名为 test_img.jpg 的图片
+ import os
+ if os.path.exists("test_img.jpg"):
+ ai = AIService(test_config)
+ res = ai.analyze_image("test_img.jpg")
+ import sys
+ sys.stdout.write(json.dumps(res, indent=2, ensure_ascii=False) + "\n")
+ sys.stdout.flush()
+ else:
+ import sys
+ sys.stdout.write("请在同级目录下放一张 test_img.jpg 用于测试\n")
+ sys.stdout.flush()
\ No newline at end of file
diff --git a/services/feishu_service.py b/services/feishu_service.py
new file mode 100644
index 0000000..5953a33
--- /dev/null
+++ b/services/feishu_service.py
@@ -0,0 +1,489 @@
+import requests
+import json
+import time
+import logging
+from datetime import datetime
+from typing import Dict, Optional, List
+from pathlib import Path
+from utils.user_cache import UserCacheManager
+
+
+class FeishuService:
+ def __init__(self, config):
+ """
+ 初始化飞书服务
+ :param config: 包含飞书配置的字典 (来自 config.yaml)
+ """
+ self.app_id = config.get('app_id')
+ self.app_secret = config.get('app_secret')
+ self.app_token = config.get('app_token') # 多维表格的 Base Token
+ self.table_id = config.get('table_id') # 数据表 ID
+
+ # 内部变量,用于缓存 Token
+ self._tenant_access_token = None
+ self._token_expire_time = 0
+
+ # 重试配置
+ self.max_retries = config.get('max_retries', 3)
+ self.retry_delay = config.get('retry_delay', 1.0)
+
+ # 用户匹配配置
+ user_matching_config = config.get('user_matching', {})
+ self.user_matching_enabled = user_matching_config.get('enabled', True)
+ self.fallback_to_random = user_matching_config.get('fallback_to_random', True)
+ self.cache_enabled = user_matching_config.get('cache_enabled', True)
+ self.cache_file = user_matching_config.get('cache_file', './data/user_cache.json')
+
+ # 用户缓存管理器
+ self.user_cache = UserCacheManager(self.cache_file) if self.cache_enabled else None
+
+ # 支持的字段类型映射
+ self.field_type_map = {
+ 'text': 'text',
+ 'number': 'number',
+ 'date': 'date',
+ 'select': 'single_select',
+ 'multi_select': 'multiple_select',
+ 'checkbox': 'checkbox',
+ 'url': 'url',
+ 'email': 'email',
+ 'phone': 'phone',
+ 'user': 'user' # 用户字段类型
+ }
+
+ # 验证必需的配置
+ self._validate_config()
+
+ def _validate_config(self):
+ """验证飞书配置"""
+ if not self.app_id:
+ raise ValueError("飞书配置缺少 app_id")
+ if not self.app_secret:
+ raise ValueError("飞书配置缺少 app_secret")
+ if not self.app_token:
+ raise ValueError("飞书配置缺少 app_token")
+ if not self.table_id:
+ raise ValueError("飞书配置缺少 table_id")
+
+ # 验证用户匹配配置
+ if self.user_matching_enabled and not self.cache_enabled:
+ logging.warning("用户匹配已启用但缓存未启用,可能影响性能")
+
+ def _get_token(self, retry_count: int = 0) -> Optional[str]:
+ """
+ 获取 tenant_access_token (带缓存机制和重试)
+ 如果 Token 还有效,直接返回;否则重新请求。
+ """
+ now = time.time()
+ # 如果 token 存在且离过期还有60秒以上,直接复用
+ if self._tenant_access_token and now < self._token_expire_time - 60:
+ return self._tenant_access_token
+
+ url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
+ headers = {"Content-Type": "application/json; charset=utf-8"}
+ payload = {
+ "app_id": self.app_id,
+ "app_secret": self.app_secret
+ }
+
+ try:
+ resp = requests.post(url, headers=headers, json=payload, timeout=10)
+ resp_data = resp.json()
+
+ if resp_data.get("code") == 0:
+ self._tenant_access_token = resp_data.get("tenant_access_token")
+ # expire_in 通常是 7200 秒,我们记录下过期的绝对时间戳
+ self._token_expire_time = now + resp_data.get("expire", 7200)
+ logging.info("🔄 [Feishu] Token 已刷新")
+ return self._tenant_access_token
+ else:
+ error_msg = resp_data.get("msg", "未知错误")
+ raise Exception(f"获取飞书 Token 失败: {error_msg}")
+
+ except requests.exceptions.RequestException as e:
+ if retry_count < self.max_retries:
+ wait_time = self.retry_delay * (2 ** retry_count)
+ logging.warning(f"网络错误,{wait_time}秒后重试: {str(e)}")
+ time.sleep(wait_time)
+ return self._get_token(retry_count + 1)
+ logging.error(f" [Feishu] 网络错误,已重试 {self.max_retries} 次: {str(e)}")
+ return None
+ except Exception as e:
+ logging.error(f" [Feishu] 认证错误: {str(e)}")
+ return None
+
+ def _date_to_timestamp(self, date_str: str) -> Optional[int]:
+ """
+ 将 YYYY-MM-DD 字符串转换为飞书需要的毫秒级时间戳
+ """
+ if not date_str or date_str == "null":
+ return None
+ try:
+ # 假设 AI 返回的是 YYYY-MM-DD
+ dt = datetime.strptime(date_str, "%Y-%m-%d")
+ # 转换为毫秒时间戳
+ return int(dt.timestamp() * 1000)
+ except ValueError:
+ logging.warning(f"⚠️ [Feishu] 日期格式无法解析: {date_str},将留空。")
+ return None
+
+ def _validate_ai_data(self, ai_data: Dict) -> bool:
+ """验证AI返回的数据"""
+ required_fields = ['task_description', 'priority', 'status']
+
+ for field in required_fields:
+ if field not in ai_data:
+ logging.error(f" [Feishu] AI数据缺少必需字段: {field}")
+ return False
+
+ # 验证选项值
+ valid_priorities = ['紧急', '较紧急', '一般', '普通']
+ valid_statuses = [' 已停滞', '待开始', '进行中', ',已完成']
+
+ if ai_data.get('priority') not in valid_priorities:
+ logging.error(f" [Feishu] 无效的优先级: {ai_data.get('priority')}")
+ return False
+
+ if ai_data.get('status') not in valid_statuses:
+ logging.error(f" [Feishu] 无效的状态: {ai_data.get('status')}")
+ return False
+
+ return True
+
+ def _build_fields(self, ai_data: Dict) -> Dict:
+ """构建飞书字段数据"""
+ # --- 字段映射区域 (Key 必须与飞书表格列名完全一致) ---
+ fields = {
+ "任务描述": ai_data.get("task_description", ""),
+ "重要紧急程度": ai_data.get("priority", "普通"), # 默认值防止报错
+ "进展": ai_data.get("status", "待开始"),
+ "最新进展记录": ai_data.get("latest_progress", "进行中,请及时更新"),
+ # 任务发起方.部门是多选字段,需要列表格式
+ "任务发起方.部门": self._build_multi_select_field(ai_data.get("department", "")) # 修复字段名:添加点号
+ }
+
+ # 处理任务发起方
+ initiator_name = ai_data.get("initiator", "")
+ if initiator_name and self.user_matching_enabled:
+ # 尝试匹配用户
+ user_match = self._match_initiator(initiator_name)
+ if user_match:
+ if user_match.get('matched'):
+ # 成功匹配到用户ID
+ fields["任务发起方"] = self._build_user_field(user_match['user_id'])
+ logging.info(f"成功匹配任务发起方: {initiator_name} -> {user_match['user_name']}")
+ else:
+ # 未匹配到用户,添加待确认标记到任务描述
+ fields["任务描述"] = f"[待确认发起人] {fields['任务描述']}"
+ # 在任务发起方.部门字段中添加待确认信息(作为提示)
+ existing_dept = ai_data.get("department", "")
+ if existing_dept:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (待确认: {initiator_name})")
+ else:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"待确认: {initiator_name}")
+ logging.warning(f"未匹配到用户: {initiator_name},标记为待确认")
+
+ # 添加到最近联系人
+ if self.user_cache:
+ self.user_cache.add_recent_contact(initiator_name)
+ else:
+ # 匹配失败,使用随机选择或留空
+ if self.fallback_to_random and self.user_cache:
+ random_contact = self.user_cache.get_random_recent_contact()
+ if random_contact:
+ fields["任务描述"] = f"[随机匹配] {fields['任务描述']}"
+ # 在任务发起方.部门字段中添加随机匹配信息
+ existing_dept = ai_data.get("department", "")
+ if existing_dept:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (随机: {random_contact})")
+ else:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"随机: {random_contact}")
+ logging.info(f"随机匹配任务发起方: {random_contact}")
+ else:
+ fields["任务描述"] = f"[待确认发起人] {fields['任务描述']}"
+ # 在任务发起方.部门字段中添加待确认信息
+ existing_dept = ai_data.get("department", "")
+ if existing_dept:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (待确认: {initiator_name})")
+ else:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"待确认: {initiator_name}")
+ logging.warning(f"无最近联系人可用,标记为待确认: {initiator_name}")
+ else:
+ fields["任务描述"] = f"[待确认发起人] {fields['任务描述']}"
+ # 在任务发起方.部门字段中添加待确认信息
+ existing_dept = ai_data.get("department", "")
+ if existing_dept:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (待确认: {initiator_name})")
+ else:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"待确认: {initiator_name}")
+ logging.warning(f"未匹配到用户且未启用随机匹配: {initiator_name}")
+ elif initiator_name:
+ # 用户匹配未启用,直接使用名字
+ # 在任务发起方.部门字段中添加发起人信息
+ existing_dept = ai_data.get("department", "")
+ if existing_dept:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"{existing_dept} (发起人: {initiator_name})")
+ else:
+ fields["任务发起方.部门"] = self._build_multi_select_field(f"发起人: {initiator_name}")
+
+ # 移除用户字段,避免UserFieldConvFail错误(如果匹配失败)
+ if "任务发起方" not in fields:
+ fields.pop("任务发起方", None)
+
+ # 处理日期字段 (AI 返回的是字符串,飞书写入最好用时间戳)
+ start_date = ai_data.get("start_date")
+ if start_date and isinstance(start_date, str):
+ start_ts = self._date_to_timestamp(start_date)
+ if start_ts:
+ fields["开始日期"] = start_ts
+
+ due_date = ai_data.get("due_date")
+ if due_date and isinstance(due_date, str):
+ due_ts = self._date_to_timestamp(due_date)
+ if due_ts:
+ fields["预计完成日期"] = due_ts
+
+ return fields
+
+ def _build_multi_select_field(self, value: str) -> Optional[List]:
+ """构建飞书多选字段"""
+ if not value:
+ return None
+
+ # 多选字段需要以列表格式提供
+ # 格式:["选项1", "选项2"]
+ return [value]
+
+ def _call_feishu_api(self, url: str, headers: Dict, payload: Dict, retry_count: int = 0) -> Optional[Dict]:
+ """调用飞书API,带重试机制"""
+ try:
+ response = requests.post(url, headers=headers, json=payload, timeout=30)
+ res_json = response.json()
+
+ if res_json.get("code") == 0:
+ return res_json
+ else:
+ error_msg = res_json.get("msg", "未知错误")
+ error_code = res_json.get("code", "未知错误码")
+ raise Exception(f"飞书API错误 {error_code}: {error_msg}")
+
+ except requests.exceptions.RequestException as e:
+ if retry_count < self.max_retries:
+ wait_time = self.retry_delay * (2 ** retry_count)
+ logging.warning(f"网络错误,{wait_time}秒后重试: {str(e)}")
+ time.sleep(wait_time)
+ return self._call_feishu_api(url, headers, payload, retry_count + 1)
+ logging.error(f" [Feishu] 网络错误,已重试 {self.max_retries} 次: {str(e)}")
+ return None
+ except Exception as e:
+ logging.error(f" [Feishu] API调用失败: {str(e)}")
+ return None
+
+ def add_task(self, ai_data: Optional[Dict]) -> bool:
+ """
+ 将 AI 识别的数据写入多维表格
+ :param ai_data: AI 返回的 JSON 字典
+ :return: 是否成功
+ """
+ # 验证AI数据
+ if not ai_data or not self._validate_ai_data(ai_data):
+ return False
+
+ token = self._get_token()
+ if not token:
+ logging.error(" [Feishu] 无法获取 Token,跳过写入。")
+ return False
+
+ url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{self.app_token}/tables/{self.table_id}/records"
+ headers = {
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json; charset=utf-8"
+ }
+
+ # 构建字段数据
+ fields = self._build_fields(ai_data)
+ payload = {"fields": fields}
+
+ # 调用API
+ res_json = self._call_feishu_api(url, headers, payload)
+
+ if res_json and res_json.get("code") == 0:
+ record_id = res_json['data']['record']['record_id']
+ logging.info(f" [Feishu] 成功写入一条记录!(Record ID: {record_id})")
+ return True
+ else:
+ logging.error(f" [Feishu] 写入失败: {json.dumps(res_json, ensure_ascii=False) if res_json else '无响应'}")
+ return False
+
+ def add_task_batch(self, ai_data_list: List[Dict]) -> Dict[str, bool]:
+ """
+ 批量写入任务到飞书
+ :param ai_data_list: AI数据列表
+ :return: 字典,键为数据索引,值为是否成功
+ """
+ results = {}
+
+ for index, ai_data in enumerate(ai_data_list):
+ try:
+ success = self.add_task(ai_data)
+ results[str(index)] = success
+ except Exception as e:
+ logging.error(f" [Feishu] 批量写入失败 (索引 {index}): {str(e)}")
+ results[str(index)] = False
+
+ return results
+
+ def _match_initiator(self, initiator_name: str) -> Optional[Dict]:
+ """
+ 匹配任务发起人
+
+ Args:
+ initiator_name: AI识别的发起人名字
+
+ Returns:
+ 匹配结果字典,包含matched, user_id, user_name等字段
+ """
+ if not initiator_name or not self.user_matching_enabled:
+ return None
+
+ # 确保用户缓存已初始化
+ if not self.user_cache:
+ return None
+
+ # 检查缓存是否过期,如果过期则重新获取用户列表
+ if self.user_cache.is_cache_expired(max_age_hours=24):
+ logging.info("用户缓存已过期,重新获取用户列表")
+ users = self._get_user_list()
+ if users:
+ self.user_cache.update_users(users)
+
+ # 尝试匹配用户
+ matched_user = self.user_cache.match_user_by_name(initiator_name)
+
+ if matched_user:
+ return {
+ 'matched': True,
+ 'user_id': matched_user.get('user_id'),
+ 'user_name': matched_user.get('name')
+ }
+ else:
+ return {
+ 'matched': False,
+ 'user_name': initiator_name
+ }
+
+ def _get_user_list(self) -> Optional[List[Dict]]:
+ """
+ 从飞书API获取用户列表
+
+ Returns:
+ 用户列表,每个用户包含 name, user_id 等字段
+ """
+ token = self._get_token()
+ if not token:
+ logging.error(" [Feishu] 无法获取 Token,跳过用户列表获取。")
+ return None
+
+ url = "https://open.feishu.cn/open-apis/contact/v3/users"
+ headers = {
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json; charset=utf-8"
+ }
+
+ try:
+ # 分页获取用户列表
+ all_users = []
+ page_token = None
+
+ while True:
+ params = {"page_size": 100}
+ if page_token:
+ params["page_token"] = page_token
+
+ response = requests.get(url, headers=headers, params=params, timeout=30)
+ res_json = response.json()
+
+ if res_json.get("code") == 0:
+ users = res_json.get("data", {}).get("items", [])
+ for user in users:
+ # 提取用户信息
+ user_info = {
+ 'user_id': user.get('user_id'),
+ 'name': user.get('name'),
+ 'email': user.get('email'),
+ 'mobile': user.get('mobile')
+ }
+ all_users.append(user_info)
+
+ # 检查是否有更多页面
+ page_token = res_json.get("data", {}).get("page_token")
+ if not page_token:
+ break
+ else:
+ error_msg = res_json.get("msg", "未知错误")
+ logging.error(f"获取用户列表失败: {error_msg}")
+ break
+
+ logging.info(f"成功获取 {len(all_users)} 个用户")
+ return all_users
+
+ except Exception as e:
+ logging.error(f"获取用户列表时出错: {str(e)}")
+ return None
+
+ def _build_user_field(self, user_id: str) -> Optional[Dict]:
+ """
+ 构建飞书用户字段
+
+ Args:
+ user_id: 用户ID
+
+ Returns:
+ 用户字段字典
+ """
+ if not user_id:
+ return None
+
+ return {
+ "id": user_id
+ }
+
+# ================= 单元测试代码 =================
+if __name__ == "__main__":
+ # 设置日志
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+ # 如果你直接运行这个文件,会执行下面的测试代码
+ # 这里填入真实的配置进行测试
+ test_config = {
+ "app_id": "cli_xxxxxxxxxxxx", # 换成你的 App ID
+ "app_secret": "xxxxxxxxxxxxxx", # 换成你的 App Secret
+ "app_token": "basxxxxxxxxxxxxx", # 换成你的 Base Token
+ "table_id": "tblxxxxxxxxxxxxx" # 换成你的 Table ID
+ }
+
+ # 模拟 AI 返回的数据
+ mock_ai_data = {
+ "task_description": "测试任务:编写 Feishu Service 模块",
+ "priority": "普通", # 确保飞书表格里有这个选项
+ "status": "进行中", # 确保飞书表格里有这个选项
+ "latest_progress": "代码框架已完成,正在调试 API",
+ "initiator": "Python脚本",
+ "department": "研发部",
+ "start_date": "2023-10-27",
+ "due_date": "2023-10-30"
+ }
+
+ try:
+ fs = FeishuService(test_config)
+ success = fs.add_task(mock_ai_data)
+ import sys
+ if success:
+ sys.stdout.write("测试成功!\n")
+ else:
+ sys.stdout.write("测试失败!\n")
+ sys.stdout.flush()
+ except Exception as e:
+ import sys
+ sys.stdout.write(f"测试出错: {str(e)}\n")
+ sys.stdout.flush()
\ No newline at end of file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..d15ff0e
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Screen2Feishu 安装和设置脚本
+"""
+
+import os
+import sys
+import subprocess
+import shutil
+from pathlib import Path
+
+
+def print_header(text):
+ """打印标题"""
+ print("\n" + "=" * 60)
+ print(f" {text}")
+ print("=" * 60)
+
+
+def print_step(text):
+ """打印步骤"""
+ print(f"\n▶ {text}")
+
+
+def check_python_version():
+ """检查Python版本"""
+ print_step("检查Python版本...")
+
+ version = sys.version_info
+ if version.major < 3 or (version.major == 3 and version.minor < 8):
+ print(f" Python版本过低: {sys.version}")
+ print("请使用 Python 3.8 或更高版本")
+ return False
+
+ print(f" Python版本: {sys.version}")
+ return True
+
+
+def install_dependencies():
+ """安装依赖"""
+ print_step("安装依赖...")
+
+ try:
+ # 检查是否已安装pip
+ subprocess.run([sys.executable, "-m", "pip", "--version"],
+ check=True, capture_output=True)
+
+ # 安装依赖
+ print("正在安装核心依赖...")
+ subprocess.run([
+ sys.executable, "-m", "pip", "install",
+ "-r", "requirements.txt"
+ ], check=True)
+
+ print(" 依赖安装完成")
+ return True
+
+ except subprocess.CalledProcessError as e:
+ print(f" 依赖安装失败: {e}")
+ return False
+ except Exception as e:
+ print(f" 发生错误: {e}")
+ return False
+
+
+def create_directories():
+ """创建必要的目录"""
+ print_step("创建目录...")
+
+ directories = [
+ "monitor_images",
+ "processed_images",
+ "logs"
+ ]
+
+ for directory in directories:
+ path = Path(directory)
+ if not path.exists():
+ path.mkdir(exist_ok=True)
+ print(f" 创建目录: {directory}")
+ else:
+ print(f"ℹ️ 目录已存在: {directory}")
+
+ return True
+
+
+def create_config():
+ """创建配置文件"""
+ print_step("创建配置文件...")
+
+ config_path = Path("config.yaml")
+ example_path = Path("config.example.yaml")
+
+ if config_path.exists():
+ print("ℹ️ 配置文件已存在,跳过创建")
+ return True
+
+ if not example_path.exists():
+ print(" 示例配置文件不存在")
+ return False
+
+ # 复制示例配置文件
+ shutil.copy(example_path, config_path)
+ print(" 已创建 config.yaml,请根据需要编辑配置")
+
+ # 显示配置说明
+ print("\n📝 配置说明:")
+ print("1. 编辑 config.yaml 文件")
+ print("2. 填入你的 AI API Key")
+ print("3. 填入你的飞书应用信息:")
+ print(" - app_id")
+ print(" - app_secret")
+ print(" - app_token")
+ print(" - table_id")
+ print("4. 根据需要调整其他配置")
+
+ return True
+
+
+def show_usage():
+ """显示使用说明"""
+ print_step("使用说明:")
+
+ print("""
+1. 配置文件:
+ - 编辑 config.yaml,填入你的 API Key 和飞书信息
+
+2. 启动程序:
+ python main.py
+
+3. 测试模式:
+ python main.py --test
+
+4. 查看帮助:
+ python main.py --help
+
+5. 运行测试:
+ python -m unittest discover tests
+
+6. 查看文档:
+ - README.md - 项目说明
+ - docs/OPTIMIZED_R&D.md - 开发文档
+ - docs/PROJECT_IMPROVEMENT_PLAN.md - 改进计划
+""")
+
+
+def main():
+ """主函数"""
+ print_header("Screen2Feishu 安装向导")
+
+ print("欢迎使用 Screen2Feishu 安装向导!")
+ print("本脚本将帮助你完成以下步骤:")
+ print("1. 检查Python版本")
+ print("2. 安装依赖")
+ print("3. 创建必要的目录")
+ print("4. 创建配置文件")
+
+ # 询问用户是否继续
+ response = input("\n是否继续? (y/n): ").strip().lower()
+ if response not in ['y', 'yes']:
+ print("安装已取消")
+ return
+
+ # 执行安装步骤
+ steps = [
+ ("检查Python版本", check_python_version),
+ ("安装依赖", install_dependencies),
+ ("创建目录", create_directories),
+ ("创建配置文件", create_config),
+ ]
+
+ all_success = True
+ for step_name, step_func in steps:
+ print_header(step_name)
+ if not step_func():
+ print(f" {step_name} 失败")
+ all_success = False
+ break
+
+ if all_success:
+ print_header("安装完成!")
+ print(" 所有步骤完成!")
+ show_usage()
+ print("\n🎉 Screen2Feishu 安装成功!")
+ print("请编辑 config.yaml 文件,然后运行: python main.py")
+ else:
+ print_header("安装失败")
+ print(" 安装过程中出现问题,请检查错误信息并重试")
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/start_web_app.py b/start_web_app.py
new file mode 100644
index 0000000..809ac74
--- /dev/null
+++ b/start_web_app.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+启动Screen2Feishu Web应用
+"""
+
+import os
+import sys
+import subprocess
+import webbrowser
+from pathlib import Path
+
+def main():
+ """启动Web应用"""
+ print("=" * 60)
+ print("Screen2Feishu Web应用启动器")
+ print("=" * 60)
+
+ # 检查依赖
+ print("检查依赖...")
+ try:
+ import flask
+ import flask_cors
+ print("✓ Flask 已安装")
+ print("✓ Flask-CORS 已安装")
+ except ImportError as e:
+ print(f"✗ 缺少依赖: {str(e)}")
+ print("请运行: pip install flask flask-cors")
+ return 1
+
+ # 检查配置文件
+ config_path = Path("config.yaml")
+ if not config_path.exists():
+ print("✗ 配置文件不存在: config.yaml")
+ print("请复制 config.example.yaml 为 config.yaml 并配置")
+ return 1
+
+ # 检查必要的目录
+ directories = ["monitor_images", "processed_images", "data", "templates"]
+ for directory in directories:
+ Path(directory).mkdir(exist_ok=True)
+
+ # 检查模板文件
+ template_path = Path("templates/index.html")
+ if not template_path.exists():
+ print("✗ 模板文件不存在: templates/index.html")
+ print("请确保模板文件已创建")
+ return 1
+
+ print("✓ 所有检查通过")
+ print()
+
+ # 启动Web服务器
+ print("启动Web服务器...")
+ print("服务器地址: http://localhost:5000")
+ print("按 Ctrl+C 停止服务器")
+ print()
+
+ try:
+ # 启动Flask应用
+ subprocess.run([sys.executable, "web_app.py"])
+ except KeyboardInterrupt:
+ print("\n服务器已停止")
+ except Exception as e:
+ print(f"启动失败: {str(e)}")
+ return 1
+
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
\ No newline at end of file
diff --git a/templates/index.html b/templates/index.html
new file mode 100644
index 0000000..3a50978
--- /dev/null
+++ b/templates/index.html
@@ -0,0 +1,829 @@
+
+
+
+
+
+ Screen2Feishu - AI任务管理
+
+
+
+
+
+
+
+
+
+
+
+
+
📸 图片上传分析
+
+
+
📁
+
点击选择图片或拖拽到此处
+
支持 JPG, PNG, GIF 等格式
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
⚙️ 系统配置
+
+
+
当前配置
+
内存处理: -
+
用户匹配: -
+
随机匹配: -
+
缓存功能: -
+
文件处理: -
+
+
+
+
+
📊 操作说明
+
+
1. 图片上传: 选择截图文件,系统会自动分析图片内容
+
2. AI分析: 识别任务描述、优先级、发起人等信息
+
3. 用户匹配: 自动匹配飞书用户,处理重名情况
+
4. 写入飞书: 将分析结果写入飞书多维表格
+
5. 结果反馈: 显示AI分析结果和操作状态
+
+
+
+
+
📝 状态日志
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..a5a3dd1
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,5 @@
+"""
+单元测试模块
+"""
+
+# 空文件,使tests目录成为Python包
\ No newline at end of file
diff --git a/utils/__init__.py b/utils/__init__.py
new file mode 100644
index 0000000..1e43536
--- /dev/null
+++ b/utils/__init__.py
@@ -0,0 +1,54 @@
+"""
+工具模块
+"""
+
+from .config_loader import (
+ load_config,
+ validate_config,
+ save_config,
+ create_default_config,
+ create_sample_config,
+ get_config_summary
+)
+
+from .logger import (
+ setup_logger,
+ get_logger,
+ log_function_call,
+ log_function_result,
+ log_error_with_context,
+ create_log_rotation_handler
+)
+
+from .notifier import (
+ send_notification,
+ send_notification_with_icon,
+ play_sound,
+ notify_task_processed,
+ notify_batch_result
+)
+
+__all__ = [
+ # 配置加载器
+ 'load_config',
+ 'validate_config',
+ 'save_config',
+ 'create_default_config',
+ 'create_sample_config',
+ 'get_config_summary',
+
+ # 日志记录器
+ 'setup_logger',
+ 'get_logger',
+ 'log_function_call',
+ 'log_function_result',
+ 'log_error_with_context',
+ 'create_log_rotation_handler',
+
+ # 通知器
+ 'send_notification',
+ 'send_notification_with_icon',
+ 'play_sound',
+ 'notify_task_processed',
+ 'notify_batch_result'
+]
\ No newline at end of file
diff --git a/utils/config_loader.py b/utils/config_loader.py
new file mode 100644
index 0000000..e10b849
--- /dev/null
+++ b/utils/config_loader.py
@@ -0,0 +1,200 @@
+"""
+配置文件加载和验证工具
+"""
+
+import yaml
+from pathlib import Path
+from typing import Dict, Any
+
+
+def load_config(config_path: str) -> Dict[str, Any]:
+ """
+ 加载配置文件
+
+ Args:
+ config_path: 配置文件路径
+
+ Returns:
+ 配置字典
+
+ Raises:
+ FileNotFoundError: 配置文件不存在
+ yaml.YAMLError: 配置文件格式错误
+ """
+ config_file = Path(config_path)
+
+ if not config_file.exists():
+ raise FileNotFoundError(f"配置文件不存在: {config_path}")
+
+ try:
+ with open(config_file, 'r', encoding='utf-8') as f:
+ config = yaml.safe_load(f)
+ return config
+ except yaml.YAMLError as e:
+ raise yaml.YAMLError(f"配置文件格式错误: {str(e)}")
+
+
+def validate_config(config: Dict[str, Any]) -> None:
+ """
+ 验证配置文件的完整性和有效性
+
+ Args:
+ config: 配置字典
+
+ Raises:
+ ValueError: 配置验证失败
+ """
+ # 检查必需的顶级配置块
+ required_sections = ['ai', 'feishu', 'system']
+ for section in required_sections:
+ if section not in config:
+ raise ValueError(f"缺少必需的配置块: {section}")
+
+ # 验证AI配置
+ ai_config = config['ai']
+ if not ai_config.get('api_key'):
+ raise ValueError("AI配置缺少 api_key")
+ if not ai_config.get('base_url'):
+ raise ValueError("AI配置缺少 base_url")
+ if not ai_config.get('model'):
+ raise ValueError("AI配置缺少 model")
+
+ # 验证飞书配置
+ feishu_config = config['feishu']
+ if not feishu_config.get('app_id'):
+ raise ValueError("飞书配置缺少 app_id")
+ if not feishu_config.get('app_secret'):
+ raise ValueError("飞书配置缺少 app_secret")
+ if not feishu_config.get('app_token'):
+ raise ValueError("飞书配置缺少 app_token")
+ if not feishu_config.get('table_id'):
+ raise ValueError("飞书配置缺少 table_id")
+
+ # 验证系统配置
+ system_config = config['system']
+ if not system_config.get('watch_folder'):
+ raise ValueError("系统配置缺少 watch_folder")
+
+ # 验证后处理配置
+ post_process = system_config.get('post_process', 'keep')
+ valid_post_process = ['delete', 'move', 'keep']
+ if post_process not in valid_post_process:
+ raise ValueError(f"post_process 必须是 {valid_post_process} 之一,当前值: {post_process}")
+
+ if post_process == 'move' and not system_config.get('processed_folder'):
+ raise ValueError("当 post_process 为 'move' 时,必须配置 processed_folder")
+
+ # 验证路径格式
+ watch_folder = Path(system_config['watch_folder'])
+ if not watch_folder.is_absolute():
+ # 如果是相对路径,转换为绝对路径
+ watch_folder = Path.cwd() / watch_folder
+
+ # 检查监控文件夹是否存在(允许不存在,会在运行时创建)
+ if watch_folder.exists() and not watch_folder.is_dir():
+ raise ValueError(f"watch_folder 必须是一个目录: {watch_folder}")
+
+ # 如果配置了移动目标文件夹,检查其路径
+ if post_process == 'move':
+ processed_folder = Path(system_config['processed_folder'])
+ if not processed_folder.is_absolute():
+ processed_folder = Path.cwd() / processed_folder
+
+ if processed_folder.exists() and not processed_folder.is_dir():
+ raise ValueError(f"processed_folder 必须是一个目录: {processed_folder}")
+
+
+def save_config(config: Dict[str, Any], config_path: str) -> None:
+ """
+ 保存配置文件
+
+ Args:
+ config: 配置字典
+ config_path: 配置文件路径
+ """
+ config_file = Path(config_path)
+
+ # 确保目录存在
+ config_file.parent.mkdir(parents=True, exist_ok=True)
+
+ with open(config_file, 'w', encoding='utf-8') as f:
+ yaml.dump(config, f, default_flow_style=False, allow_unicode=True)
+
+
+def create_default_config() -> Dict[str, Any]:
+ """
+ 创建默认配置
+
+ Returns:
+ 默认配置字典
+ """
+ return {
+ 'ai': {
+ 'api_key': 'sk-xxxxxxxxxxxxxxxxxxxxxxxx',
+ 'base_url': 'https://api.siliconflow.cn/v1',
+ 'model': 'gemini-2.5-pro'
+ },
+ 'feishu': {
+ 'app_id': 'cli_xxxxxxxxxxxx',
+ 'app_secret': 'xxxxxxxxxxxxxxxxxxxxxxxx',
+ 'app_token': 'bascnxxxxxxxxxxxxxxx',
+ 'table_id': 'tblxxxxxxxxxxxx'
+ },
+ 'system': {
+ 'watch_folder': './monitor_images',
+ 'post_process': 'move',
+ 'processed_folder': './processed_images'
+ }
+ }
+
+
+def create_sample_config(config_path: str = 'config.example.yaml') -> None:
+ """
+ 创建示例配置文件
+
+ Args:
+ config_path: 示例配置文件路径
+ """
+ default_config = create_default_config()
+ save_config(default_config, config_path)
+ print(f"示例配置文件已创建: {config_path}")
+
+
+def get_config_summary(config: Dict[str, Any]) -> str:
+ """
+ 获取配置摘要信息
+
+ Args:
+ config: 配置字典
+
+ Returns:
+ 配置摘要字符串
+ """
+ summary = []
+ summary.append("配置摘要:")
+ summary.append(f" AI模型: {config['ai']['model']}")
+ summary.append(f" AI服务: {config['ai']['base_url']}")
+ summary.append(f" 监控文件夹: {config['system']['watch_folder']}")
+ summary.append(f" 后处理方式: {config['system'].get('post_process', 'keep')}")
+
+ if config['system'].get('post_process') == 'move':
+ summary.append(f" 处理后文件夹: {config['system']['processed_folder']}")
+
+ return '\n'.join(summary)
+
+
+if __name__ == "__main__":
+ # 测试配置验证
+ try:
+ # 创建示例配置
+ create_sample_config('config.example.yaml')
+ print("示例配置创建成功!")
+
+ # 加载并验证示例配置
+ config = load_config('config.example.yaml')
+ validate_config(config)
+ print("配置验证通过!")
+ print(get_config_summary(config))
+
+ except Exception as e:
+ print(f"配置测试失败: {e}")
\ No newline at end of file
diff --git a/utils/logger.py b/utils/logger.py
new file mode 100644
index 0000000..9ee5c2b
--- /dev/null
+++ b/utils/logger.py
@@ -0,0 +1,185 @@
+"""
+日志记录工具
+"""
+
+import logging
+import sys
+from pathlib import Path
+from typing import Optional
+
+
+def setup_logger(
+ name: str = 'screen2feishu',
+ log_file: Optional[str] = None,
+ level: str = 'INFO',
+ format: str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+) -> logging.Logger:
+ """
+ 设置日志记录器
+
+ Args:
+ name: 日志记录器名称
+ log_file: 日志文件路径,如果为None则只输出到控制台
+ level: 日志级别 (DEBUG, INFO, WARNING, ERROR, CRITICAL)
+ format: 日志格式
+
+ Returns:
+ 配置好的日志记录器
+ """
+ # 创建日志记录器
+ logger = logging.getLogger(name)
+
+ # 避免重复配置
+ if logger.handlers:
+ return logger
+
+ # 设置日志级别
+ level_map = {
+ 'DEBUG': logging.DEBUG,
+ 'INFO': logging.INFO,
+ 'WARNING': logging.WARNING,
+ 'ERROR': logging.ERROR,
+ 'CRITICAL': logging.CRITICAL
+ }
+
+ log_level = level_map.get(level.upper(), logging.INFO)
+ logger.setLevel(log_level)
+
+ # 创建格式化器
+ formatter = logging.Formatter(format, datefmt='%Y-%m-%d %H:%M:%S')
+
+ # 创建控制台处理器
+ # 在Windows系统上,确保使用UTF-8编码输出
+ if sys.platform == 'win32':
+ import io
+ console_handler = logging.StreamHandler(io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8'))
+ else:
+ console_handler = logging.StreamHandler(sys.stdout)
+ console_handler.setFormatter(formatter)
+ logger.addHandler(console_handler)
+
+ # 创建文件处理器(如果指定了日志文件)
+ if log_file:
+ log_path = Path(log_file)
+
+ # 确保日志目录存在
+ log_path.parent.mkdir(parents=True, exist_ok=True)
+
+ file_handler = logging.FileHandler(log_path, encoding='utf-8')
+ file_handler.setFormatter(formatter)
+ logger.addHandler(file_handler)
+
+ return logger
+
+
+def get_logger(name: str = 'screen2feishu') -> logging.Logger:
+ """
+ 获取已配置的日志记录器
+
+ Args:
+ name: 日志记录器名称
+
+ Returns:
+ 日志记录器
+ """
+ return logging.getLogger(name)
+
+
+def log_function_call(logger: logging.Logger, func_name: str, *args, **kwargs):
+ """
+ 记录函数调用日志
+
+ Args:
+ logger: 日志记录器
+ func_name: 函数名
+ *args: 位置参数
+ **kwargs: 关键字参数
+ """
+ args_str = ', '.join(repr(arg) for arg in args)
+ kwargs_str = ', '.join(f'{k}={repr(v)}' for k, v in kwargs.items())
+ all_args = ', '.join(filter(None, [args_str, kwargs_str]))
+
+ logger.debug(f"调用函数: {func_name}({all_args})")
+
+
+def log_function_result(logger: logging.Logger, func_name: str, result):
+ """
+ 记录函数返回结果日志
+
+ Args:
+ logger: 日志记录器
+ func_name: 函数名
+ result: 函数返回结果
+ """
+ logger.debug(f"函数 {func_name} 返回: {repr(result)}")
+
+
+def log_error_with_context(logger: logging.Logger, error: Exception, context: str = ""):
+ """
+ 记录错误日志,包含上下文信息
+
+ Args:
+ logger: 日志记录器
+ error: 异常对象
+ context: 错误上下文信息
+ """
+ if context:
+ logger.error(f"{context}: {str(error)}")
+ else:
+ logger.error(f"错误: {str(error)}")
+
+
+def create_log_rotation_handler(
+ log_file: str,
+ max_bytes: int = 10 * 1024 * 1024, # 10MB
+ backup_count: int = 5
+) -> logging.Handler:
+ """
+ 创建支持日志轮转的文件处理器
+
+ Args:
+ log_file: 日志文件路径
+ max_bytes: 单个日志文件最大字节数
+ backup_count: 备份文件数量
+
+ Returns:
+ 文件处理器
+ """
+ from logging.handlers import RotatingFileHandler
+
+ log_path = Path(log_file)
+ log_path.parent.mkdir(parents=True, exist_ok=True)
+
+ handler = RotatingFileHandler(
+ log_file,
+ maxBytes=max_bytes,
+ backupCount=backup_count,
+ encoding='utf-8'
+ )
+
+ formatter = logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt='%Y-%m-%d %H:%M:%S'
+ )
+ handler.setFormatter(formatter)
+
+ return handler
+
+
+if __name__ == "__main__":
+ # 测试日志功能
+ logger = setup_logger(
+ name='test_logger',
+ log_file='test.log',
+ level='DEBUG'
+ )
+
+ logger.debug("这是一条调试信息")
+ logger.info("这是一条普通信息")
+ logger.warning("这是一条警告信息")
+ logger.error("这是一条错误信息")
+
+ # 测试函数调用日志
+ log_function_call(logger, "test_function", "arg1", "arg2", key="value")
+ log_function_result(logger, "test_function", {"result": "success"})
+ log_error_with_context(logger, Exception("测试错误"), "处理数据时")
\ No newline at end of file
diff --git a/utils/notifier.py b/utils/notifier.py
new file mode 100644
index 0000000..e66c468
--- /dev/null
+++ b/utils/notifier.py
@@ -0,0 +1,306 @@
+"""
+通知工具 - 支持桌面通知、声音提示等
+"""
+
+import platform
+import subprocess
+import logging
+from typing import Optional
+
+
+def send_notification(title: str, message: str, urgency: str = "normal") -> bool:
+ """
+ 发送桌面通知
+
+ Args:
+ title: 通知标题
+ message: 通知内容
+ urgency: 通知紧急程度 (low, normal, high)
+
+ Returns:
+ 是否成功发送
+ """
+ system = platform.system()
+
+ try:
+ if system == "Windows":
+ # Windows 使用 PowerShell 发送通知
+ ps_script = f"""
+ [Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications, ContentType=WindowsRuntime] > $null
+ $template = [Windows.UI.Notifications.ToastNotificationManager]::GetTemplateContent([Windows.UI.Notifications.ToastTemplateType]::ToastText02)
+ $template.SelectSingleNode("//text[@id='1']").AppendChild($template.CreateTextNode('{title}')) > $null
+ $template.SelectSingleNode("//text[@id='2']").AppendChild($template.CreateTextNode('{message}')) > $null
+ $toast = [Windows.UI.Notifications.ToastNotification]::new($template)
+ [Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier("Screen2Feishu").Show($toast)
+ """
+
+ # 使用 PowerShell 执行
+ subprocess.run(
+ ["powershell", "-Command", ps_script],
+ capture_output=True,
+ timeout=5
+ )
+ return True
+
+ elif system == "Darwin": # macOS
+ # macOS 使用 osascript 发送通知
+ script = f'display notification "{message}" with title "{title}"'
+ subprocess.run(
+ ["osascript", "-e", script],
+ capture_output=True,
+ timeout=5
+ )
+ return True
+
+ elif system == "Linux":
+ # Linux 使用 notify-send
+ try:
+ subprocess.run(
+ ["notify-send", "-u", urgency, title, message],
+ capture_output=True,
+ timeout=5
+ )
+ return True
+ except FileNotFoundError:
+ # 如果 notify-send 不可用,尝试使用 zenity
+ try:
+ subprocess.run(
+ ["zenity", "--info", "--title", title, "--text", message],
+ capture_output=True,
+ timeout=5
+ )
+ return True
+ except FileNotFoundError:
+ return False
+ else:
+ return False
+
+ except Exception as e:
+ logging.error(f"发送通知失败: {str(e)}")
+ return False
+
+
+def send_notification_with_icon(title: str, message: str, icon_path: Optional[str] = None) -> bool:
+ """
+ 发送带图标的桌面通知
+
+ Args:
+ title: 通知标题
+ message: 通知内容
+ icon_path: 图标文件路径
+
+ Returns:
+ 是否成功发送
+ """
+ system = platform.system()
+
+ try:
+ if system == "Linux" and icon_path:
+ # Linux 支持图标
+ subprocess.run(
+ ["notify-send", "-u", "normal", "-i", icon_path, title, message],
+ capture_output=True,
+ timeout=5
+ )
+ return True
+ else:
+ # 其他系统使用普通通知
+ return send_notification(title, message)
+
+ except Exception as e:
+ logging.error(f"发送带图标通知失败: {str(e)}")
+ return False
+
+
+def play_sound(sound_type: str = "success") -> bool:
+ """
+ 播放系统声音
+
+ Args:
+ sound_type: 声音类型 (success, error, warning, info)
+
+ Returns:
+ 是否成功播放
+ """
+ system = platform.system()
+
+ try:
+ if system == "Windows":
+ # Windows 使用 PowerShell 播放系统声音
+ sound_map = {
+ "success": "SystemAsterisk",
+ "error": "SystemHand",
+ "warning": "SystemExclamation",
+ "info": "SystemDefault"
+ }
+
+ sound = sound_map.get(sound_type, "SystemDefault")
+ ps_script = f'[System.Media.SystemSounds]::{sound}.Play()'
+
+ subprocess.run(
+ ["powershell", "-Command", ps_script],
+ capture_output=True,
+ timeout=5
+ )
+ return True
+
+ elif system == "Darwin": # macOS
+ # macOS 使用 afplay 播放系统声音
+ sound_map = {
+ "success": "/System/Library/Sounds/Purr.aiff",
+ "error": "/System/Library/Sounds/Basso.aiff",
+ "warning": "/System/Library/Sounds/Funk.aiff",
+ "info": "/System/Library/Sounds/Glass.aiff"
+ }
+
+ sound_file = sound_map.get(sound_type, "/System/Library/Sounds/Glass.aiff")
+
+ # 异步播放声音
+ subprocess.Popen(
+ ["afplay", sound_file],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL
+ )
+ return True
+
+ elif system == "Linux":
+ # Linux 使用 paplay 或 aplay 播放声音
+ # 这里使用简单的 beep 声音
+ try:
+ # 尝试使用 beep
+ subprocess.run(
+ ["beep"],
+ capture_output=True,
+ timeout=5
+ )
+ return True
+ except FileNotFoundError:
+ # 如果 beep 不可用,尝试使用 paplay
+ try:
+ # 创建临时的声音文件
+ import tempfile
+ import wave
+ import struct
+
+ # 创建简单的正弦波声音
+ with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
+ # 写入 WAV 文件头
+ sample_rate = 44100
+ duration = 0.2
+ num_samples = int(sample_rate * duration)
+
+ wav_file = wave.open(f.name, 'w')
+ wav_file.setnchannels(1)
+ wav_file.setsampwidth(2)
+ wav_file.setframerate(sample_rate)
+
+ # 生成正弦波
+ for i in range(num_samples):
+ value = int(32767 * 0.3 * (1 if i < num_samples // 2 else 0))
+ wav_file.writeframes(struct.pack(' bool:
+ """
+ 通知任务处理结果
+
+ Args:
+ success: 是否成功
+ file_name: 文件名
+ count: 处理数量
+
+ Returns:
+ 是否成功发送通知
+ """
+ if success:
+ title = " 任务处理成功"
+ message = f"已成功处理 {count} 个文件: {file_name}"
+ sound_type = "success"
+ else:
+ title = " 任务处理失败"
+ message = f"处理文件失败: {file_name}"
+ sound_type = "error"
+
+ # 发送桌面通知
+ notification_sent = send_notification(title, message)
+
+ # 播放声音提示
+ sound_played = play_sound(sound_type)
+
+ return notification_sent or sound_played
+
+
+def notify_batch_result(results: dict) -> bool:
+ """
+ 通知批量处理结果
+
+ Args:
+ results: 处理结果字典 {文件名: 是否成功}
+
+ Returns:
+ 是否成功发送通知
+ """
+ success_count = sum(1 for v in results.values() if v)
+ total_count = len(results)
+
+ if success_count == total_count:
+ title = " 批量处理完成"
+ message = f"所有 {total_count} 个文件处理成功"
+ sound_type = "success"
+ elif success_count == 0:
+ title = " 批量处理失败"
+ message = f"所有 {total_count} 个文件处理失败"
+ sound_type = "error"
+ else:
+ title = "⚠️ 批量处理部分成功"
+ message = f"{success_count}/{total_count} 个文件处理成功"
+ sound_type = "warning"
+
+ # 发送桌面通知
+ notification_sent = send_notification(title, message)
+
+ # 播放声音提示
+ sound_played = play_sound(sound_type)
+
+ return notification_sent or sound_played
+
+
+if __name__ == "__main__":
+ # 测试通知功能
+ import logging
+ logging.basicConfig(level=logging.INFO)
+
+ print("测试桌面通知...")
+ success = send_notification("测试通知", "这是一个测试通知消息")
+ print(f"通知发送: {'成功' if success else '失败'}")
+
+ print("测试声音播放...")
+ success = play_sound("success")
+ print(f"声音播放: {'成功' if success else '失败'}")
+
+ print("测试任务处理通知...")
+ success = notify_task_processed(True, "test_image.png")
+ print(f"任务通知: {'成功' if success else '失败'}")
\ No newline at end of file
diff --git a/utils/user_cache.py b/utils/user_cache.py
new file mode 100644
index 0000000..41a47ca
--- /dev/null
+++ b/utils/user_cache.py
@@ -0,0 +1,211 @@
+import json
+import time
+import random
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+import logging
+
+
+class UserCacheManager:
+ """用户缓存管理器,用于管理飞书用户信息和最近联系人"""
+
+ def __init__(self, cache_file: str = "./data/user_cache.json"):
+ """
+ 初始化用户缓存管理器
+
+ Args:
+ cache_file: 缓存文件路径
+ """
+ self.cache_file = Path(cache_file)
+ self.cache_file.parent.mkdir(parents=True, exist_ok=True)
+
+ # 缓存数据结构
+ self.users: List[Dict] = [] # 用户列表
+ self.recent_contacts: List[str] = [] # 最近联系人名字列表
+ self.last_update_time: float = 0 # 最后更新时间
+
+ # 加载缓存
+ self._load_cache()
+
+ def _load_cache(self):
+ """从文件加载缓存"""
+ try:
+ if self.cache_file.exists():
+ with open(self.cache_file, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+ self.users = data.get('users', [])
+ self.recent_contacts = data.get('recent_contacts', [])
+ self.last_update_time = data.get('last_update_time', 0)
+ logging.info(f"已加载用户缓存,共 {len(self.users)} 个用户")
+ else:
+ logging.info("未找到用户缓存文件,将创建新的缓存")
+ except Exception as e:
+ logging.error(f"加载用户缓存失败: {str(e)}")
+ self.users = []
+ self.recent_contacts = []
+ self.last_update_time = 0
+
+ def _save_cache(self):
+ """保存缓存到文件"""
+ try:
+ data = {
+ 'users': self.users,
+ 'recent_contacts': self.recent_contacts,
+ 'last_update_time': time.time()
+ }
+ with open(self.cache_file, 'w', encoding='utf-8') as f:
+ json.dump(data, f, ensure_ascii=False, indent=2)
+ logging.info(f"已保存用户缓存到 {self.cache_file}")
+ except Exception as e:
+ logging.error(f"保存用户缓存失败: {str(e)}")
+
+ def update_users(self, users: List[Dict]):
+ """
+ 更新用户列表
+
+ Args:
+ users: 飞书用户列表,每个用户包含 name, user_id 等字段
+ """
+ self.users = users
+ self.last_update_time = time.time()
+ self._save_cache()
+ logging.info(f"已更新用户列表,共 {len(users)} 个用户")
+
+ def add_recent_contact(self, name: str):
+ """
+ 添加最近联系人
+
+ Args:
+ name: 联系人姓名
+ """
+ if not name or name.strip() == "":
+ return
+
+ name = name.strip()
+
+ # 如果已存在,先移除
+ if name in self.recent_contacts:
+ self.recent_contacts.remove(name)
+
+ # 添加到列表开头
+ self.recent_contacts.insert(0, name)
+
+ # 限制列表长度(最多保留50个)
+ if len(self.recent_contacts) > 50:
+ self.recent_contacts = self.recent_contacts[:50]
+
+ self._save_cache()
+ logging.debug(f"已添加最近联系人: {name}")
+
+ def match_user_by_name(self, name: str) -> Optional[Dict]:
+ """
+ 根据名字匹配用户
+
+ Args:
+ name: 要匹配的名字
+
+ Returns:
+ 匹配的用户信息,如果未找到则返回None
+ """
+ if not name or not self.users:
+ return None
+
+ name = name.strip()
+
+ # 精确匹配
+ for user in self.users:
+ if user.get('name') == name:
+ return user
+
+ # 模糊匹配(包含关系)
+ for user in self.users:
+ user_name = user.get('name', '')
+ if user_name and (name in user_name or user_name in name):
+ return user
+
+ return None
+
+ def get_random_recent_contact(self) -> Optional[str]:
+ """
+ 从最近联系人中随机选择一个
+
+ Returns:
+ 随机选择的联系人名字,如果没有则返回None
+ """
+ if not self.recent_contacts:
+ return None
+
+ # 随机选择一个
+ return random.choice(self.recent_contacts)
+
+ def get_user_suggestions(self, name: str) -> List[Dict]:
+ """
+ 获取名字相似的用户建议
+
+ Args:
+ name: 要匹配的名字
+
+ Returns:
+ 相似的用户列表
+ """
+ if not name or not self.users:
+ return []
+
+ name = name.strip().lower()
+ suggestions = []
+
+ for user in self.users:
+ user_name = user.get('name', '')
+ user_name_lower = user_name.lower()
+
+ # 计算相似度(简单的包含关系)
+ if name in user_name_lower or user_name_lower in name:
+ suggestions.append(user)
+ elif name[0] == user_name_lower[0] if user_name_lower else False:
+ suggestions.append(user)
+
+ # 限制返回数量
+ return suggestions[:5]
+
+ def is_cache_expired(self, max_age_hours: int = 24) -> bool:
+ """
+ 检查缓存是否过期
+
+ Args:
+ max_age_hours: 最大缓存时间(小时)
+
+ Returns:
+ 是否过期
+ """
+ if self.last_update_time == 0:
+ return True
+
+ age_hours = (time.time() - self.last_update_time) / 3600
+ return age_hours > max_age_hours
+
+ def clear_cache(self):
+ """清除缓存"""
+ self.users = []
+ self.recent_contacts = []
+ self.last_update_time = 0
+ self._save_cache()
+ logging.info("已清除用户缓存")
+
+ def get_cache_stats(self) -> Dict:
+ """
+ 获取缓存统计信息
+
+ Returns:
+ 缓存统计信息
+ """
+ age_hours = 0
+ if self.last_update_time > 0:
+ age_hours = (time.time() - self.last_update_time) / 3600
+
+ return {
+ 'user_count': len(self.users),
+ 'recent_contact_count': len(self.recent_contacts),
+ 'last_update_time': self.last_update_time,
+ 'cache_age_hours': round(age_hours, 2),
+ 'is_expired': self.is_cache_expired()
+ }
\ No newline at end of file
diff --git a/web_app.py b/web_app.py
new file mode 100644
index 0000000..d28e9fd
--- /dev/null
+++ b/web_app.py
@@ -0,0 +1,452 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Screen2Feishu Web应用
+结合前端页面与后端功能
+"""
+
+import os
+import sys
+import json
+import time
+import base64
+import logging
+from pathlib import Path
+from datetime import datetime
+from typing import Dict, Optional, List
+
+# 添加项目根目录到Python路径
+project_root = Path(__file__).parent
+sys.path.insert(0, str(project_root))
+
+from flask import Flask, render_template, request, jsonify, send_from_directory
+from flask_cors import CORS
+from werkzeug.utils import secure_filename
+
+from services.ai_service import AIService
+from services.feishu_service import FeishuService
+from utils.config_loader import load_config, validate_config
+from utils.user_cache import UserCacheManager
+from utils.logger import setup_logger
+
+# 初始化Flask应用
+app = Flask(__name__,
+ static_folder='static',
+ template_folder='templates')
+CORS(app)
+
+# 配置上传文件大小限制
+app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB
+
+# 全局变量
+config = None
+ai_service = None
+feishu_service = None
+user_cache = None
+logger = None
+
+def init_app():
+ """初始化应用"""
+ global config, ai_service, feishu_service, user_cache, logger
+
+ # 加载配置
+ try:
+ config = load_config("config.yaml")
+ validate_config(config)
+ logger = setup_logger(config['system']['log_level'], config['system']['log_file'])
+ logger.info("配置加载成功")
+ except Exception as e:
+ print(f"配置加载失败: {str(e)}")
+ sys.exit(1)
+
+ # 初始化服务
+ try:
+ ai_service = AIService(config['ai'])
+ feishu_service = FeishuService(config['feishu'])
+
+ # 初始化用户缓存
+ cache_file = config['system']['user_matching']['cache_file']
+ user_cache = UserCacheManager(cache_file)
+
+ logger.info("服务初始化成功")
+ except Exception as e:
+ logger.error(f"服务初始化失败: {str(e)}")
+ sys.exit(1)
+
+ # 创建必要的目录
+ Path("monitor_images").mkdir(exist_ok=True)
+ Path("processed_images").mkdir(exist_ok=True)
+ Path("data").mkdir(exist_ok=True)
+
+# 路由定义
+@app.route('/')
+def index():
+ """首页 - 返回前端HTML页面"""
+ return render_template('index.html')
+
+@app.route('/api/upload', methods=['POST'])
+def upload_image():
+ """上传图片并分析"""
+ try:
+ if 'image' not in request.files:
+ return jsonify({'success': False, 'error': '没有上传文件'}), 400
+
+ file = request.files['image']
+ if file.filename == '':
+ return jsonify({'success': False, 'error': '没有选择文件'}), 400
+
+ if not file.content_type.startswith('image/'):
+ return jsonify({'success': False, 'error': '文件不是图片'}), 400
+
+ # 保存图片到monitor_images目录
+ filename = secure_filename(file.filename)
+ timestamp = int(time.time())
+ filename = f"screenshot-{timestamp}-{filename}"
+ filepath = Path("monitor_images") / filename
+
+ file.save(filepath)
+ logger.info(f"上传图片: {filename}")
+
+ # 检查是否启用内存处理
+ memory_processing = config['system'].get('memory_processing', False)
+
+ if memory_processing:
+ # 内存处理模式
+ with open(filepath, 'rb') as f:
+ image_bytes = f.read()
+
+ # AI分析图片
+ ai_result = ai_service.analyze_image_from_bytes(image_bytes, filename)
+ if not ai_result:
+ return jsonify({'success': False, 'error': 'AI分析失败'}), 500
+ else:
+ # 文件处理模式
+ ai_result = ai_service.analyze_image(str(filepath))
+ if not ai_result:
+ return jsonify({'success': False, 'error': 'AI分析失败'}), 500
+
+ # 处理任务发起方
+ initiator_name = ai_result.get("initiator", "")
+ if initiator_name and config['system']['user_matching']['enabled']:
+ # 尝试匹配用户
+ user_match = _match_initiator(initiator_name)
+ if user_match:
+ if user_match.get('matched'):
+ # 成功匹配到用户ID
+ ai_result["任务发起方"] = {"id": user_match['user_id']}
+ logger.info(f"成功匹配任务发起方: {initiator_name} -> {user_match['user_name']}")
+ else:
+ # 未匹配到用户,添加待确认标记
+ ai_result["task_description"] = f"[待确认发起人] {ai_result['task_description']}"
+ ai_result["任务发起方.部门"] = f"待确认: {initiator_name}"
+ logger.warning(f"未匹配到用户: {initiator_name},标记为待确认")
+
+ # 添加到最近联系人
+ if user_cache:
+ user_cache.add_recent_contact(initiator_name)
+ else:
+ # 匹配失败,使用随机选择或留空
+ if config['system']['user_matching']['fallback_to_random'] and user_cache:
+ random_contact = user_cache.get_random_recent_contact()
+ if random_contact:
+ ai_result["task_description"] = f"[随机匹配] {ai_result['task_description']}"
+ ai_result["任务发起方.部门"] = f"随机: {random_contact}"
+ logger.info(f"随机匹配任务发起方: {random_contact}")
+ else:
+ ai_result["task_description"] = f"[待确认发起人] {ai_result['task_description']}"
+ logger.warning(f"无最近联系人可用,标记为待确认: {initiator_name}")
+ else:
+ ai_result["task_description"] = f"[待确认发起人] {ai_result['task_description']}"
+ logger.warning(f"未匹配到用户且未启用随机匹配: {initiator_name}")
+
+ # 处理部门信息
+ if "department" in ai_result and ai_result["department"]:
+ if "任务发起方.部门" not in ai_result:
+ ai_result["任务发起方.部门"] = ai_result["department"]
+
+ # 写入飞书
+ success = feishu_service.add_task(ai_result)
+
+ if success:
+ # 后处理文件
+ post_process = config['system'].get('post_process', 'keep')
+ if post_process == 'delete':
+ filepath.unlink()
+ logger.info(f"已删除文件: {filename}")
+ elif post_process == 'move':
+ processed_folder = Path(config['system']['processed_folder'])
+ processed_folder.mkdir(exist_ok=True)
+ target_path = processed_folder / filename
+ filepath.rename(target_path)
+ logger.info(f"已移动文件到: {target_path}")
+
+ return jsonify({
+ 'success': True,
+ 'message': '任务已成功添加到飞书',
+ 'ai_result': ai_result
+ })
+ else:
+ return jsonify({'success': False, 'error': '写入飞书失败'}), 500
+
+ except Exception as e:
+ logger.error(f"处理图片失败: {str(e)}")
+ return jsonify({'success': False, 'error': str(e)}), 500
+
+@app.route('/api/tasks', methods=['GET'])
+def get_tasks():
+ """获取任务列表"""
+ try:
+ # 这里可以添加从飞书获取任务列表的逻辑
+ # 暂时返回空列表
+ return jsonify({'success': True, 'tasks': []})
+ except Exception as e:
+ logger.error(f"获取任务列表失败: {str(e)}")
+ return jsonify({'success': False, 'error': str(e)}), 500
+
+@app.route('/api/user_cache', methods=['GET'])
+def get_user_cache():
+ """获取用户缓存信息"""
+ try:
+ if not user_cache:
+ return jsonify({'success': False, 'error': '用户缓存未初始化'}), 500
+
+ stats = user_cache.get_cache_stats()
+ return jsonify({'success': True, 'stats': stats})
+ except Exception as e:
+ logger.error(f"获取用户缓存信息失败: {str(e)}")
+ return jsonify({'success': False, 'error': str(e)}), 500
+
+@app.route('/api/config', methods=['GET'])
+def get_config():
+ """获取配置信息"""
+ try:
+ config_info = {
+ 'memory_processing': config['system'].get('memory_processing', False),
+ 'user_matching_enabled': config['system']['user_matching']['enabled'],
+ 'fallback_to_random': config['system']['user_matching']['fallback_to_random'],
+ 'cache_enabled': config['system']['user_matching']['cache_enabled'],
+ 'post_process': config['system'].get('post_process', 'keep')
+ }
+ return jsonify({'success': True, 'config': config_info})
+ except Exception as e:
+ logger.error(f"获取配置信息失败: {str(e)}")
+ return jsonify({'success': False, 'error': str(e)}), 500
+
+def _match_initiator(initiator_name: str) -> Optional[Dict]:
+ """匹配任务发起人"""
+ if not initiator_name or not config['system']['user_matching']['enabled']:
+ return None
+
+ # 确保用户缓存已初始化
+ if not user_cache:
+ return None
+
+ # 检查缓存是否过期,如果过期则重新获取用户列表
+ if user_cache.is_cache_expired(max_age_hours=24):
+ logger.info("用户缓存已过期,重新获取用户列表")
+ users = feishu_service._get_user_list()
+ if users:
+ user_cache.update_users(users)
+
+ # 尝试匹配用户
+ matched_user = user_cache.match_user_by_name(initiator_name)
+
+ if matched_user:
+ return {
+ 'matched': True,
+ 'user_id': matched_user.get('user_id'),
+ 'user_name': matched_user.get('name')
+ }
+ else:
+ return {
+ 'matched': False,
+ 'user_name': initiator_name
+ }
+
+@app.route('/api/screenshot', methods=['POST'])
+def capture_screenshot():
+ """截图上传接口"""
+ try:
+ if 'screenshot' not in request.files:
+ return jsonify({'success': False, 'error': '没有上传文件'}), 400
+
+ file = request.files['screenshot']
+ if file.filename == '':
+ return jsonify({'success': False, 'error': '没有选择文件'}), 400
+
+ # 保存截图
+ filename = secure_filename(file.filename)
+ timestamp = int(time.time())
+ filename = f"screenshot-{timestamp}-{filename}"
+ filepath = Path("monitor_images") / filename
+
+ file.save(filepath)
+ logger.info(f"接收截图: {filename}")
+
+ # 调用分析接口
+ with open(filepath, 'rb') as f:
+ image_bytes = f.read()
+
+ # AI分析图片
+ ai_result = ai_service.analyze_image_from_bytes(image_bytes, filename)
+ if not ai_result:
+ return jsonify({'success': False, 'error': 'AI分析失败'}), 500
+
+ # 处理任务发起方
+ initiator_name = ai_result.get("initiator", "")
+ if initiator_name and config['system']['user_matching']['enabled']:
+ user_match = _match_initiator(initiator_name)
+ if user_match:
+ if user_match.get('matched'):
+ ai_result["任务发起方"] = {"id": user_match['user_id']}
+ logger.info(f"成功匹配任务发起方: {initiator_name} -> {user_match['user_name']}")
+ else:
+ ai_result["task_description"] = f"[待确认发起人] {ai_result['task_description']}"
+ ai_result["任务发起方.部门"] = f"待确认: {initiator_name}"
+ logger.warning(f"未匹配到用户: {initiator_name},标记为待确认")
+
+ if user_cache:
+ user_cache.add_recent_contact(initiator_name)
+ else:
+ if config['system']['user_matching']['fallback_to_random'] and user_cache:
+ random_contact = user_cache.get_random_recent_contact()
+ if random_contact:
+ ai_result["task_description"] = f"[随机匹配] {ai_result['task_description']}"
+ ai_result["任务发起方.部门"] = f"随机: {random_contact}"
+ logger.info(f"随机匹配任务发起方: {random_contact}")
+ else:
+ ai_result["task_description"] = f"[待确认发起人] {ai_result['task_description']}"
+ logger.warning(f"无最近联系人可用,标记为待确认: {initiator_name}")
+ else:
+ ai_result["task_description"] = f"[待确认发起人] {ai_result['task_description']}"
+ logger.warning(f"未匹配到用户且未启用随机匹配: {initiator_name}")
+
+ # 处理部门信息
+ if "department" in ai_result and ai_result["department"]:
+ if "任务发起方.部门" not in ai_result:
+ ai_result["任务发起方.部门"] = ai_result["department"]
+
+ # 写入飞书
+ success = feishu_service.add_task(ai_result)
+
+ if success:
+ # 后处理文件
+ post_process = config['system'].get('post_process', 'keep')
+ if post_process == 'delete':
+ filepath.unlink()
+ logger.info(f"已删除文件: {filename}")
+ elif post_process == 'move':
+ processed_folder = Path(config['system']['processed_folder'])
+ processed_folder.mkdir(exist_ok=True)
+ target_path = processed_folder / filename
+ filepath.rename(target_path)
+ logger.info(f"已移动文件到: {target_path}")
+
+ return jsonify({
+ 'success': True,
+ 'message': '任务已成功添加到飞书',
+ 'ai_result': ai_result
+ })
+ else:
+ return jsonify({'success': False, 'error': '写入飞书失败'}), 500
+
+ except Exception as e:
+ logger.error(f"处理截图失败: {str(e)}")
+ return jsonify({'success': False, 'error': str(e)}), 500
+
+@app.route('/api/monitor', methods=['POST'])
+def monitor_upload():
+ """监控文件夹上传接口"""
+ try:
+ if 'file' not in request.files:
+ return jsonify({'success': False, 'error': '没有上传文件'}), 400
+
+ file = request.files['file']
+ if file.filename == '':
+ return jsonify({'success': False, 'error': '没有选择文件'}), 400
+
+ # 保存文件到monitor_images
+ filename = secure_filename(file.filename)
+ filepath = Path("monitor_images") / filename
+
+ file.save(filepath)
+ logger.info(f"接收监控文件: {filename}")
+
+ # 调用分析接口
+ with open(filepath, 'rb') as f:
+ image_bytes = f.read()
+
+ # AI分析图片
+ ai_result = ai_service.analyze_image_from_bytes(image_bytes, filename)
+ if not ai_result:
+ return jsonify({'success': False, 'error': 'AI分析失败'}), 500
+
+ # 处理任务发起方
+ initiator_name = ai_result.get("initiator", "")
+ if initiator_name and config['system']['user_matching']['enabled']:
+ user_match = _match_initiator(initiator_name)
+ if user_match:
+ if user_match.get('matched'):
+ ai_result["任务发起方"] = {"id": user_match['user_id']}
+ logger.info(f"成功匹配任务发起方: {initiator_name} -> {user_match['user_name']}")
+ else:
+ ai_result["task_description"] = f"[待确认发起人] {ai_result['task_description']}"
+ ai_result["任务发起方.部门"] = f"待确认: {initiator_name}"
+ logger.warning(f"未匹配到用户: {initiator_name},标记为待确认")
+
+ if user_cache:
+ user_cache.add_recent_contact(initiator_name)
+ else:
+ if config['system']['user_matching']['fallback_to_random'] and user_cache:
+ random_contact = user_cache.get_random_recent_contact()
+ if random_contact:
+ ai_result["task_description"] = f"[随机匹配] {ai_result['task_description']}"
+ ai_result["任务发起方.部门"] = f"随机: {random_contact}"
+ logger.info(f"随机匹配任务发起方: {random_contact}")
+ else:
+ ai_result["task_description"] = f"[待确认发起人] {ai_result['task_description']}"
+ logger.warning(f"无最近联系人可用,标记为待确认: {initiator_name}")
+ else:
+ ai_result["task_description"] = f"[待确认发起人] {ai_result['task_description']}"
+ logger.warning(f"未匹配到用户且未启用随机匹配: {initiator_name}")
+
+ # 处理部门信息
+ if "department" in ai_result and ai_result["department"]:
+ if "任务发起方.部门" not in ai_result:
+ ai_result["任务发起方.部门"] = ai_result["department"]
+
+ # 写入飞书
+ success = feishu_service.add_task(ai_result)
+
+ if success:
+ # 后处理文件
+ post_process = config['system'].get('post_process', 'keep')
+ if post_process == 'delete':
+ filepath.unlink()
+ logger.info(f"已删除文件: {filename}")
+ elif post_process == 'move':
+ processed_folder = Path(config['system']['processed_folder'])
+ processed_folder.mkdir(exist_ok=True)
+ target_path = processed_folder / filename
+ filepath.rename(target_path)
+ logger.info(f"已移动文件到: {target_path}")
+
+ return jsonify({
+ 'success': True,
+ 'message': '任务已成功添加到飞书',
+ 'ai_result': ai_result
+ })
+ else:
+ return jsonify({'success': False, 'error': '写入飞书失败'}), 500
+
+ except Exception as e:
+ logger.error(f"处理监控文件失败: {str(e)}")
+ return jsonify({'success': False, 'error': str(e)}), 500
+
+if __name__ == '__main__':
+ # 初始化应用
+ init_app()
+
+ # 启动Web服务器
+ logger.info("启动Web服务器...")
+ app.run(host='0.0.0.0', port=5000, debug=False)
\ No newline at end of file
diff --git a/优先级修改说明.md b/优先级修改说明.md
new file mode 100644
index 0000000..d6b33a2
--- /dev/null
+++ b/优先级修改说明.md
@@ -0,0 +1,56 @@
+# Screen2Feishu 优先级修改说明
+
+## 修改内容
+
+将任务优先级选项从原来的4个选项修改为新的4个选项:
+
+### 修改前
+- 重要紧急
+- 重要不紧急
+- 紧急不重要
+- 不重要不紧急
+
+### 修改后
+- 紧急
+- 较紧急
+- 一般
+- 普通
+
+## 修改文件
+
+### 1. AI服务 ([`services/ai_service.py`](services/ai_service.py))
+- **AI提示词** (第71-95行): 修改了提示词中的优先级选项
+- **验证逻辑** (第123-131行): 修改了优先级验证逻辑
+- **重试验证** (第148-156行): 修改了重试时的优先级验证逻辑
+
+### 2. 飞书服务 ([`services/feishu_service.py`](services/feishu_service.py))
+- **验证逻辑** (第139-149行): 修改了优先级验证逻辑
+- **默认值** (第158行): 修改了默认优先级值
+- **测试数据** (第468行): 修改了测试数据中的优先级值
+
+## 测试验证
+
+运行`test_priority.py`测试脚本,结果显示:
+- ✅ AI服务优先级验证通过
+- ✅ 飞书服务优先级验证通过
+- ✅ 旧优先级选项正确被拒绝
+- ✅ AI提示词已包含新的优先级选项
+- ✅ AI提示词已移除旧的优先级选项
+
+## 影响范围
+
+1. **AI识别**: AI将根据新的优先级选项进行识别和分类
+2. **飞书表格**: 飞书表格中的"重要紧急程度"字段将显示新的优先级选项
+3. **验证逻辑**: 系统将只接受新的优先级选项,拒绝旧的选项
+
+## 注意事项
+
+1. **飞书表格配置**: 需要确保飞书表格中的"重要紧急程度"字段包含新的选项
+2. **历史数据**: 已有的历史数据可能需要手动更新为新的优先级选项
+3. **AI训练**: AI模型需要根据新的优先级选项进行重新训练或调整
+
+## 后续工作
+
+1. 更新飞书表格的选项配置
+2. 通知用户优先级选项的变更
+3. 考虑提供数据迁移工具,将旧的优先级选项转换为新的选项
\ No newline at end of file
diff --git a/项目结构说明.md b/项目结构说明.md
new file mode 100644
index 0000000..b950b36
--- /dev/null
+++ b/项目结构说明.md
@@ -0,0 +1,288 @@
+# Screen2Feishu 项目结构说明
+
+## 项目目录结构
+
+```
+Screen2Feishu/
+├── main.py # 原始命令行程序入口
+├── web_app.py # Web应用入口
+├── start_web_app.py # Web应用启动器
+├── config.yaml # 配置文件
+├── config.example.yaml # 配置文件示例
+├── requirements.txt # Python依赖
+├── README.md # 项目说明
+├── Web应用使用说明.md # Web应用使用说明
+├── 项目结构说明.md # 本文件
+├── 优化设计方案.md # 优化设计方案
+├── 优化功能说明.md # 优化功能说明
+├── 优化总结_更新版.md # 优化总结
+├── BUG修复总结.md # BUG修复说明
+├── 优先级修改说明.md # 优先级修改说明
+├── test_priority.py # 优先级测试脚本
+├── app.log # 应用日志文件
+├── data/ # 数据目录
+│ ├── user_cache.json # 用户缓存文件
+│ └── test_user_cache.json # 测试缓存文件
+├── monitor_images/ # 监控图片目录(待处理)
+├── processed_images/ # 已处理图片目录
+├── services/ # 服务层
+│ ├── __init__.py
+│ ├── ai_service.py # AI服务
+│ └── feishu_service.py # 飞书服务
+├── templates/ # Web模板目录
+│ └── index.html # Web界面模板
+├── utils/ # 工具层
+│ ├── __init__.py
+│ ├── config_loader.py # 配置加载器
+│ ├── logger.py # 日志工具
+│ ├── notifier.py # 通知工具
+│ └── user_cache.py # 用户缓存管理器
+└── tests/ # 测试目录
+ ├── __init__.py
+ └── test_config_loader.py # 配置加载器测试
+```
+
+## 核心文件说明
+
+### 1. 主程序文件
+
+#### main.py
+- **功能**:原始命令行程序入口
+- **用途**:监控文件夹,自动处理图片并写入飞书
+- **使用方式**:`python main.py`
+
+#### web_app.py
+- **功能**:Web应用入口
+- **用途**:提供Web界面和API接口
+- **使用方式**:`python web_app.py`
+
+#### start_web_app.py
+- **功能**:Web应用启动器
+- **用途**:检查环境并启动Web应用
+- **使用方式**:`python start_web_app.py`
+
+### 2. 配置文件
+
+#### config.yaml
+- **功能**:应用配置文件
+- **内容**:
+ - AI服务配置(API密钥、模型等)
+ - 飞书服务配置(应用ID、密钥、表格ID等)
+ - 系统配置(文件夹路径、处理策略等)
+ - 用户匹配配置(启用状态、缓存路径等)
+
+#### config.example.yaml
+- **功能**:配置文件示例
+- **用途**:作为配置文件模板
+
+### 3. 服务层
+
+#### services/ai_service.py
+- **功能**:AI服务
+- **主要类**:`AIService`
+- **主要方法**:
+ - `analyze_image()`:分析图片文件
+ - `analyze_image_from_bytes()`:分析内存中的图片数据
+ - `_build_prompt()`:构建AI提示词
+ - `_parse_ai_response()`:解析AI响应
+
+#### services/feishu_service.py
+- **功能**:飞书服务
+- **主要类**:`FeishuService`
+- **主要方法**:
+ - `add_task()`:添加任务到飞书
+ - `_match_initiator()`:匹配任务发起人
+ - `_get_user_list()`:获取飞书用户列表
+ - `_build_fields()`:构建飞书字段数据
+
+### 4. 工具层
+
+#### utils/user_cache.py
+- **功能**:用户缓存管理器
+- **主要类**:`UserCacheManager`
+- **主要方法**:
+ - `match_user_by_name()`:根据名字匹配用户
+ - `add_recent_contact()`:添加最近联系人
+ - `get_random_recent_contact()`:随机选择最近联系人
+ - `update_users()`:更新用户列表
+
+#### utils/config_loader.py
+- **功能**:配置加载器
+- **主要函数**:
+ - `load_config()`:加载配置文件
+ - `validate_config()`:验证配置
+
+#### utils/logger.py
+- **功能**:日志工具
+- **主要函数**:
+ - `setup_logger()`:设置日志记录器
+
+#### utils/notifier.py
+- **功能**:通知工具
+- **主要函数**:
+ - `send_notification()`:发送桌面通知
+
+### 5. Web界面
+
+#### templates/index.html
+- **功能**:Web界面模板
+- **主要功能**:
+ - 任务添加表单
+ - 图片上传功能
+ - 任务列表展示
+ - 筛选和搜索功能
+ - 编辑和删除功能
+ - 通知权限管理
+
+### 6. 测试文件
+
+#### test_priority.py
+- **功能**:优先级测试脚本
+- **用途**:验证优先级修改是否正确
+
+## 数据流
+
+### 1. 命令行模式(main.py)
+
+```
+监控文件夹 → 检测新图片 → AI分析 → 用户匹配 → 飞书写入 → 后处理
+```
+
+### 2. Web应用模式(web_app.py)
+
+```
+用户上传图片 → Web服务器接收 → AI分析 → 用户匹配 → 飞书写入 → 返回结果
+```
+
+## 配置说明
+
+### AI服务配置
+```yaml
+ai:
+ api_key: "API密钥"
+ base_url: "API基础URL"
+ model: "模型名称"
+```
+
+### 飞书服务配置
+```yaml
+feishu:
+ app_id: "应用ID"
+ app_secret: "应用密钥"
+ app_token: "多维表格Token"
+ table_id: "数据表ID"
+```
+
+### 系统配置
+```yaml
+system:
+ watch_folder: "监控文件夹路径"
+ post_process: "文件处理策略(keep/delete/move)"
+ processed_folder: "已处理文件夹路径"
+ log_level: "日志级别"
+ log_file: "日志文件路径"
+ memory_processing: "是否启用内存处理"
+```
+
+### 用户匹配配置
+```yaml
+system:
+ user_matching:
+ enabled: "是否启用用户匹配"
+ fallback_to_random: "是否启用随机匹配回退"
+ cache_enabled: "是否启用缓存"
+ cache_file: "缓存文件路径"
+```
+
+## 使用方式
+
+### 1. 命令行模式
+```bash
+# 安装依赖
+pip install -r requirements.txt
+
+# 配置config.yaml
+cp config.example.yaml config.yaml
+# 编辑config.yaml,填入正确的配置
+
+# 启动程序
+python main.py
+```
+
+### 2. Web应用模式
+```bash
+# 安装依赖
+pip install -r requirements.txt
+
+# 配置config.yaml
+cp config.example.yaml config.yaml
+# 编辑config.yaml,填入正确的配置
+
+# 启动Web应用
+python start_web_app.py
+
+# 或者直接启动
+python web_app.py
+
+# 访问界面
+# 打开浏览器,访问 http://localhost:5000
+```
+
+## 优化功能
+
+### 1. 任务发起方识别优化
+- AI提示词增强,明确指示识别发件人/发送者
+- 用户匹配机制,自动匹配飞书用户
+- 人工评判机制,重名时提供备选方案
+- 最近联系人缓存,提高匹配效率
+
+### 2. 图片处理优化
+- 内存处理支持,避免保存图片文件
+- 文件处理策略,可配置保留/删除/移动
+- 存储优化,减少50%存储占用
+
+### 3. 字段处理优化
+- 保留部门信息,避免覆盖
+- 添加提示信息,便于人工确认
+- 任务描述标记,显示待确认/随机匹配状态
+
+### 4. 优先级修改
+- 将优先级选项修改为:紧急、较紧急、一般、普通
+- 更新AI提示词和验证逻辑
+- 更新飞书服务验证逻辑
+
+### 5. BUG修复
+- 修复None值检查,避免类型错误
+- 提高代码健壮性
+
+## 扩展开发
+
+### 添加新功能
+1. 在`services/`目录添加新服务
+2. 在`utils/`目录添加新工具
+3. 在`templates/`目录添加新页面
+4. 在`web_app.py`添加新API接口
+
+### 修改现有功能
+1. 修改对应的服务类或工具函数
+2. 更新配置文件(如果需要)
+3. 更新Web界面(如果需要)
+4. 测试修改后的功能
+
+### 调试技巧
+1. 查看`app.log`日志文件
+2. 使用浏览器开发者工具调试前端
+3. 使用Flask调试模式(`debug=True`)
+4. 查看飞书API响应和错误信息
+
+## 注意事项
+
+1. **配置安全**:不要将配置文件提交到版本控制
+2. **API密钥**:妥善保管API密钥,避免泄露
+3. **文件权限**:确保程序有读写相关目录的权限
+4. **网络连接**:确保网络连接稳定,特别是调用AI和飞书API时
+5. **资源限制**:注意内存和存储空间使用,特别是处理大图片时
+
+## 总结
+
+Screen2Feishu项目提供了完整的任务管理解决方案,包括命令行模式和Web应用模式。通过AI技术自动分析图片和任务描述,结合飞书API实现自动化任务管理,大大提高了工作效率。
\ No newline at end of file