Files
feishu_screen/main.py

317 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Screen2Feishu - AI飞书多维表格自动录入助手
监控文件夹,自动处理图片并写入飞书多维表格
"""
import os
import sys
import time
import argparse
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# 添加项目根目录到Python路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from services.ai_service import AIService
from services.feishu_service import FeishuService
from utils.logger import setup_logger
from utils.notifier import send_notification
from utils.config_loader import load_config, validate_config
class ImageFileHandler(FileSystemEventHandler):
"""文件系统事件处理器"""
def __init__(self, config, logger):
self.config = config
self.logger = logger
self.ai_service = AIService(config['ai'])
self.feishu_service = FeishuService(config['feishu'])
self.processed_files = set() # 已处理文件记录
self.supported_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.gif'}
# 创建必要的文件夹
self._create_directories()
def _create_directories(self):
"""创建必要的文件夹"""
watch_folder = Path(self.config['system']['watch_folder'])
watch_folder.mkdir(exist_ok=True)
if self.config['system'].get('post_process') == 'move':
processed_folder = Path(self.config['system']['processed_folder'])
processed_folder.mkdir(exist_ok=True)
def _is_supported_image(self, file_path):
"""检查文件是否为支持的图片格式"""
return Path(file_path).suffix.lower() in self.supported_extensions
def _is_file_ready(self, file_path):
"""检查文件是否已完全写入(通过文件大小稳定判断)"""
try:
size1 = os.path.getsize(file_path)
time.sleep(0.5) # 等待0.5秒
size2 = os.path.getsize(file_path)
return size1 == size2 and size1 > 0
except (OSError, IOError):
return False
def _process_image(self, file_path):
"""处理单个图片文件"""
file_path = Path(file_path)
# 检查是否已处理
if str(file_path) in self.processed_files:
return
self.logger.info(f"开始处理图片: {file_path.name}")
ai_result = None # 初始化变量
try:
# 检查是否启用内存处理
memory_processing = self.config['system'].get('memory_processing', False)
if memory_processing:
# 内存处理模式直接读取图片到内存不保存到processed_images
try:
with open(file_path, 'rb') as f:
image_bytes = f.read()
# 1. AI分析图片内存模式
ai_result = self.ai_service.analyze_image_from_bytes(image_bytes, file_path.name)
if not ai_result:
self.logger.error(f"AI分析失败: {file_path.name}")
return
except Exception as e:
self.logger.error(f"内存处理图片失败: {str(e)}")
# 回退到文件处理模式
memory_processing = False
if not memory_processing:
# 文件处理模式:传统方式
# 1. AI分析图片
ai_result = self.ai_service.analyze_image(str(file_path))
if not ai_result:
self.logger.error(f"AI分析失败: {file_path.name}")
return
# 2. 写入飞书
success = self.feishu_service.add_task(ai_result)
if success:
self.logger.info(f"成功写入飞书: {file_path.name}")
self.processed_files.add(str(file_path))
# 3. 后处理
self._post_process_file(file_path)
# 4. 发送通知
send_notification(
"任务处理成功",
f"已成功处理 {file_path.name} 并写入飞书表格"
)
else:
self.logger.error(f"写入飞书失败: {file_path.name}")
except Exception as e:
self.logger.error(f"处理图片 {file_path.name} 时出错: {str(e)}")
def _post_process_file(self, file_path):
"""文件后处理"""
post_process = self.config['system'].get('post_process', 'keep')
if post_process == 'delete':
try:
file_path.unlink()
self.logger.info(f"已删除文件: {file_path.name}")
except Exception as e:
self.logger.error(f"删除文件失败: {str(e)}")
elif post_process == 'move':
try:
processed_folder = Path(self.config['system']['processed_folder'])
target_path = processed_folder / file_path.name
# 如果目标文件已存在,添加时间戳
if target_path.exists():
timestamp = int(time.time())
target_path = processed_folder / f"{file_path.stem}_{timestamp}{file_path.suffix}"
file_path.rename(target_path)
self.logger.info(f"已移动文件到: {target_path}")
except Exception as e:
self.logger.error(f"移动文件失败: {str(e)}")
def on_created(self, event):
"""文件创建事件处理"""
if event.is_directory:
return
file_path = event.src_path
if not self._is_supported_image(file_path):
return
self.logger.info(f"检测到新文件: {file_path}")
# 等待文件完全写入
if self._is_file_ready(file_path):
self._process_image(file_path)
else:
self.logger.warning(f"文件未完全写入,跳过处理: {file_path}")
def on_modified(self, event):
"""文件修改事件处理"""
if event.is_directory:
return
file_path = event.src_path
if not self._is_supported_image(file_path):
return
# 对于修改事件,也检查文件是否已完全写入
if self._is_file_ready(file_path):
self._process_image(file_path)
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description="AI飞书多维表格自动录入助手",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
python main.py # 使用默认配置启动
python main.py --config custom.yaml # 使用自定义配置文件
python main.py --test # 测试模式(不监控文件夹)
"""
)
parser.add_argument(
'--config', '-c',
default='config.yaml',
help='配置文件路径 (默认: config.yaml)'
)
parser.add_argument(
'--test', '-t',
action='store_true',
help='测试模式:处理现有文件后退出,不监控文件夹'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='显示详细日志'
)
return parser.parse_args()
def main():
"""主函数"""
args = parse_arguments()
# 加载配置
try:
config = load_config(args.config)
except Exception as e:
print(f" 配置加载失败: {e}")
sys.exit(1)
# 验证配置
try:
validate_config(config)
except Exception as e:
print(f" 配置验证失败: {e}")
sys.exit(1)
# 设置日志
log_level = 'DEBUG' if args.verbose else 'INFO'
logger = setup_logger(
name='screen2feishu',
log_file='app.log',
level=log_level
)
logger.info("=" * 50)
logger.info("Screen2Feishu 启动")
logger.info(f"配置文件: {args.config}")
logger.info(f"监控文件夹: {config['system']['watch_folder']}")
logger.info("=" * 50)
# 创建文件处理器
handler = ImageFileHandler(config, logger)
if args.test:
# 测试模式:处理现有文件
watch_folder = Path(config['system']['watch_folder'])
image_files = [
f for f in watch_folder.iterdir()
if f.is_file() and handler._is_supported_image(f)
]
if not image_files:
logger.info("没有找到待处理的图片文件")
return
logger.info(f"找到 {len(image_files)} 个图片文件,开始处理...")
for image_file in image_files:
handler._process_image(image_file)
logger.info("测试模式处理完成")
return
# 监控模式
observer = Observer()
observer.schedule(
handler,
path=config['system']['watch_folder'],
recursive=False
)
try:
observer.start()
logger.info("文件监控已启动,按 Ctrl+C 停止程序")
# 发送启动通知
send_notification(
"Screen2Feishu 已启动",
f"正在监控文件夹: {config['system']['watch_folder']}"
)
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("收到停止信号,正在关闭...")
observer.stop()
# 发送停止通知
send_notification(
"Screen2Feishu 已停止",
"程序已正常关闭"
)
except Exception as e:
logger.error(f"程序运行出错: {str(e)}")
observer.stop()
# 发送错误通知
send_notification(
"Screen2Feishu 错误",
f"程序运行出错: {str(e)}"
)
sys.exit(1)
finally:
observer.join()
logger.info("程序已完全停止")
if __name__ == "__main__":
main()