# vibe_data_ana/webapp/storage.py  (312 lines, 11 KiB, Python)
# -*- coding: utf-8 -*-
"""
SQLite-backed storage for uploaded files and analysis tasks.
"""
import json
import os
import sqlite3
import threading
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional
def utcnow_iso() -> str:
    """Return the current UTC time as a second-resolution ISO-8601 string.

    Format: ``YYYY-MM-DDTHH:MM:SSZ`` — the trailing ``Z`` marks UTC.

    Returns:
        The current UTC timestamp, truncated to whole seconds.
    """
    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
    # datetime.  Take an aware "now" in UTC instead, then drop tzinfo so
    # isoformat() does not append "+00:00" — the output stays byte-identical
    # to the old implementation.
    now = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    return now.isoformat() + "Z"
class Storage:
    """SQLite-backed storage for uploaded files, analysis sessions and tasks.

    Reads hit SQLite directly; writes are serialized through a process-wide
    lock (``_write_lock``) so concurrent request threads never interleave
    INSERT/UPDATE statements.  Every connection is short-lived: opened,
    committed (or rolled back) and closed per operation.
    """

    # Columns that update_session()/update_task() are allowed to touch.
    # Caller-supplied **fields keys are interpolated into SQL, so an
    # unvalidated key would be an injection vector — whitelist them.
    _SESSION_COLUMNS = frozenset({
        "title", "status", "uploaded_file_ids", "template_file_id",
        "session_output_dir", "created_at", "updated_at", "closed_at",
    })
    _TASK_COLUMNS = frozenset({
        "session_id", "query", "status", "uploaded_file_ids",
        "template_file_id", "session_output_dir", "report_file_path",
        "error_message", "created_at", "started_at", "finished_at",
    })

    def __init__(self, db_path: str):
        """Open (creating parent directories if needed) the database file.

        Args:
            db_path: Path to the SQLite file; made absolute before use.
        """
        self.db_path = os.path.abspath(db_path)
        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
        self._write_lock = threading.Lock()
        self.init_db()

    @staticmethod
    def _utcnow_iso() -> str:
        """Current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second resolution)."""
        # Aware "now" instead of the deprecated datetime.utcnow(); tzinfo is
        # stripped so isoformat() emits no "+00:00" and "Z" is appended.
        now = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
        return now.isoformat() + "Z"

    @contextmanager
    def _connect(self):
        """Yield a Row-factory connection; commit on success, roll back on error.

        The connection is always closed, even when the caller raises.
        """
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        except Exception:
            # Leave the database consistent if the caller's work failed.
            conn.rollback()
            raise
        else:
            conn.commit()
        finally:
            conn.close()

    def init_db(self) -> None:
        """Create tables and indexes if absent; safe to call repeatedly."""
        with self._connect() as conn:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS uploaded_files (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    original_name TEXT NOT NULL,
                    stored_path TEXT NOT NULL,
                    created_at TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS analysis_sessions (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    title TEXT NOT NULL,
                    status TEXT NOT NULL,
                    uploaded_file_ids TEXT NOT NULL,
                    template_file_id TEXT,
                    session_output_dir TEXT,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    closed_at TEXT
                );
                CREATE TABLE IF NOT EXISTS analysis_tasks (
                    id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    user_id TEXT NOT NULL,
                    query TEXT NOT NULL,
                    status TEXT NOT NULL,
                    uploaded_file_ids TEXT NOT NULL,
                    template_file_id TEXT,
                    session_output_dir TEXT,
                    report_file_path TEXT,
                    error_message TEXT,
                    created_at TEXT NOT NULL,
                    started_at TEXT,
                    finished_at TEXT
                );
                -- Every read path filters by user_id (and tasks by session_id);
                -- without these indexes each lookup is a full table scan.
                CREATE INDEX IF NOT EXISTS idx_uploaded_files_user
                    ON uploaded_files (user_id);
                CREATE INDEX IF NOT EXISTS idx_sessions_user
                    ON analysis_sessions (user_id);
                CREATE INDEX IF NOT EXISTS idx_tasks_user
                    ON analysis_tasks (user_id);
                CREATE INDEX IF NOT EXISTS idx_tasks_session
                    ON analysis_tasks (session_id);
                """
            )

    def create_uploaded_file(
        self, user_id: str, original_name: str, stored_path: str
    ) -> Dict[str, Any]:
        """Register an uploaded file and return its record.

        Args:
            user_id: Owner of the file.
            original_name: Name the user uploaded the file under.
            stored_path: On-disk location; stored as an absolute path.

        Returns:
            The inserted record as a plain dict.
        """
        record = {
            "id": str(uuid.uuid4()),
            "user_id": user_id,
            "original_name": original_name,
            "stored_path": os.path.abspath(stored_path),
            "created_at": self._utcnow_iso(),
        }
        with self._write_lock, self._connect() as conn:
            conn.execute(
                """
                INSERT INTO uploaded_files (id, user_id, original_name, stored_path, created_at)
                VALUES (:id, :user_id, :original_name, :stored_path, :created_at)
                """,
                record,
            )
        return record

    def get_uploaded_file(self, file_id: str, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the file record, or None if missing or owned by another user."""
        with self._connect() as conn:
            row = conn.execute(
                """
                SELECT * FROM uploaded_files WHERE id = ? AND user_id = ?
                """,
                (file_id, user_id),
            ).fetchone()
        return dict(row) if row else None

    def list_uploaded_files(
        self, file_ids: Iterable[str], user_id: str
    ) -> List[Dict[str, Any]]:
        """Return the user's records for *file_ids*, oldest first.

        IDs that do not exist or belong to another user are silently omitted.
        """
        file_ids = list(file_ids)
        if not file_ids:
            return []
        # Only placeholder characters are interpolated; values are still bound.
        placeholders = ",".join("?" for _ in file_ids)
        params = [*file_ids, user_id]
        with self._connect() as conn:
            rows = conn.execute(
                f"""
                SELECT * FROM uploaded_files
                WHERE id IN ({placeholders}) AND user_id = ?
                ORDER BY created_at ASC
                """,
                params,
            ).fetchall()
        return [dict(row) for row in rows]

    def list_all_uploaded_files(self, user_id: str) -> List[Dict[str, Any]]:
        """Return every file owned by *user_id*, newest first."""
        with self._connect() as conn:
            rows = conn.execute(
                """
                SELECT * FROM uploaded_files
                WHERE user_id = ?
                ORDER BY created_at DESC
                """,
                (user_id,),
            ).fetchall()
        return [dict(row) for row in rows]

    def create_task(
        self,
        session_id: str,
        user_id: str,
        query: str,
        uploaded_file_ids: List[str],
        template_file_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Insert a queued analysis task and return its normalized record.

        Args:
            session_id: Parent session the task belongs to.
            user_id: Task owner.
            query: The user's analysis request text.
            uploaded_file_ids: File IDs the task operates on (stored as JSON).
            template_file_id: Optional report-template file ID.
        """
        record = {
            "id": str(uuid.uuid4()),
            "session_id": session_id,
            "user_id": user_id,
            "query": query,
            "status": "queued",
            "uploaded_file_ids": json.dumps(uploaded_file_ids, ensure_ascii=False),
            "template_file_id": template_file_id,
            "session_output_dir": None,
            "report_file_path": None,
            "error_message": None,
            "created_at": self._utcnow_iso(),
            "started_at": None,
            "finished_at": None,
        }
        with self._write_lock, self._connect() as conn:
            conn.execute(
                """
                INSERT INTO analysis_tasks (
                    id, session_id, user_id, query, status, uploaded_file_ids, template_file_id,
                    session_output_dir, report_file_path, error_message,
                    created_at, started_at, finished_at
                )
                VALUES (
                    :id, :session_id, :user_id, :query, :status, :uploaded_file_ids, :template_file_id,
                    :session_output_dir, :report_file_path, :error_message,
                    :created_at, :started_at, :finished_at
                )
                """,
                record,
            )
        # Re-read so the caller gets the same normalized shape as get_task().
        return self.get_task(record["id"], user_id)

    def get_task(self, task_id: str, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the normalized task record, or None if not found/not owned."""
        with self._connect() as conn:
            row = conn.execute(
                """
                SELECT * FROM analysis_tasks WHERE id = ? AND user_id = ?
                """,
                (task_id, user_id),
            ).fetchone()
        return self._normalize_task(row) if row else None

    def list_tasks(self, user_id: str) -> List[Dict[str, Any]]:
        """Return all of the user's tasks, newest first."""
        with self._connect() as conn:
            rows = conn.execute(
                """
                SELECT * FROM analysis_tasks
                WHERE user_id = ?
                ORDER BY created_at DESC
                """,
                (user_id,),
            ).fetchall()
        return [self._normalize_task(row) for row in rows]

    def list_session_tasks(self, session_id: str, user_id: str) -> List[Dict[str, Any]]:
        """Return the session's tasks in creation order (oldest first)."""
        with self._connect() as conn:
            rows = conn.execute(
                """
                SELECT * FROM analysis_tasks
                WHERE session_id = ? AND user_id = ?
                ORDER BY created_at ASC
                """,
                (session_id, user_id),
            ).fetchall()
        return [self._normalize_task(row) for row in rows]

    def create_session(
        self,
        user_id: str,
        title: str,
        uploaded_file_ids: List[str],
        template_file_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Insert an open session and return its normalized record."""
        now = self._utcnow_iso()
        record = {
            "id": str(uuid.uuid4()),
            "user_id": user_id,
            "title": title,
            "status": "open",
            "uploaded_file_ids": json.dumps(uploaded_file_ids, ensure_ascii=False),
            "template_file_id": template_file_id,
            "session_output_dir": None,
            "created_at": now,
            "updated_at": now,
            "closed_at": None,
        }
        with self._write_lock, self._connect() as conn:
            conn.execute(
                """
                INSERT INTO analysis_sessions (
                    id, user_id, title, status, uploaded_file_ids, template_file_id,
                    session_output_dir, created_at, updated_at, closed_at
                )
                VALUES (
                    :id, :user_id, :title, :status, :uploaded_file_ids, :template_file_id,
                    :session_output_dir, :created_at, :updated_at, :closed_at
                )
                """,
                record,
            )
        return self.get_session(record["id"], user_id)

    def get_session(self, session_id: str, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the normalized session record, or None if not found/not owned."""
        with self._connect() as conn:
            row = conn.execute(
                """
                SELECT * FROM analysis_sessions WHERE id = ? AND user_id = ?
                """,
                (session_id, user_id),
            ).fetchone()
        return self._normalize_session(row) if row else None

    def list_sessions(self, user_id: str) -> List[Dict[str, Any]]:
        """Return the user's sessions, most recently updated first."""
        with self._connect() as conn:
            rows = conn.execute(
                """
                SELECT * FROM analysis_sessions
                WHERE user_id = ?
                ORDER BY updated_at DESC
                """,
                (user_id,),
            ).fetchall()
        return [self._normalize_session(row) for row in rows]

    def update_session(self, session_id: str, **fields: Any) -> None:
        """Update the given columns; ``updated_at`` is always refreshed.

        Raises:
            ValueError: If a field name is not a known session column.
        """
        if not fields:
            return
        unknown = set(fields) - self._SESSION_COLUMNS
        if unknown:
            raise ValueError(f"unknown session column(s): {sorted(unknown)}")
        fields["updated_at"] = self._utcnow_iso()
        # Keys are whitelisted above, so interpolating them is safe; values
        # still go through bound parameters.
        assignments = ", ".join(f"{key} = :{key}" for key in fields)
        payload = dict(fields, id=session_id)
        with self._write_lock, self._connect() as conn:
            conn.execute(
                f"UPDATE analysis_sessions SET {assignments} WHERE id = :id",
                payload,
            )

    def update_task(self, task_id: str, **fields: Any) -> None:
        """Update the given columns of a task.

        Raises:
            ValueError: If a field name is not a known task column.
        """
        if not fields:
            return
        unknown = set(fields) - self._TASK_COLUMNS
        if unknown:
            raise ValueError(f"unknown task column(s): {sorted(unknown)}")
        # Keys are whitelisted above; values are bound parameters.
        assignments = ", ".join(f"{key} = :{key}" for key in fields)
        payload = dict(fields, id=task_id)
        with self._write_lock, self._connect() as conn:
            conn.execute(
                f"UPDATE analysis_tasks SET {assignments} WHERE id = :id",
                payload,
            )

    @staticmethod
    def _normalize_task(row: sqlite3.Row) -> Dict[str, Any]:
        """Convert a task row to a dict, decoding the JSON file-ID list."""
        task = dict(row)
        task["uploaded_file_ids"] = json.loads(task["uploaded_file_ids"])
        return task

    @staticmethod
    def _normalize_session(row: sqlite3.Row) -> Dict[str, Any]:
        """Convert a session row to a dict, decoding the JSON file-ID list."""
        session = dict(row)
        session["uploaded_file_ids"] = json.loads(session["uploaded_file_ids"])
        return session