Files

317 lines
10 KiB
Python
Raw Permalink Normal View History

2026-03-18 15:15:52 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Screen2Feishu - AI飞书多维表格自动录入助手
监控文件夹自动处理图片并写入飞书多维表格
"""
import os
import sys
import time
import argparse
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from services.ai_service import AIService
from services.feishu_service import FeishuService
from utils.logger import setup_logger
from utils.notifier import send_notification
from utils.config_loader import load_config, validate_config
class ImageFileHandler(FileSystemEventHandler):
"""文件系统事件处理器"""
def __init__(self, config, logger):
self.config = config
self.logger = logger
self.ai_service = AIService(config['ai'])
self.feishu_service = FeishuService(config['feishu'])
self.processed_files = set() # 已处理文件记录
self.supported_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.gif'}
# 创建必要的文件夹
self._create_directories()
def _create_directories(self):
"""创建必要的文件夹"""
watch_folder = Path(self.config['system']['watch_folder'])
watch_folder.mkdir(exist_ok=True)
if self.config['system'].get('post_process') == 'move':
processed_folder = Path(self.config['system']['processed_folder'])
processed_folder.mkdir(exist_ok=True)
def _is_supported_image(self, file_path):
"""检查文件是否为支持的图片格式"""
return Path(file_path).suffix.lower() in self.supported_extensions
def _is_file_ready(self, file_path):
"""检查文件是否已完全写入(通过文件大小稳定判断)"""
try:
size1 = os.path.getsize(file_path)
time.sleep(0.5) # 等待0.5秒
size2 = os.path.getsize(file_path)
return size1 == size2 and size1 > 0
except (OSError, IOError):
return False
def _process_image(self, file_path):
"""处理单个图片文件"""
file_path = Path(file_path)
# 检查是否已处理
if str(file_path) in self.processed_files:
return
self.logger.info(f"开始处理图片: {file_path.name}")
ai_result = None # 初始化变量
try:
# 检查是否启用内存处理
memory_processing = self.config['system'].get('memory_processing', False)
if memory_processing:
# 内存处理模式直接读取图片到内存不保存到processed_images
try:
with open(file_path, 'rb') as f:
image_bytes = f.read()
# 1. AI分析图片内存模式
ai_result = self.ai_service.analyze_image_from_bytes(image_bytes, file_path.name)
if not ai_result:
self.logger.error(f"AI分析失败: {file_path.name}")
return
except Exception as e:
self.logger.error(f"内存处理图片失败: {str(e)}")
# 回退到文件处理模式
memory_processing = False
if not memory_processing:
# 文件处理模式:传统方式
# 1. AI分析图片
ai_result = self.ai_service.analyze_image(str(file_path))
if not ai_result:
self.logger.error(f"AI分析失败: {file_path.name}")
return
# 2. 写入飞书
success = self.feishu_service.add_task(ai_result)
if success:
self.logger.info(f"成功写入飞书: {file_path.name}")
self.processed_files.add(str(file_path))
# 3. 后处理
self._post_process_file(file_path)
# 4. 发送通知
send_notification(
"任务处理成功",
f"已成功处理 {file_path.name} 并写入飞书表格"
)
else:
self.logger.error(f"写入飞书失败: {file_path.name}")
except Exception as e:
self.logger.error(f"处理图片 {file_path.name} 时出错: {str(e)}")
def _post_process_file(self, file_path):
"""文件后处理"""
post_process = self.config['system'].get('post_process', 'keep')
if post_process == 'delete':
try:
file_path.unlink()
self.logger.info(f"已删除文件: {file_path.name}")
except Exception as e:
self.logger.error(f"删除文件失败: {str(e)}")
elif post_process == 'move':
try:
processed_folder = Path(self.config['system']['processed_folder'])
target_path = processed_folder / file_path.name
# 如果目标文件已存在,添加时间戳
if target_path.exists():
timestamp = int(time.time())
target_path = processed_folder / f"{file_path.stem}_{timestamp}{file_path.suffix}"
file_path.rename(target_path)
self.logger.info(f"已移动文件到: {target_path}")
except Exception as e:
self.logger.error(f"移动文件失败: {str(e)}")
def on_created(self, event):
"""文件创建事件处理"""
if event.is_directory:
return
file_path = event.src_path
if not self._is_supported_image(file_path):
return
self.logger.info(f"检测到新文件: {file_path}")
# 等待文件完全写入
if self._is_file_ready(file_path):
self._process_image(file_path)
else:
self.logger.warning(f"文件未完全写入,跳过处理: {file_path}")
def on_modified(self, event):
"""文件修改事件处理"""
if event.is_directory:
return
file_path = event.src_path
if not self._is_supported_image(file_path):
return
# 对于修改事件,也检查文件是否已完全写入
if self._is_file_ready(file_path):
self._process_image(file_path)
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description="AI飞书多维表格自动录入助手",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
python main.py # 使用默认配置启动
python main.py --config custom.yaml # 使用自定义配置文件
python main.py --test # 测试模式(不监控文件夹)
"""
)
parser.add_argument(
'--config', '-c',
default='config.yaml',
help='配置文件路径 (默认: config.yaml)'
)
parser.add_argument(
'--test', '-t',
action='store_true',
help='测试模式:处理现有文件后退出,不监控文件夹'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='显示详细日志'
)
return parser.parse_args()
def main():
"""主函数"""
args = parse_arguments()
# 加载配置
try:
config = load_config(args.config)
except Exception as e:
print(f" 配置加载失败: {e}")
sys.exit(1)
# 验证配置
try:
validate_config(config)
except Exception as e:
print(f" 配置验证失败: {e}")
sys.exit(1)
# 设置日志
log_level = 'DEBUG' if args.verbose else 'INFO'
logger = setup_logger(
name='screen2feishu',
log_file='app.log',
level=log_level
)
logger.info("=" * 50)
logger.info("Screen2Feishu 启动")
logger.info(f"配置文件: {args.config}")
logger.info(f"监控文件夹: {config['system']['watch_folder']}")
logger.info("=" * 50)
# 创建文件处理器
handler = ImageFileHandler(config, logger)
if args.test:
# 测试模式:处理现有文件
watch_folder = Path(config['system']['watch_folder'])
image_files = [
f for f in watch_folder.iterdir()
if f.is_file() and handler._is_supported_image(f)
]
if not image_files:
logger.info("没有找到待处理的图片文件")
return
logger.info(f"找到 {len(image_files)} 个图片文件,开始处理...")
for image_file in image_files:
handler._process_image(image_file)
logger.info("测试模式处理完成")
return
# 监控模式
observer = Observer()
observer.schedule(
handler,
path=config['system']['watch_folder'],
recursive=False
)
try:
observer.start()
logger.info("文件监控已启动,按 Ctrl+C 停止程序")
# 发送启动通知
send_notification(
"Screen2Feishu 已启动",
f"正在监控文件夹: {config['system']['watch_folder']}"
)
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("收到停止信号,正在关闭...")
observer.stop()
# 发送停止通知
send_notification(
"Screen2Feishu 已停止",
"程序已正常关闭"
)
except Exception as e:
logger.error(f"程序运行出错: {str(e)}")
observer.stop()
# 发送错误通知
send_notification(
"Screen2Feishu 错误",
f"程序运行出错: {str(e)}"
)
sys.exit(1)
finally:
observer.join()
logger.info("程序已完全停止")
if __name__ == "__main__":
main()