Files
vibe_data_ana/webapp/task_runner.py

148 lines
5.7 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""
Background task runner for analysis jobs.
"""
import os
import shutil
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import redirect_stderr, redirect_stdout
from typing import Optional
from utils.create_session_dir import create_session_output_dir
from webapp.session_manager import SessionManager
from webapp.storage import Storage, utcnow_iso
class TaskRunner:
    """Runs analysis tasks in background worker threads.

    Tasks are handed to a ``ThreadPoolExecutor``.  A task id is remembered in
    ``_submitted`` from the moment it is accepted until its worker finishes,
    so submitting the same id again while it is pending or running is a no-op.
    """

    def __init__(
        self,
        storage: "Storage",
        uploads_dir: str,
        outputs_dir: str,
        session_manager: "SessionManager",
        max_workers: int = 2,
    ):
        """Store collaborators and create the worker pool.

        Args:
            storage: Persistence layer used to read and update tasks,
                sessions and uploaded-file records.
            uploads_dir: Directory of uploaded files (stored as an absolute path).
            outputs_dir: Root directory under which session output
                directories are created (stored as an absolute path).
            session_manager: Provides the per-session runtime via
                ``get_or_create``.
            max_workers: Size of the worker thread pool.
        """
        self.storage = storage
        self.uploads_dir = os.path.abspath(uploads_dir)
        self.outputs_dir = os.path.abspath(outputs_dir)
        self.session_manager = session_manager
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        # _lock guards _submitted, which is touched by both the submitting
        # thread and the worker threads.
        self._lock = threading.Lock()
        self._submitted = set()

    def submit(self, task_id: str, user_id: str) -> None:
        """Queue ``task_id`` for background execution unless already queued.

        Raises:
            Exception: Whatever the executor raises when it refuses the job
                (e.g. ``RuntimeError`` after shutdown).  The task id is
                released first so it can be submitted again later.
        """
        with self._lock:
            if task_id in self._submitted:
                return
            self._submitted.add(task_id)
        try:
            self._executor.submit(self._run_task, task_id, user_id)
        except Exception:
            # The job never reached a worker, so nobody would ever remove the
            # id from _submitted; roll it back here.
            with self._lock:
                self._submitted.discard(task_id)
            raise

    def _run_task(self, task_id: str, user_id: str) -> None:
        """Execute one analysis task and record its outcome in storage.

        Returns silently when the task or its session cannot be found.  A
        failure raised by the agent is recorded and logged, and the method
        returns normally.  Any other exception is recorded as a task failure
        as well (so the task does not stay "running" forever) and then
        re-raised.
        """
        session_id = None
        session_marked_running = False
        try:
            task = self.storage.get_task(task_id, user_id)
            if not task:
                return
            session = self.storage.get_session(task["session_id"], user_id)
            if not session:
                return
            session_id = session["id"]

            uploaded_files = self.storage.list_uploaded_files(
                task["uploaded_file_ids"], user_id
            )
            data_files = [item["stored_path"] for item in uploaded_files]
            template_path = self._resolve_template_path(task, user_id)
            session_output_dir = self._ensure_session_output_dir(session)

            runtime = self.session_manager.get_or_create(
                session_id=session_id,
                user_id=user_id,
                session_output_dir=session_output_dir,
                uploaded_files=data_files,
                template_path=template_path,
            )
            self.storage.update_task(
                task_id,
                status="running",
                session_output_dir=session_output_dir,
                started_at=utcnow_iso(),
                error_message=None,
            )
            self.storage.update_session(session_id, status="running")
            session_marked_running = True

            log_path = os.path.join(session_output_dir, "task.log")
            # runtime.lock serialises tasks that share the same session runtime.
            with runtime.lock, open(log_path, "a", encoding="utf-8") as log_file:
                log_file.write(
                    f"[{utcnow_iso()}] task started for session {session_id}\n"
                )
                try:
                    # NOTE: redirect_stdout/redirect_stderr replace the
                    # process-wide sys.stdout/sys.stderr, so output printed by
                    # a task running concurrently in another worker thread can
                    # end up in this log (and vice versa).
                    with redirect_stdout(log_file), redirect_stderr(log_file):
                        result = runtime.agent.analyze(
                            user_input=task["query"],
                            files=data_files,
                            template_path=template_path,
                            session_output_dir=session_output_dir,
                            reset_context=not runtime.initialized,
                            keep_session_open=True,
                        )
                except Exception as exc:
                    self._mark_failed(task_id, session_id, exc)
                    log_file.write(f"[{utcnow_iso()}] task failed: {exc}\n")
                    return
                runtime.initialized = True
                # Copy the report while the session lock is still held so a
                # following task on the same session cannot overwrite it first.
                report_file_path = self._persist_task_report(
                    task_id, session_output_dir, result.get("report_file_path")
                )

            self.storage.update_task(
                task_id,
                status="succeeded",
                report_file_path=report_file_path,
                finished_at=utcnow_iso(),
                error_message=None,
            )
            self.storage.update_session(session_id, status="open")
        except Exception as exc:
            # Anything that went wrong outside the agent call (storage, file
            # system, malformed result, ...) would otherwise vanish inside the
            # executor's Future and leave the task without a final status.
            # Only touch the session status if this run changed it.
            self._mark_failed(
                task_id, session_id if session_marked_running else None, exc
            )
            raise
        finally:
            with self._lock:
                self._submitted.discard(task_id)

    def _ensure_session_output_dir(self, session: dict) -> str:
        """Return the session's output directory, creating and storing one if unset."""
        session_output_dir = session.get("session_output_dir")
        if not session_output_dir:
            session_output_dir = create_session_output_dir(
                self.outputs_dir, session["title"]
            )
            self.storage.update_session(
                session["id"],
                session_output_dir=session_output_dir,
            )
        return session_output_dir

    def _mark_failed(
        self, task_id: str, session_id: Optional[str], exc: BaseException
    ) -> None:
        """Record ``exc`` as the task's failure; reopen the session if an id is given."""
        self.storage.update_task(
            task_id,
            status="failed",
            error_message=str(exc),
            finished_at=utcnow_iso(),
            report_file_path=None,
        )
        if session_id is not None:
            self.storage.update_session(session_id, status="open")

    def _resolve_template_path(self, task: dict, user_id: str) -> Optional[str]:
        """Return the stored path of the task's template file, or ``None``.

        ``None`` is returned both when the task has no ``template_file_id``
        and when storage has no record for that id.
        """
        template_file_id = task.get("template_file_id")
        if not template_file_id:
            return None
        file_record = self.storage.get_uploaded_file(template_file_id, user_id)
        return file_record["stored_path"] if file_record else None

    @staticmethod
    def _persist_task_report(
        task_id: str, session_output_dir: str, current_report_path: Optional[str]
    ) -> Optional[str]:
        """Copy the report to ``report_<task_id>.md`` in the session directory.

        Args:
            task_id: Task identifier used to build the per-task file name.
            session_output_dir: Directory that receives the copy.
            current_report_path: Report path produced by the analysis, if any.

        Returns:
            The per-task report path, or ``current_report_path`` unchanged
            when it is empty or does not exist on disk.
        """
        if not current_report_path or not os.path.exists(current_report_path):
            return current_report_path
        task_report_path = os.path.join(session_output_dir, f"report_{task_id}.md")
        # Skip the copy when the report already lives at the target path;
        # shutil.copyfile raises SameFileError for identical source/destination.
        if os.path.abspath(current_report_path) != os.path.abspath(task_report_path):
            shutil.copyfile(current_report_path, task_report_path)
        return task_report_path