Files
recommend/web_app.py

701 lines
23 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""
网页端应用 - 个性化饮食推荐助手 + 背诵排序功能
"""
from flask import Flask, render_template, request, jsonify, session, Response
import re
import random
import logging
import json
from pathlib import Path
from datetime import datetime
# 导入业务模块
from core.base import BaseConfig, AppCore, ModuleManager, ModuleType, initialize_app
from modules.data_collection import DataCollectionModule
from modules.ai_analysis import AIAnalysisModule
from modules.recommendation_engine import RecommendationEngine
from modules.ocr_calorie_recognition import OCRCalorieRecognitionModule
# 配置日志 - 确保UTF-8编码
import sys
# 设置标准输出编码为UTF-8
if sys.stdout.encoding != 'utf-8':
sys.stdout.reconfigure(encoding='utf-8')
if sys.stderr.encoding != 'utf-8':
sys.stderr.reconfigure(encoding='utf-8')
# 创建logs目录
Path('logs').mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/web_app.log', encoding='utf-8'),
logging.StreamHandler(sys.stdout)
],
force=True # 强制重新配置
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
# 确保模板文件使用UTF-8编码读取
app.jinja_env.auto_reload = True
app.config['TEMPLATES_AUTO_RELOAD'] = True
# 设置Jinja2模板加载器使用UTF-8编码
from jinja2 import FileSystemLoader
app.jinja_loader = FileSystemLoader('templates', encoding='utf-8')
# 确保所有响应使用UTF-8编码
@app.after_request
def after_request(response):
"""确保所有响应使用UTF-8编码"""
response.headers['Content-Type'] = response.headers.get('Content-Type', 'text/html; charset=utf-8')
if 'charset=' not in response.headers.get('Content-Type', ''):
if response.headers.get('Content-Type', '').startswith('text/html'):
response.headers['Content-Type'] = 'text/html; charset=utf-8'
elif response.headers.get('Content-Type', '').startswith('application/json'):
response.headers['Content-Type'] = 'application/json; charset=utf-8'
return response
# 初始化应用核心(延迟加载,避免在导入时就初始化)
app_core = None
config = None
def get_app_core():
"""获取应用核心实例(延迟初始化)"""
global app_core, config
if app_core is None:
config = BaseConfig()
if initialize_app(config):
app_core = AppCore(config)
# 注册所有模块
module_manager = ModuleManager(config)
module_manager.register_module(DataCollectionModule(config))
module_manager.register_module(AIAnalysisModule(config))
module_manager.register_module(RecommendationEngine(config))
module_manager.register_module(OCRCalorieRecognitionModule(config))
module_manager.initialize_all()
app_core.module_manager = module_manager
app_core.start()
logger.info("应用核心初始化完成")
else:
logger.error("应用核心初始化失败")
return app_core
class RecitationSorter:
"""背诵排序器"""
def __init__(self):
self.items = []
def extract_items(self, text):
"""从文本中提取背诵项目"""
items = []
# 方法1: 按行分割,过滤空行和无关行
lines = text.strip().split('\n')
for line in lines:
line = line.strip()
# 跳过空行
if not line:
continue
# 跳过明显的表头行(包含"章节"、"知识点"等)
if any(keyword in line for keyword in ['章节', '知识点', '选择题', '主观题', '完成', '']):
continue
# 跳过页码行
if re.match(r'^第\d+页', line) or re.match(r'^共\d+页', line):
continue
# 跳过说明文字
if any(keyword in line for keyword in ['使用说明', '祝:', '凯程', '框架', '理解', '背诵']):
continue
# 提取知识点的几种模式
# 模式1: 以数字或字母开头(如"1. 知识点"或"第一章 内容"
match = re.match(r'^[第]?[一二三四五六七八九十\d]+[章节]?\s*[:、]?\s*(.+)', line)
if match:
item = match.group(1).strip()
if item and len(item) > 1: # 至少2个字符才认为是有效知识点
items.append(item)
continue
# 模式2: 以"-"或"•"开头的列表项
match = re.match(r'^[-•]\s*(.+)', line)
if match:
item = match.group(1).strip()
if item and len(item) > 1:
items.append(item)
continue
# 模式3: 表格中的知识点(通常不包含特殊标记符)
# 如果行中包含常见的中文标点,但不包含表格标记符,可能是知识点
if len(line) > 2 and not re.match(r'^[✓×√✗\s]+$', line):
# 检查是否包含常见的中文内容
if re.search(r'[\u4e00-\u9fff]', line): # 包含中文
# 排除明显的表格分隔符
if not re.match(r'^[|+\-\s]+$', line):
items.append(line)
# 去重
unique_items = []
seen = set()
for item in items:
# 标准化:去除首尾空格,统一标点
normalized = item.strip()
if normalized and normalized not in seen:
seen.add(normalized)
unique_items.append(normalized)
return unique_items
def random_sort(self, items):
"""随机排序项目"""
shuffled = items.copy()
random.shuffle(shuffled)
return shuffled
# 创建全局排序器实例
sorter = RecitationSorter()
@app.route('/')
def index():
"""首页"""
return render_template('index.html')
@app.route('/recitation')
def recitation():
"""背诵排序页面"""
return render_template('recitation.html')
@app.route('/data-collection')
def data_collection():
"""数据采集页面"""
return render_template('data_collection.html')
@app.route('/recommendation')
def recommendation():
"""推荐页面"""
return render_template('recommendation.html')
@app.route('/analysis')
def analysis():
"""分析页面"""
return render_template('analysis.html')
@app.route('/api/extract', methods=['POST'])
def extract_items():
"""提取背诵项目API"""
try:
data = request.get_json()
text = data.get('text', '')
if not text:
return jsonify({
'success': False,
'message': '请输入要处理的文本'
}), 400
# 提取项目
items = sorter.extract_items(text)
if not items:
return jsonify({
'success': False,
'message': '未能识别到背诵内容,请检查文本格式'
}), 400
logger.info(f"提取到 {len(items)} 个背诵项目")
return jsonify({
'success': True,
'items': items,
'count': len(items)
})
except Exception as e:
logger.error(f"提取项目失败: {e}")
return jsonify({
'success': False,
'message': f'处理失败: {str(e)}'
}), 500
@app.route('/api/sort', methods=['POST'])
def sort_items():
"""随机排序API"""
try:
data = request.get_json()
items = data.get('items', [])
if not items:
return jsonify({
'success': False,
'message': '请先提取背诵项目'
}), 400
# 随机排序
sorted_items = sorter.random_sort(items)
logger.info(f"{len(sorted_items)} 个项目进行随机排序")
return jsonify({
'success': True,
'items': sorted_items,
'count': len(sorted_items)
})
except Exception as e:
logger.error(f"排序失败: {e}")
return jsonify({
'success': False,
'message': f'排序失败: {str(e)}'
}), 500
@app.route('/api/export/sorted', methods=['POST'])
def export_sorted():
"""导出排序结果"""
try:
data = request.get_json()
items = data.get('items', [])
export_format = data.get('format', 'txt') # txt, json, csv
if not items:
return jsonify({
'success': False,
'message': '没有可导出的数据'
}), 400
if export_format == 'json':
content = json.dumps({
'items': items,
'count': len(items),
'export_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}, ensure_ascii=False, indent=2)
filename = f'背诵排序结果_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
mimetype = 'application/json; charset=utf-8'
elif export_format == 'csv':
import csv
import io
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(['序号', '知识点'])
for i, item in enumerate(items, 1):
writer.writerow([i, item])
content = output.getvalue()
filename = f'背诵排序结果_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
mimetype = 'text/csv; charset=utf-8'
else: # txt
content_lines = ['背诵排序结果', '=' * 50, '']
for i, item in enumerate(items, 1):
content_lines.append(f'{i}. {item}')
content_lines.extend(['', '=' * 50, f'导出时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'])
content = '\n'.join(content_lines)
filename = f'背诵排序结果_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt'
mimetype = 'text/plain; charset=utf-8'
# 修复文件名编码问题HTTP头必须使用latin-1编码
# 使用RFC 5987标准编码中文文件名
from urllib.parse import quote
# ASCII fallback文件名确保兼容性
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
fallback_filename = f'recitation_sorted_{timestamp}.{export_format}'
# RFC 5987编码对UTF-8字节序列进行百分号编码
# 格式: filename="fallback"; filename*=UTF-8''encoded
# 将所有字节都进行百分号编码确保HTTP头的latin-1兼容性
try:
utf8_bytes = filename.encode('utf-8')
# 对所有字节进行百分号编码(大写十六进制)
encoded_filename = ''.join([f'%{b:02X}' for b in utf8_bytes])
except Exception as e:
logger.warning(f"文件名编码失败使用fallback: {e}")
encoded_filename = fallback_filename
# 构建Content-Disposition头同时提供fallback和UTF-8编码版本
content_disposition = (
f'attachment; filename="{fallback_filename}"; '
f"filename*=UTF-8''{encoded_filename}"
)
response = Response(
content.encode('utf-8'),
mimetype=mimetype,
headers={
'Content-Disposition': content_disposition
}
)
return response
except Exception as e:
logger.error(f"导出失败: {e}")
return jsonify({
'success': False,
'message': f'导出失败: {str(e)}'
}), 500
# ==================== 业务功能API ====================
@app.route('/api/user/login', methods=['POST'])
def user_login():
"""用户登录"""
try:
data = request.get_json()
user_id = data.get('user_id', '').strip()
if not user_id:
return jsonify({
'success': False,
'message': '请输入用户ID'
}), 400
# 获取用户数据(如果不存在会自动创建)
core = get_app_core()
user_data = core.get_user_data(user_id)
if user_data:
session['user_id'] = user_id
return jsonify({
'success': True,
'user_id': user_id,
'name': user_data.profile.get('name', '未设置')
})
else:
return jsonify({
'success': False,
'message': '用户数据获取失败'
}), 500
except Exception as e:
logger.error(f"用户登录失败: {e}")
return jsonify({
'success': False,
'message': f'登录失败: {str(e)}'
}), 500
@app.route('/api/user/register', methods=['POST'])
def user_register():
"""用户注册"""
try:
data = request.get_json()
user_id = data.get('user_id', '').strip()
name = data.get('name', '').strip()
if not user_id or not name:
return jsonify({
'success': False,
'message': '请输入用户ID和姓名'
}), 400
core = get_app_core()
user_data = core.get_user_data(user_id)
# 如果用户不存在,创建新用户
if not user_data:
2025-11-03 12:29:32 +08:00
from core.base import UserData
# 直接创建UserData对象并保存
initial_profile = {
'name': name,
'age': 25,
'gender': '未知',
'height': 170,
'weight': 60,
'activity_level': 'moderate'
}
2025-11-03 12:29:32 +08:00
user_data = UserData(
user_id=user_id,
profile=initial_profile,
preferences={}
)
# 保存用户数据
if not core.data_manager.save_user_data(user_data):
logger.error(f"保存用户数据失败: user_id={user_id}")
return jsonify({
'success': False,
2025-11-03 12:29:32 +08:00
'message': '创建用户失败:数据保存失败'
}), 500
2025-11-03 12:29:32 +08:00
# 验证保存是否成功(重新获取一次)
saved_user_data = core.get_user_data(user_id)
if not saved_user_data:
logger.error(f"用户数据保存后无法获取: user_id={user_id}")
return jsonify({
'success': False,
'message': '创建用户失败:数据验证失败'
}), 500
user_data = saved_user_data
else:
2025-11-03 12:29:32 +08:00
# 用户已存在,更新姓名
user_data.profile['name'] = name
if not core.data_manager.save_user_data(user_data):
logger.error(f"更新用户数据失败: user_id={user_id}")
return jsonify({
'success': False,
'message': '更新用户信息失败'
}), 500
# 设置会话
session['user_id'] = user_id
return jsonify({
'success': True,
'user_id': user_id,
'name': user_data.profile.get('name', name)
})
except Exception as e:
logger.error(f"用户注册失败: {e}", exc_info=True)
return jsonify({
'success': False,
'message': f'注册失败: {str(e)}'
}), 500
@app.route('/api/questionnaire/submit', methods=['POST'])
def submit_questionnaire():
"""提交问卷"""
try:
user_id = session.get('user_id') or request.json.get('user_id')
if not user_id:
return jsonify({
'success': False,
'message': '请先登录'
}), 401
core = get_app_core()
# 确保用户存在
user_data = core.get_user_data(user_id)
if not user_data:
return jsonify({
'success': False,
'message': '用户数据不存在,请先注册'
}), 404
data = request.get_json()
questionnaire_type = data.get('type', 'basic') # basic, taste, physiological
answers = data.get('answers', {})
input_data = {
'type': 'questionnaire',
'questionnaire_type': questionnaire_type,
'answers': answers
}
result = core.process_user_request(ModuleType.DATA_COLLECTION, input_data, user_id)
if result and result.result.get('success', False):
return jsonify({
'success': True,
'message': '问卷提交成功'
})
else:
return jsonify({
'success': False,
'message': result.result.get('message', '问卷提交失败') if result else '处理失败'
}), 500
except Exception as e:
logger.error(f"提交问卷失败: {e}", exc_info=True)
return jsonify({
'success': False,
'message': f'提交失败: {str(e)}'
}), 500
@app.route('/api/meal/record', methods=['POST'])
def record_meal():
"""记录餐食"""
try:
user_id = session.get('user_id') or request.json.get('user_id')
if not user_id:
return jsonify({
'success': False,
'message': '请先登录'
}), 401
core = get_app_core()
# 确保用户存在
user_data = core.get_user_data(user_id)
if not user_data:
return jsonify({
'success': False,
'message': '用户数据不存在,请先注册'
}), 404
data = request.get_json()
meal_data = {
'date': data.get('date', datetime.now().strftime('%Y-%m-%d')),
'meal_type': data.get('meal_type', 'lunch'), # breakfast, lunch, dinner
'foods': data.get('foods', []),
'quantities': data.get('quantities', []),
'calories': data.get('calories', 0),
'satisfaction_score': data.get('satisfaction_score', 3),
'notes': data.get('notes', '')
}
input_data = {
'type': 'meal_record',
**meal_data
}
result = core.process_user_request(ModuleType.DATA_COLLECTION, input_data, user_id)
if result and result.result.get('success', False):
return jsonify({
'success': True,
'message': '餐食记录成功'
})
else:
return jsonify({
'success': False,
'message': result.result.get('message', '记录失败') if result else '处理失败'
}), 500
except Exception as e:
logger.error(f"记录餐食失败: {e}", exc_info=True)
return jsonify({
'success': False,
'message': f'记录失败: {str(e)}'
}), 500
@app.route('/api/recommendation/get', methods=['POST'])
def get_recommendations():
"""获取餐食推荐"""
try:
user_id = session.get('user_id') or request.json.get('user_id')
if not user_id:
return jsonify({
'success': False,
'message': '请先登录'
}), 401
core = get_app_core()
# 确保用户存在
user_data = core.get_user_data(user_id)
if not user_data:
return jsonify({
'success': False,
'message': '用户数据不存在,请先注册',
'recommendations': []
}), 404
data = request.get_json()
meal_type = data.get('meal_type', 'lunch')
preferences = data.get('preferences', {})
context = data.get('context', {})
input_data = {
'type': 'meal_recommendation',
'meal_type': meal_type,
'preferences': preferences,
'context': context
}
result = core.process_user_request(ModuleType.RECOMMENDATION, input_data, user_id)
if result and result.result.get('success', False):
return jsonify({
'success': True,
'recommendations': result.result.get('recommendations', [])
})
else:
return jsonify({
'success': False,
'message': result.result.get('message', '推荐失败') if result else '处理失败',
'recommendations': []
}), 500
except Exception as e:
logger.error(f"获取推荐失败: {e}", exc_info=True)
return jsonify({
'success': False,
'message': f'获取失败: {str(e)}',
'recommendations': []
}), 500
@app.route('/api/analysis/nutrition', methods=['POST'])
def analyze_nutrition():
"""营养分析"""
try:
user_id = session.get('user_id') or request.json.get('user_id')
if not user_id:
return jsonify({
'success': False,
'message': '请先登录'
}), 401
core = get_app_core()
# 确保用户存在
user_data = core.get_user_data(user_id)
if not user_data:
return jsonify({
'success': False,
'message': '用户数据不存在,请先注册'
}), 404
data = request.get_json()
meal_data = data.get('meal_data', {})
input_data = {
'type': 'nutrition_analysis',
'meal_data': meal_data
}
result = core.process_user_request(ModuleType.USER_ANALYSIS, input_data, user_id)
if result and result.result.get('success', False):
return jsonify({
'success': True,
'analysis': result.result.get('analysis', {})
})
else:
return jsonify({
'success': False,
'message': result.result.get('message', '分析失败') if result else '处理失败'
}), 500
except Exception as e:
logger.error(f"营养分析失败: {e}", exc_info=True)
return jsonify({
'success': False,
'message': f'分析失败: {str(e)}'
}), 500
@app.route('/health')
def health():
"""健康检查"""
return jsonify({'status': 'ok'})
if __name__ == '__main__':
# 创建必要的目录
Path('templates').mkdir(exist_ok=True)
Path('static').mkdir(exist_ok=True)
Path('logs').mkdir(exist_ok=True)
# 启动应用
2025-11-05 13:32:14 +08:00
app.run(debug=True, host='0.0.0.0', port=7400)