# Files · 116 lines · 4.2 KiB · Python  (file-viewer header captured along with the source; not part of the module)

from __future__ import annotations
from datetime import datetime, timezone
import json
from typing import Any
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from .config import Settings, get_settings
from .git_manager import GitCommandError, GitManager
from .locks import WorkspaceLockManager
from .models import HealthResponse, QueryRequest, QueryResponse, StatusResponse, SyncRequest, SyncResponse, WorkspaceStatusItem
from .qmd_client import QMDClient, QMDCommandError
from .query_service import QueryService
from .sync_service import SyncService
from .workspace_manager import WorkspaceManager
def create_app(
    settings: Settings | None = None,
    *,
    qmd_client: QMDClient | None = None,
    git_manager: GitManager | None = None,
    lock_manager: WorkspaceLockManager | None = None,
) -> FastAPI:
    """Assemble and return the ``memory-gateway`` FastAPI application.

    Every collaborator that the caller does not supply (or supplies as a
    falsy value) is constructed here from the resolved settings.

    Args:
        settings: Configuration to use; ``get_settings()`` is consulted when
            this is falsy.
        qmd_client: Pre-built QMD client; otherwise a ``QMDClient`` is created
            from the settings' binary, timeout and XDG directories.
        git_manager: Pre-built git manager; otherwise a ``GitManager`` is
            created from the settings' remote URL and mirror path.
        lock_manager: Pre-built lock manager; otherwise a fresh
            ``WorkspaceLockManager()`` is used.

    Returns:
        The application with ``settings``, ``sync_service`` and
        ``query_service`` stored on ``app.state``, three exception handlers
        registered (``ValueError`` -> 400, ``GitCommandError`` and
        ``QMDCommandError`` -> 500) and the routes ``GET /health``,
        ``POST /query``, ``POST /sync`` and ``GET /status`` attached.
    """
    app = FastAPI(title="memory-gateway", version="0.1.0")
    cfg = settings if settings else get_settings()

    # --- collaborators (built in the same order as before) -----------------
    ws_manager = WorkspaceManager(
        workspaces_root=cfg.workspaces_root,
        default_branch=cfg.default_branch,
        qmd_index_prefix=cfg.qmd_index_prefix,
    )

    git = git_manager
    if not git:
        git = GitManager(
            remote_url=cfg.git_remote_url,
            mirror_path=cfg.git_mirror_path,
        )

    qmd = qmd_client
    if not qmd:
        qmd = QMDClient(
            qmd_binary=cfg.qmd_binary,
            timeout_seconds=cfg.qmd_timeout_seconds,
            xdg_cache_home=cfg.xdg_cache_home,
            xdg_config_home=cfg.xdg_config_home,
        )

    locks = lock_manager if lock_manager else WorkspaceLockManager()

    # --- services ----------------------------------------------------------
    syncer = SyncService(
        settings=cfg,
        workspace_manager=ws_manager,
        git_manager=git,
        qmd_client=qmd,
    )
    querier = QueryService(
        sync_service=syncer,
        qmd_client=qmd,
        lock_manager=locks,
    )

    # Route handlers read these back from app.state at request time, so the
    # attribute names are part of the app's contract.
    app.state.settings = cfg
    app.state.sync_service = syncer
    app.state.query_service = querier

    # --- error handling ----------------------------------------------------
    def _failure(status_code: int, exc: Exception) -> JSONResponse:
        # Shared error body: {"ok": False, "error": "<exception text>"}.
        return JSONResponse(
            status_code=status_code,
            content={"ok": False, "error": str(exc)},
        )

    @app.exception_handler(ValueError)
    def handle_value_error(_: Any, exc: ValueError) -> JSONResponse:
        # ValueError is answered with HTTP 400.
        return _failure(400, exc)

    @app.exception_handler(GitCommandError)
    def handle_git_error(_: Any, exc: GitCommandError) -> JSONResponse:
        # Git command failures are answered with HTTP 500.
        return _failure(500, exc)

    @app.exception_handler(QMDCommandError)
    def handle_qmd_error(_: Any, exc: QMDCommandError) -> JSONResponse:
        # QMD command failures are answered with HTTP 500.
        return _failure(500, exc)

    # --- routes ------------------------------------------------------------
    # NOTE: route handlers are documented with comments rather than
    # docstrings; FastAPI publishes a handler's docstring as the OpenAPI
    # operation description, which would alter the generated schema.

    @app.get("/health", response_model=HealthResponse)
    def health() -> HealthResponse:
        # Liveness probe stamped with the current UTC time.
        now_utc = datetime.now(timezone.utc)
        return HealthResponse(ok=True, service="memory-gateway", timestamp=now_utc)

    @app.post("/query", response_model=QueryResponse)
    def query(request: QueryRequest) -> QueryResponse:
        # Delegates to whichever query service is currently on app.state.
        service = app.state.query_service
        return service.handle_query(request)

    @app.post("/sync", response_model=SyncResponse)
    def sync(request: SyncRequest) -> SyncResponse:
        # Sync requests also go through the query service.
        service = app.state.query_service
        return service.handle_sync(request)

    @app.get("/status", response_model=StatusResponse)
    def status() -> StatusResponse:
        # Report every workspace state that carries a truthy "synced_at";
        # rows without one are left out of the listing.
        items: list[WorkspaceStatusItem] = [
            WorkspaceStatusItem(
                branch=state["branch"],
                workspace=state["workspace"],
                commit_hash=state["commit_hash"],
                synced_at=datetime.fromisoformat(stamp),
                qmd_collection=state["qmd_collection"],
            )
            for state in app.state.sync_service.list_workspace_states()
            if (stamp := state.get("synced_at"))
        ]
        current = app.state.settings
        return StatusResponse(
            ok=True,
            default_branch=current.default_branch,
            mirror_path=str(current.git_mirror_path),
            workspaces=items,
        )

    return app
# Module-level application instance built with all defaults: no settings or
# collaborators are passed, so create_app() resolves settings through
# get_settings() and constructs every collaborator itself. This runs at import.
# NOTE(review): presumably the object an ASGI server is pointed at
# (e.g. "<module>:app") — confirm against the deployment configuration.
app = create_app()