Files
tts_trans/app/main.py

544 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
TTS Book Service - 小米 MiMo TTS 转换服务
为听书 App 提供音频接入接口
"""
import os
import json
import base64
import subprocess
import uuid
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from pathlib import Path
import httpx
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from sqlalchemy import Column, Integer, String, Text, DateTime, func, select, delete
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
import config
# ── Logging ───────────────────────────────────────────────────────────────
# Configure the root logger once at import time with a timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Service-wide logger used by the functions in this module.
logger = logging.getLogger("tts-service")
# ── Database ──────────────────────────────────────────────────────────────
# Async engine bound to config.DATABASE_URL; SQL statement echo is disabled.
engine = create_async_engine(config.DATABASE_URL, echo=False)
# Session factory. expire_on_commit=False keeps already-loaded attributes
# readable on ORM objects after a commit.
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
    """Declarative base class shared by the ORM models in this module."""
    pass
class Book(Base):
    """ORM model for a book, stored in the ``books`` table."""
    __tablename__ = "books"
    # Surrogate auto-increment primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Externally supplied book identifier; required, unique and indexed.
    book_id = Column(String(100), unique=True, nullable=False, index=True)
    # Required display title.
    title = Column(String(500), nullable=False)
    # Optional author name; defaults to an empty string.
    author = Column(String(200), default="")
    # Filled in by the database (server default) when the row is inserted.
    created_at = Column(DateTime, server_default=func.now())
class Chapter(Base):
    """ORM model for a book chapter and its generated audio, stored in ``chapters``."""
    __tablename__ = "chapters"
    # Surrogate auto-increment primary key; also used to order chapter lists.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Identifier of the owning book (plain string column, no foreign key declared).
    book_id = Column(String(100), nullable=False, index=True)
    # Chapter identifier; indexed, but no uniqueness constraint is declared here.
    chapter_id = Column(String(100), nullable=False, index=True)
    # Chapter identifier as exposed to the audiobook app.
    app_chapter_id = Column(String(100), default="")
    title = Column(String(500), default="")
    # Source text that is sent to the TTS API.
    text_content = Column(Text, default="")
    # Path of the generated MP3 file; empty until generation succeeds.
    audio_file = Column(String(500), default="")
    # Generation state; this module writes "pending", "generating", "ready" and "error".
    status = Column(String(20), default="pending")
    # Last generation error message (cleared on success).
    error_msg = Column(Text, default="")
    # Filled in by the database (server default) when the row is inserted.
    created_at = Column(DateTime, server_default=func.now())
    # Server default on insert; refreshed via onupdate when the row is updated.
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
# ── TTS Service ───────────────────────────────────────────────────────────
async def call_mimo_tts(text: str, style: str = "") -> bytes:
    """Call the Xiaomi MiMo TTS API and return the synthesized WAV audio bytes.

    Args:
        text: Text to synthesize.
        style: Optional style hint; when non-empty it is sent as a
            ``<style>...</style>`` prefix in front of the text.

    Returns:
        The base64-decoded audio payload (WAV format is requested).

    Raises:
        HTTPException: 500 when ``config.MIMO_API_KEY`` is not set; 502 when
            the request fails at transport level, the API answers with a
            non-200 status, reports an error, or returns a body that cannot
            be parsed.
    """
    if not config.MIMO_API_KEY:
        raise HTTPException(500, "MIMO_API_KEY 未配置,请设置环境变量")
    content = f"<style>{style}</style>{text}" if style else text
    payload = {
        "model": config.MIMO_TTS_MODEL,
        "audio": {"format": "wav", "voice": config.MIMO_VOICE},
        "messages": [{"role": "assistant", "content": content}],
    }
    headers = {
        "Content-Type": "application/json",
        "api-key": config.MIMO_API_KEY,
    }
    # Monotonic clock: the measured duration is immune to wall-clock adjustments.
    t0 = time.perf_counter()
    logger.info(f"MiMo TTS 请求: text_len={len(text)}, style={style or '(默认)'}")
    try:
        async with httpx.AsyncClient(timeout=120) as client:
            resp = await client.post(config.MIMO_API_ENDPOINT, json=payload, headers=headers)
    except httpx.HTTPError as e:
        # Timeouts / connection failures: report as a bad-gateway error instead
        # of letting them surface as an unhandled 500. repr() is used because
        # some httpx exceptions have an empty str().
        elapsed = round(time.perf_counter() - t0, 2)
        logger.error(f"MiMo TTS 请求失败: {e!r}, 耗时 {elapsed}s")
        raise HTTPException(502, f"MiMo TTS 请求失败: {e!r}") from e
    elapsed = round(time.perf_counter() - t0, 2)
    if resp.status_code != 200:
        logger.error(f"MiMo TTS 错误: HTTP {resp.status_code}, 耗时 {elapsed}s, 响应: {resp.text[:200]}")
        raise HTTPException(502, f"MiMo TTS API 错误: HTTP {resp.status_code} - {resp.text[:300]}")
    try:
        data = resp.json()
    except ValueError as e:
        # A 200 response whose body is not valid JSON.
        logger.error(f"MiMo TTS 响应解析失败: {e}, 耗时 {elapsed}s")
        raise HTTPException(502, f"MiMo TTS 响应解析失败: {e}") from e
    if isinstance(data, dict) and data.get("error"):
        logger.error(f"MiMo TTS 业务错误: {data['error']}, 耗时 {elapsed}s")
        raise HTTPException(502, f"MiMo TTS 错误: {data['error']}")
    try:
        audio_b64 = data["choices"][0]["message"]["audio"]["data"]
        wav_bytes = base64.b64decode(audio_b64)
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # ValueError covers binascii.Error raised for malformed base64 data;
        # TypeError covers a non-dict body or a missing (None) audio payload.
        logger.error(f"MiMo TTS 响应解析失败: {e}, 耗时 {elapsed}s")
        raise HTTPException(502, f"MiMo TTS 响应解析失败: {e}") from e
    logger.info(f"MiMo TTS 成功: wav_size={len(wav_bytes)} bytes, 耗时 {elapsed}s")
    return wav_bytes
def wav_to_mp3(wav_path: str, mp3_path: str):
    """Convert a WAV file to MP3 by running ffmpeg (libmp3lame, ``-qscale:a 2``).

    Args:
        wav_path: Path of the WAV file to read.
        mp3_path: Path of the MP3 file to write; ``-y`` lets ffmpeg overwrite it.

    Raises:
        RuntimeError: If ffmpeg cannot be started (e.g. it is not installed)
            or exits with a non-zero status.
    """
    cmd = ["ffmpeg", "-y", "-i", wav_path, "-codec:a", "libmp3lame", "-qscale:a", "2", mp3_path]
    try:
        result = subprocess.run(
            cmd,
            # Do not let ffmpeg inherit (and possibly read from) the server's stdin.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            # ffmpeg may echo file names in arbitrary encodings; never fail on decoding.
            errors="replace",
        )
    except OSError as e:
        # Missing or non-executable ffmpeg binary: report it the same way as a
        # failed conversion so callers handle a single exception type.
        raise RuntimeError(f"ffmpeg 无法启动: {e}") from e
    if result.returncode != 0:
        # ffmpeg prints its version banner first and the actual error last,
        # so the tail of stderr is the informative part.
        raise RuntimeError(f"ffmpeg 转换失败: {result.stderr[-300:]}")
async def generate_chapter_audio(chapter_id_str: str):
    """Generate the MP3 audio for one chapter (MiMo TTS WAV → ffmpeg MP3).

    The chapter row is updated in place: ``status`` becomes ``generating`` and
    then either ``ready`` (with ``audio_file`` set to the MP3 path) or
    ``error`` (with ``error_msg`` set). Nothing happens when no chapter with
    the given id exists. The intermediate WAV file is always removed.

    Args:
        chapter_id_str: ``chapter_id`` of the chapter to render.
    """
    async with async_session() as db:
        # NOTE(review): the lookup filters on chapter_id only, while chapters
        # are created unique per (book_id, chapter_id); scalar_one_or_none()
        # raises if two books share the same chapter_id.
        result = await db.execute(select(Chapter).where(Chapter.chapter_id == chapter_id_str))
        chapter = result.scalar_one_or_none()
        if not chapter:
            return
        # The column is nullable, so treat a NULL text like an empty one.
        if not (chapter.text_content or "").strip():
            chapter.status = "error"
            chapter.error_msg = "文本内容为空"
            await db.commit()
            return
        chapter.status = "generating"
        await db.commit()
        wav_path = None
        try:
            audio_dir = Path(config.AUDIO_DIR) / chapter.book_id
            audio_dir.mkdir(parents=True, exist_ok=True)
            wav_path = audio_dir / f"{chapter.chapter_id}.wav"
            mp3_path = str(audio_dir / f"{chapter.chapter_id}.mp3")
            # MiMo TTS produces the WAV.
            wav_bytes = await call_mimo_tts(chapter.text_content)
            wav_path.write_bytes(wav_bytes)
            # WAV → MP3 in a worker thread so ffmpeg does not block the event loop.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, wav_to_mp3, str(wav_path), mp3_path)
            chapter.audio_file = mp3_path
            chapter.status = "ready"
            chapter.error_msg = ""
        except Exception as e:
            logger.exception(f"章节音频生成失败: chapter_id={chapter_id_str}")
            chapter.status = "error"
            # HTTPException carries its message in .detail; str() of it can be
            # empty depending on the framework version.
            chapter.error_msg = str(getattr(e, "detail", None) or e)[:500]
        finally:
            # Only the MP3 is kept; drop the WAV on success and on failure.
            if wav_path is not None:
                try:
                    wav_path.unlink(missing_ok=True)
                except OSError as cleanup_err:
                    logger.warning(f"临时 WAV 文件清理失败: {wav_path}: {cleanup_err}")
        await db.commit()
# ── App Lifecycle ──────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler: prepare storage on startup, release the DB on shutdown.

    Startup creates the audio directory and the ``data`` directory under
    ``config.BASE_DIR``, then creates any missing database tables. Shutdown
    disposes the engine so its pooled connections are closed.
    """
    os.makedirs(config.AUDIO_DIR, exist_ok=True)
    os.makedirs(os.path.join(config.BASE_DIR, "data"), exist_ok=True)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    try:
        yield
    finally:
        # Close pooled connections instead of leaving them to interpreter teardown.
        await engine.dispose()
# ASGI application object; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(title="TTS Book Service", lifespan=lifespan)
# ── 听书 App 音频接入接口 ─────────────────────────────────────────────────
@app.get("/api/book/{book_id}")
async def get_book_info(book_id: str):
    """Return a book's metadata and its chapter list (called by the audiobook app).

    Chapters are ordered by primary key. ``audio_url`` is only populated for
    chapters whose status is ``ready``. Responds with 404 for an unknown book.
    """
    async with async_session() as session:
        book_query = select(Book).where(Book.book_id == book_id)
        book = (await session.execute(book_query)).scalar_one_or_none()
        if book is None:
            raise HTTPException(404, f"书籍 {book_id} 不存在")

        chapter_query = select(Chapter).where(Chapter.book_id == book_id).order_by(Chapter.id)
        rows = (await session.execute(chapter_query)).scalars().all()

        chapter_entries = []
        for row in rows:
            audio_url = None
            if row.status == "ready":
                audio_url = f"/api/book/{book_id}/chapter/{row.chapter_id}/audio"
            chapter_entries.append(
                {
                    "chapter_id": row.chapter_id,
                    "app_chapter_id": row.app_chapter_id,
                    "title": row.title,
                    "status": row.status,
                    "audio_url": audio_url,
                }
            )

        return {
            "book_id": book.book_id,
            "title": book.title,
            "author": book.author,
            "chapters": chapter_entries,
        }
@app.get("/api/book/{book_id}/chapter/{chapter_id}/audio")
async def get_chapter_audio(book_id: str, chapter_id: str):
    """Serve a chapter's MP3 file (called by the audiobook app).

    Responds with 404 when the chapter is unknown, its audio is not ready,
    or the recorded file no longer exists on disk.
    """
    lookup = select(Chapter).where(
        Chapter.book_id == book_id, Chapter.chapter_id == chapter_id
    )
    async with async_session() as session:
        chapter = (await session.execute(lookup)).scalar_one_or_none()
        if chapter is None:
            raise HTTPException(404, "章节不存在")

        has_audio = chapter.status == "ready" and bool(chapter.audio_file)
        if not has_audio:
            raise HTTPException(404, f"音频尚未生成,当前状态: {chapter.status}")

        audio_path = chapter.audio_file
        if not os.path.exists(audio_path):
            raise HTTPException(404, "音频文件丢失")

        return FileResponse(audio_path, media_type="audio/mpeg", filename=f"{chapter_id}.mp3")
# ── 实时 TTS 接口(兼容听书 App 格式)─────────────────────────────────────
from fastapi.responses import Response
from urllib.parse import parse_qs
@app.post("/api/tts")
async def realtime_tts(request: Request):
    """Synthesize speech on the fly and return it as an MP3 byte stream.

    Two request formats are accepted:
      1. JSON body: {"text": "...", "style": "..."}
      2. Form body: tex=...&spd=5 (Baidu style); only ``tex`` is read.

    Returns:
        ``audio/mpeg`` bytes on success. On failure a JSON object with
        ``status`` and ``message`` is returned with HTTP 400 (no text
        supplied or unparseable body) or HTTP 500 (generation failed).
    """
    from urllib.parse import unquote

    text = ""
    style = ""
    content_type = request.headers.get("content-type", "")
    try:
        if "json" in content_type:
            data = await request.json()
            text = data.get("text", "").strip()
            style = data.get("style", "").strip()
        else:
            # form-urlencoded (Baidu style)
            body_bytes = await request.body()
            params = parse_qs(body_bytes.decode("utf-8"))
            text = (params.get("tex", [""])[0]).strip()
            # Extra URL decoding: the client double-encodes the text.
            text = unquote(unquote(text))
    except Exception as e:
        # Parsing stays best-effort (a malformed body ends up as "empty text"
        # below), but the cause is logged instead of being dropped silently.
        logger.warning(f"TTS 请求解析失败: {e!r}")
    if not text:
        return Response(
            content=json.dumps({"status": 40000001, "message": "text/tex 不能为空"}, ensure_ascii=False),
            media_type="application/json",
            status_code=400,
        )
    tmp_dir = Path(config.AUDIO_DIR) / "_tmp"
    tmp_id = uuid.uuid4().hex
    wav_path = tmp_dir / f"{tmp_id}.wav"
    mp3_path = tmp_dir / f"{tmp_id}.mp3"
    try:
        # MiMo TTS produces the WAV.
        wav_bytes = await call_mimo_tts(text, style)
        # WAV → MP3 through temporary files.
        tmp_dir.mkdir(parents=True, exist_ok=True)
        wav_path.write_bytes(wav_bytes)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, wav_to_mp3, str(wav_path), str(mp3_path))
        mp3_bytes = mp3_path.read_bytes()
        return Response(content=mp3_bytes, media_type="audio/mpeg")
    except Exception as e:
        # HTTPException carries its message in .detail; str() of it can be
        # empty depending on the framework version.
        message = str(getattr(e, "detail", None) or e)
        logger.error(f"实时 TTS 失败: {message[:300]}")
        return Response(
            content=json.dumps({"status": 50000002, "message": message[:300]}, ensure_ascii=False),
            media_type="application/json",
            status_code=500,
        )
    finally:
        # Remove the temporary files on success and on failure alike.
        for leftover in (wav_path, mp3_path):
            try:
                leftover.unlink(missing_ok=True)
            except OSError as cleanup_err:
                logger.warning(f"临时文件清理失败: {leftover}: {cleanup_err}")
# ── 管理 API ──────────────────────────────────────────────────────────────
# --- Books ---
@app.get("/admin/api/books")
async def list_books():
    """List all books, ordered by primary key descending."""
    newest_first = select(Book).order_by(Book.id.desc())
    async with async_session() as session:
        rows = (await session.execute(newest_first)).scalars().all()
        listing = []
        for row in rows:
            listing.append(
                {"book_id": row.book_id, "title": row.title, "author": row.author}
            )
        return listing
@app.post("/admin/api/books")
async def create_book(request: Request):
    """Create a book from a JSON body with ``book_id``, ``title`` and optional ``author``.

    Responds with 400 when ``book_id`` or ``title`` is empty and with 409
    when a book with the same ``book_id`` already exists.
    """
    payload = await request.json()

    def field(name):
        # Missing keys fall back to an empty string; values are whitespace-trimmed.
        return payload.get(name, "").strip()

    book_id = field("book_id")
    title = field("title")
    author = field("author")
    if not (book_id and title):
        raise HTTPException(400, "book_id 和 title 不能为空")

    async with async_session() as session:
        duplicate = await session.execute(select(Book).where(Book.book_id == book_id))
        if duplicate.scalar_one_or_none() is not None:
            raise HTTPException(409, f"书籍 {book_id} 已存在")
        session.add(Book(book_id=book_id, title=title, author=author))
        await session.commit()
    return {"ok": True, "book_id": book_id}
@app.delete("/admin/api/books/{book_id}")
async def delete_book(book_id: str):
    """Delete a book's chapter rows and then the book row itself.

    Always answers ``{"ok": True}``, whether or not any row matched.
    """
    remove_chapters = delete(Chapter).where(Chapter.book_id == book_id)
    remove_book = delete(Book).where(Book.book_id == book_id)
    async with async_session() as session:
        # Chapters first, then the book, committed together.
        for statement in (remove_chapters, remove_book):
            await session.execute(statement)
        await session.commit()
    return {"ok": True}
# --- Chapters ---
@app.get("/admin/api/books/{book_id}/chapters")
async def list_chapters(book_id: str):
    """List a book's chapters (ordered by primary key) for the admin UI.

    ``text_content`` is returned as a preview: at most 200 characters, with
    ``...`` appended when the text was truncated. ``text_length`` is the full
    length. A chapter whose text is NULL is reported as empty.
    """
    preview_len = 200
    async with async_session() as db:
        result = await db.execute(
            select(Chapter).where(Chapter.book_id == book_id).order_by(Chapter.id)
        )
        items = []
        for ch in result.scalars().all():
            # The column is nullable; avoid len(None) / None[:200] crashes.
            text = ch.text_content or ""
            preview = text[:preview_len] + "..." if len(text) > preview_len else text
            items.append(
                {
                    "chapter_id": ch.chapter_id,
                    "app_chapter_id": ch.app_chapter_id,
                    "title": ch.title,
                    "text_content": preview,
                    "text_length": len(text),
                    "status": ch.status,
                    "error_msg": ch.error_msg,
                    "has_audio": ch.status == "ready",
                }
            )
        return items
@app.post("/admin/api/books/{book_id}/chapters")
async def create_chapter(book_id: str, request: Request):
    """Create a chapter under ``book_id`` from a JSON body.

    Reads ``chapter_id`` (required), ``title``, ``app_chapter_id`` and
    ``text_content``. ``app_chapter_id`` falls back to ``chapter_id`` when
    empty. Responds with 400 for an empty ``chapter_id`` and 409 when the
    chapter already exists in this book.
    """
    payload = await request.json()

    def field(name):
        # Missing keys fall back to an empty string; values are whitespace-trimmed.
        return payload.get(name, "").strip()

    chapter_id = field("chapter_id")
    title = field("title")
    app_chapter_id = field("app_chapter_id")
    text_content = field("text_content")
    if not chapter_id:
        raise HTTPException(400, "chapter_id 不能为空")

    same_chapter = select(Chapter).where(
        Chapter.book_id == book_id, Chapter.chapter_id == chapter_id
    )
    async with async_session() as session:
        duplicate = await session.execute(same_chapter)
        if duplicate.scalar_one_or_none() is not None:
            raise HTTPException(409, f"章节 {chapter_id} 已存在")
        session.add(
            Chapter(
                book_id=book_id,
                chapter_id=chapter_id,
                app_chapter_id=app_chapter_id or chapter_id,
                title=title,
                text_content=text_content,
            )
        )
        await session.commit()
    return {"ok": True, "chapter_id": chapter_id}
@app.put("/admin/api/books/{book_id}/chapters/{chapter_id}")
async def update_chapter(book_id: str, chapter_id: str, request: Request):
    """Update a chapter's ``text_content``, ``title`` and/or ``app_chapter_id``.

    Only the keys present in the JSON body are changed. Responds with 400
    when the body is not a JSON object or a supplied value is not a string,
    and with 404 when the chapter does not exist.
    """
    editable = ("text_content", "title", "app_chapter_id")
    data = await request.json()
    if not isinstance(data, dict):
        raise HTTPException(400, "请求体必须是 JSON 对象")
    # Reject null / numeric / nested values up front: the columns are read as
    # strings elsewhere (e.g. .strip(), len()), so storing anything else
    # would break those readers later.
    for field in editable:
        if field in data and not isinstance(data[field], str):
            raise HTTPException(400, f"{field} 必须是字符串")
    async with async_session() as db:
        result = await db.execute(
            select(Chapter).where(Chapter.book_id == book_id, Chapter.chapter_id == chapter_id)
        )
        chapter = result.scalar_one_or_none()
        if not chapter:
            raise HTTPException(404, "章节不存在")
        # NOTE(review): status is left untouched when the text changes, so
        # previously generated audio stays "ready" — confirm this is intended.
        for field in editable:
            if field in data:
                setattr(chapter, field, data[field])
        await db.commit()
    return {"ok": True}
@app.delete("/admin/api/books/{book_id}/chapters/{chapter_id}")
async def delete_chapter(book_id: str, chapter_id: str):
    """Delete the chapter row matching ``book_id`` and ``chapter_id``.

    Always answers ``{"ok": True}``, whether or not a row matched.
    """
    removal = delete(Chapter).where(
        Chapter.book_id == book_id, Chapter.chapter_id == chapter_id
    )
    async with async_session() as session:
        await session.execute(removal)
        await session.commit()
    return {"ok": True}
# --- TTS ---
@app.post("/admin/api/books/{book_id}/chapters/{chapter_id}/generate")
async def generate_audio(book_id: str, chapter_id: str):
    """Generate the audio of a single chapter on demand.

    Returns the chapter's resulting ``status`` and ``error_msg``; ``ok`` is
    ``True`` even when generation ended in the ``error`` status. Responds
    with 404 when the chapter does not exist in this book.
    """
    lookup = select(Chapter).where(
        Chapter.book_id == book_id, Chapter.chapter_id == chapter_id
    )
    # Check existence first: otherwise an unknown chapter would surface as an
    # AttributeError on None (HTTP 500) after generation was attempted.
    async with async_session() as db:
        if (await db.execute(lookup)).scalar_one_or_none() is None:
            raise HTTPException(404, "章节不存在")
    await generate_chapter_audio(chapter_id)
    async with async_session() as db:
        ch = (await db.execute(lookup)).scalar_one_or_none()
        if ch is None:
            # The chapter disappeared while generation was running.
            raise HTTPException(404, "章节不存在")
        return {"ok": True, "status": ch.status, "error_msg": ch.error_msg}
@app.post("/admin/api/books/{book_id}/generate-all")
async def generate_all_chapters(book_id: str):
    """Generate audio for every chapter of a book whose status is not ``ready``.

    Chapters are processed one after another within the request; the
    response lists the chapter ids that were processed.
    """
    not_ready = select(Chapter).where(
        Chapter.book_id == book_id, Chapter.status != "ready"
    )
    async with async_session() as session:
        rows = (await session.execute(not_ready)).scalars().all()
        chapter_ids = []
        for row in rows:
            chapter_ids.append(row.chapter_id)

    # Sequential on purpose: each chapter is awaited before the next starts.
    for pending_id in chapter_ids:
        await generate_chapter_audio(pending_id)

    return {"ok": True, "total": len(chapter_ids), "chapter_ids": chapter_ids}
# --- TTS 试听 ---
@app.post("/admin/api/tts/preview")
async def tts_preview(request: Request):
    """Render a short TTS preview and return the URL of the resulting MP3.

    Reads ``text`` (required) and ``style`` from the JSON body. The MP3 is
    written to the ``_preview`` directory under ``config.AUDIO_DIR`` and its
    URL under ``/audio/_preview/`` is returned. Responds with 400 when
    ``text`` is empty and with 500 when the WAV → MP3 conversion fails.
    """
    data = await request.json()
    text = data.get("text", "").strip()
    style = data.get("style", "").strip()
    if not text:
        raise HTTPException(400, "文本不能为空")
    wav_bytes = await call_mimo_tts(text, style)
    audio_dir = Path(config.AUDIO_DIR) / "_preview"
    audio_dir.mkdir(parents=True, exist_ok=True)
    filename = f"{uuid.uuid4().hex}.mp3"
    wav_path = audio_dir / f"{uuid.uuid4().hex}.wav"
    mp3_path = audio_dir / filename
    try:
        wav_path.write_bytes(wav_bytes)
        # Run ffmpeg in a worker thread so the event loop is not blocked.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, wav_to_mp3, str(wav_path), str(mp3_path))
    except (RuntimeError, OSError) as e:
        # Drop a partially written MP3 and answer with a readable error.
        try:
            mp3_path.unlink(missing_ok=True)
        except OSError as cleanup_err:
            logger.warning(f"试听文件清理失败: {mp3_path}: {cleanup_err}")
        raise HTTPException(500, f"音频转换失败: {str(e)[:300]}") from e
    finally:
        # The WAV is only an intermediate file; remove it on every path.
        try:
            wav_path.unlink(missing_ok=True)
        except OSError as cleanup_err:
            logger.warning(f"临时 WAV 文件清理失败: {wav_path}: {cleanup_err}")
    return {"ok": True, "url": f"/audio/_preview/{filename}"}
@app.get("/admin/api/config")
async def get_config():
    """Return the active MiMo TTS settings with the API key masked.

    Only the first six characters of the key are exposed, followed by
    ``****``; a placeholder text is returned when no key is configured.
    """
    api_key = config.MIMO_API_KEY
    if api_key:
        masked = api_key[:6] + "****"
    else:
        masked = "未配置"
    return {
        "endpoint": config.MIMO_API_ENDPOINT,
        "model": config.MIMO_TTS_MODEL,
        "voice": config.MIMO_VOICE,
        "api_key_masked": masked,
    }
# ── 配置文件下载 ───────────────────────────────────────────────────────────
@app.get("/httpTts.json")
async def serve_http_tts_config():
    """Serve the audio-source configuration file that the app imports.

    The file is ``httpTts-mimo.json`` under ``config.BASE_DIR``; responds
    with 404 when it does not exist.
    """
    source_file = os.path.join(config.BASE_DIR, "httpTts-mimo.json")
    if not os.path.exists(source_file):
        raise HTTPException(404, "配置文件不存在")
    return FileResponse(source_file, media_type="application/json")
# ── 静态文件 & 前端 ──────────────────────────────────────────────────────
# StaticFiles verifies at construction time (module import) that the directory
# exists, which is earlier than the lifespan startup hook that also creates it.
# Create it here so a fresh deployment does not fail on import.
os.makedirs(config.AUDIO_DIR, exist_ok=True)
app.mount("/audio", StaticFiles(directory=config.AUDIO_DIR), name="audio")
@app.get("/", response_class=HTMLResponse)
async def frontend():
    """Serve ``static/index.html`` from ``config.BASE_DIR`` as the root page."""
    index_file = Path(os.path.join(config.BASE_DIR, "static", "index.html"))
    # The page is read from disk on every request.
    markup = index_file.read_text(encoding="utf-8")
    return HTMLResponse(markup)
# ── Main ──────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script entry point: uvicorn is imported only when the module is run directly.
    import uvicorn

    # Host and port come from config; auto-reload is enabled.
    uvicorn.run(
        "main:app",
        host=config.SERVER_HOST,
        port=config.SERVER_PORT,
        reload=True,
    )