# -*- coding: utf-8 -*- """ 网页端应用 - 个性化饮食推荐助手 + 背诵排序功能 """ from flask import Flask, render_template, request, jsonify, session, Response import re import random import logging import json from pathlib import Path from datetime import datetime # 导入业务模块 from core.base import BaseConfig, AppCore, ModuleManager, ModuleType, initialize_app from modules.data_collection import DataCollectionModule from modules.ai_analysis import AIAnalysisModule from modules.recommendation_engine import RecommendationEngine from modules.ocr_calorie_recognition import OCRCalorieRecognitionModule # 配置日志 - 确保UTF-8编码 import sys # 设置标准输出编码为UTF-8 if sys.stdout.encoding != 'utf-8': sys.stdout.reconfigure(encoding='utf-8') if sys.stderr.encoding != 'utf-8': sys.stderr.reconfigure(encoding='utf-8') # 创建logs目录 Path('logs').mkdir(exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('logs/web_app.log', encoding='utf-8'), logging.StreamHandler(sys.stdout) ], force=True # 强制重新配置 ) logger = logging.getLogger(__name__) app = Flask(__name__) app.config['SECRET_KEY'] = 'your-secret-key-here' # 确保模板文件使用UTF-8编码读取 app.jinja_env.auto_reload = True app.config['TEMPLATES_AUTO_RELOAD'] = True # 设置Jinja2模板加载器使用UTF-8编码 from jinja2 import FileSystemLoader app.jinja_loader = FileSystemLoader('templates', encoding='utf-8') # 确保所有响应使用UTF-8编码 @app.after_request def after_request(response): """确保所有响应使用UTF-8编码""" response.headers['Content-Type'] = response.headers.get('Content-Type', 'text/html; charset=utf-8') if 'charset=' not in response.headers.get('Content-Type', ''): if response.headers.get('Content-Type', '').startswith('text/html'): response.headers['Content-Type'] = 'text/html; charset=utf-8' elif response.headers.get('Content-Type', '').startswith('application/json'): response.headers['Content-Type'] = 'application/json; charset=utf-8' return response # 初始化应用核心(延迟加载,避免在导入时就初始化) app_core = None config = None def get_app_core(): """获取应用核心实例(延迟初始化)""" global app_core, config if app_core is None: config = BaseConfig() if initialize_app(config): app_core = AppCore(config) # 注册所有模块 module_manager = ModuleManager(config) module_manager.register_module(DataCollectionModule(config)) module_manager.register_module(AIAnalysisModule(config)) module_manager.register_module(RecommendationEngine(config)) module_manager.register_module(OCRCalorieRecognitionModule(config)) module_manager.initialize_all() app_core.module_manager = module_manager app_core.start() logger.info("应用核心初始化完成") else: logger.error("应用核心初始化失败") return app_core class RecitationSorter: """背诵排序器""" def __init__(self): self.items = [] def extract_items(self, text): """从文本中提取背诵项目""" items = [] # 方法1: 按行分割,过滤空行和无关行 lines = text.strip().split('\n') for line in lines: line = line.strip() # 跳过空行 if not line: continue # 跳过明显的表头行(包含"章节"、"知识点"等) if any(keyword in line for keyword in ['章节', '知识点', '选择题', '主观题', '完成', '划']): continue # 跳过页码行 if re.match(r'^第\d+页', line) or re.match(r'^共\d+页', line): continue # 跳过说明文字 if any(keyword in line for keyword in ['使用说明', '祝:', '凯程', '框架', '理解', '背诵']): continue # 提取知识点的几种模式 # 模式1: 以数字或字母开头(如"1. 知识点"或"第一章 内容") match = re.match(r'^[第]?[一二三四五六七八九十\d]+[章节]?\s*[::、]?\s*(.+)', line) if match: item = match.group(1).strip() if item and len(item) > 1: # 至少2个字符才认为是有效知识点 items.append(item) continue # 模式2: 以"-"或"•"开头的列表项 match = re.match(r'^[-•]\s*(.+)', line) if match: item = match.group(1).strip() if item and len(item) > 1: items.append(item) continue # 模式3: 表格中的知识点(通常不包含特殊标记符) # 如果行中包含常见的中文标点,但不包含表格标记符,可能是知识点 if len(line) > 2 and not re.match(r'^[✓×√✗\s]+$', line): # 检查是否包含常见的中文内容 if re.search(r'[\u4e00-\u9fff]', line): # 包含中文 # 排除明显的表格分隔符 if not re.match(r'^[|+\-\s]+$', line): items.append(line) # 去重 unique_items = [] seen = set() for item in items: # 标准化:去除首尾空格,统一标点 normalized = item.strip() if normalized and normalized not in seen: seen.add(normalized) unique_items.append(normalized) return unique_items def random_sort(self, items): """随机排序项目""" shuffled = items.copy() random.shuffle(shuffled) return shuffled # 创建全局排序器实例 sorter = RecitationSorter() @app.route('/') def index(): """首页""" return render_template('index.html') @app.route('/recitation') def recitation(): """背诵排序页面""" return render_template('recitation.html') @app.route('/data-collection') def data_collection(): """数据采集页面""" return render_template('data_collection.html') @app.route('/recommendation') def recommendation(): """推荐页面""" return render_template('recommendation.html') @app.route('/analysis') def analysis(): """分析页面""" return render_template('analysis.html') @app.route('/api/extract', methods=['POST']) def extract_items(): """提取背诵项目API""" try: data = request.get_json() text = data.get('text', '') if not text: return jsonify({ 'success': False, 'message': '请输入要处理的文本' }), 400 # 提取项目 items = sorter.extract_items(text) if not items: return jsonify({ 'success': False, 'message': '未能识别到背诵内容,请检查文本格式' }), 400 logger.info(f"提取到 {len(items)} 个背诵项目") return jsonify({ 'success': True, 'items': items, 'count': len(items) }) except Exception as e: logger.error(f"提取项目失败: {e}") return jsonify({ 'success': False, 'message': f'处理失败: {str(e)}' }), 500 @app.route('/api/sort', methods=['POST']) def sort_items(): """随机排序API""" try: data = request.get_json() items = data.get('items', []) if not items: return jsonify({ 'success': False, 'message': '请先提取背诵项目' }), 400 # 随机排序 sorted_items = sorter.random_sort(items) logger.info(f"对 {len(sorted_items)} 个项目进行随机排序") return jsonify({ 'success': True, 'items': sorted_items, 'count': len(sorted_items) }) except Exception as e: logger.error(f"排序失败: {e}") return jsonify({ 'success': False, 'message': f'排序失败: {str(e)}' }), 500 @app.route('/api/export/sorted', methods=['POST']) def export_sorted(): """导出排序结果""" try: data = request.get_json() items = data.get('items', []) export_format = data.get('format', 'txt') # txt, json, csv if not items: return jsonify({ 'success': False, 'message': '没有可导出的数据' }), 400 if export_format == 'json': content = json.dumps({ 'items': items, 'count': len(items), 'export_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') }, ensure_ascii=False, indent=2) filename = f'背诵排序结果_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json' mimetype = 'application/json; charset=utf-8' elif export_format == 'csv': import csv import io output = io.StringIO() writer = csv.writer(output) writer.writerow(['序号', '知识点']) for i, item in enumerate(items, 1): writer.writerow([i, item]) content = output.getvalue() filename = f'背诵排序结果_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv' mimetype = 'text/csv; charset=utf-8' else: # txt content_lines = ['背诵排序结果', '=' * 50, ''] for i, item in enumerate(items, 1): content_lines.append(f'{i}. {item}') content_lines.extend(['', '=' * 50, f'导出时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}']) content = '\n'.join(content_lines) filename = f'背诵排序结果_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt' mimetype = 'text/plain; charset=utf-8' # 修复文件名编码问题:HTTP头必须使用latin-1编码 # 使用RFC 5987标准编码中文文件名 from urllib.parse import quote # ASCII fallback文件名(确保兼容性) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") fallback_filename = f'recitation_sorted_{timestamp}.{export_format}' # RFC 5987编码:对UTF-8字节序列进行百分号编码 # 格式: filename="fallback"; filename*=UTF-8''encoded # 将所有字节都进行百分号编码,确保HTTP头的latin-1兼容性 try: utf8_bytes = filename.encode('utf-8') # 对所有字节进行百分号编码(大写十六进制) encoded_filename = ''.join([f'%{b:02X}' for b in utf8_bytes]) except Exception as e: logger.warning(f"文件名编码失败,使用fallback: {e}") encoded_filename = fallback_filename # 构建Content-Disposition头:同时提供fallback和UTF-8编码版本 content_disposition = ( f'attachment; filename="{fallback_filename}"; ' f"filename*=UTF-8''{encoded_filename}" ) response = Response( content.encode('utf-8'), mimetype=mimetype, headers={ 'Content-Disposition': content_disposition } ) return response except Exception as e: logger.error(f"导出失败: {e}") return jsonify({ 'success': False, 'message': f'导出失败: {str(e)}' }), 500 # ==================== 业务功能API ==================== @app.route('/api/user/login', methods=['POST']) def user_login(): """用户登录""" try: data = request.get_json() user_id = data.get('user_id', '').strip() if not user_id: return jsonify({ 'success': False, 'message': '请输入用户ID' }), 400 # 获取用户数据(如果不存在会自动创建) core = get_app_core() user_data = core.get_user_data(user_id) if user_data: session['user_id'] = user_id return jsonify({ 'success': True, 'user_id': user_id, 'name': user_data.profile.get('name', '未设置') }) else: return jsonify({ 'success': False, 'message': '用户数据获取失败' }), 500 except Exception as e: logger.error(f"用户登录失败: {e}") return jsonify({ 'success': False, 'message': f'登录失败: {str(e)}' }), 500 @app.route('/api/user/register', methods=['POST']) def user_register(): """用户注册""" try: data = request.get_json() user_id = data.get('user_id', '').strip() name = data.get('name', '').strip() if not user_id or not name: return jsonify({ 'success': False, 'message': '请输入用户ID和姓名' }), 400 core = get_app_core() user_data = core.get_user_data(user_id) # 更新用户基本信息 user_data.profile['name'] = name core.data_manager.save_user_data(user_data) session['user_id'] = user_id return jsonify({ 'success': True, 'user_id': user_id, 'name': name }) except Exception as e: logger.error(f"用户注册失败: {e}") return jsonify({ 'success': False, 'message': f'注册失败: {str(e)}' }), 500 @app.route('/api/questionnaire/submit', methods=['POST']) def submit_questionnaire(): """提交问卷""" try: user_id = session.get('user_id') or request.json.get('user_id') if not user_id: return jsonify({ 'success': False, 'message': '请先登录' }), 401 data = request.get_json() questionnaire_type = data.get('type', 'basic') # basic, taste, physiological answers = data.get('answers', {}) core = get_app_core() input_data = { 'type': 'questionnaire', 'questionnaire_type': questionnaire_type, 'answers': answers } result = core.process_user_request(ModuleType.DATA_COLLECTION, input_data, user_id) if result and result.result.get('success', False): return jsonify({ 'success': True, 'message': '问卷提交成功' }) else: return jsonify({ 'success': False, 'message': result.result.get('message', '问卷提交失败') if result else '处理失败' }), 500 except Exception as e: logger.error(f"提交问卷失败: {e}") return jsonify({ 'success': False, 'message': f'提交失败: {str(e)}' }), 500 @app.route('/api/meal/record', methods=['POST']) def record_meal(): """记录餐食""" try: user_id = session.get('user_id') or request.json.get('user_id') if not user_id: return jsonify({ 'success': False, 'message': '请先登录' }), 401 data = request.get_json() meal_data = { 'date': data.get('date', datetime.now().strftime('%Y-%m-%d')), 'meal_type': data.get('meal_type', 'lunch'), # breakfast, lunch, dinner 'foods': data.get('foods', []), 'quantities': data.get('quantities', []), 'calories': data.get('calories', 0), 'satisfaction_score': data.get('satisfaction_score', 3), 'notes': data.get('notes', '') } core = get_app_core() input_data = { 'type': 'meal_record', **meal_data } result = core.process_user_request(ModuleType.DATA_COLLECTION, input_data, user_id) if result and result.result.get('success', False): return jsonify({ 'success': True, 'message': '餐食记录成功' }) else: return jsonify({ 'success': False, 'message': result.result.get('message', '记录失败') if result else '处理失败' }), 500 except Exception as e: logger.error(f"记录餐食失败: {e}") return jsonify({ 'success': False, 'message': f'记录失败: {str(e)}' }), 500 @app.route('/api/recommendation/get', methods=['POST']) def get_recommendations(): """获取餐食推荐""" try: user_id = session.get('user_id') or request.json.get('user_id') if not user_id: return jsonify({ 'success': False, 'message': '请先登录' }), 401 data = request.get_json() meal_type = data.get('meal_type', 'lunch') preferences = data.get('preferences', {}) context = data.get('context', {}) core = get_app_core() input_data = { 'type': 'meal_recommendation', 'meal_type': meal_type, 'preferences': preferences, 'context': context } result = core.process_user_request(ModuleType.RECOMMENDATION, input_data, user_id) if result and result.result.get('success', False): return jsonify({ 'success': True, 'recommendations': result.result.get('recommendations', []) }) else: return jsonify({ 'success': False, 'message': result.result.get('message', '推荐失败') if result else '处理失败', 'recommendations': [] }), 500 except Exception as e: logger.error(f"获取推荐失败: {e}") return jsonify({ 'success': False, 'message': f'获取失败: {str(e)}', 'recommendations': [] }), 500 @app.route('/api/analysis/nutrition', methods=['POST']) def analyze_nutrition(): """营养分析""" try: user_id = session.get('user_id') or request.json.get('user_id') if not user_id: return jsonify({ 'success': False, 'message': '请先登录' }), 401 data = request.get_json() meal_data = data.get('meal_data', {}) core = get_app_core() input_data = { 'type': 'nutrition_analysis', 'meal_data': meal_data } result = core.process_user_request(ModuleType.USER_ANALYSIS, input_data, user_id) if result and result.result.get('success', False): return jsonify({ 'success': True, 'analysis': result.result.get('analysis', {}) }) else: return jsonify({ 'success': False, 'message': result.result.get('message', '分析失败') if result else '处理失败' }), 500 except Exception as e: logger.error(f"营养分析失败: {e}") return jsonify({ 'success': False, 'message': f'分析失败: {str(e)}' }), 500 @app.route('/health') def health(): """健康检查""" return jsonify({'status': 'ok'}) if __name__ == '__main__': # 创建必要的目录 Path('templates').mkdir(exist_ok=True) Path('static').mkdir(exist_ok=True) Path('logs').mkdir(exist_ok=True) # 启动应用 app.run(debug=True, host='0.0.0.0', port=5000)