"""Synchronise branch workspaces with the git mirror and their QMD indexes."""

from __future__ import annotations

from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import json
import os
from pathlib import Path
import threading
from typing import Any

from .config import Settings
from .git_manager import GitCommandError, GitManager
from .qmd_client import QMDClient, QMDCommandError
from .workspace_manager import ResolvedWorkspace, WorkspaceManager


@dataclass(frozen=True)
class SyncMetadata:
    """Immutable record describing the outcome of one workspace sync.

    Instances are built by ``SyncService._sync_resolved_workspace`` and are
    also persisted as JSON by ``SyncService._write_state``.
    """

    # Identity of the workspace, copied from the resolved workspace.
    branch: str
    workspace_name: str
    workspace_path: Path
    # ``workspace_path`` re-rooted under the project data root when it lies
    # under the container data root; otherwise the path unchanged, as a string.
    workspace_relative_path: str
    qmd_collection: str
    qmd_index: str

    # Result reported by ``GitManager.sync_workspace``.
    commit_hash: str
    previous_commit: str | None
    changed: bool

    # UTC timestamp taken once all sync steps have finished.
    synced_at: datetime

    # Value returned by ``QMDClient.ensure_collection``.
    created_collection: bool
    # Truthy when ``QMDClient.update_workspace`` was invoked during this sync.
    update_ran: bool
    # Value returned by ``embed_workspace_if_needed``; False when embedding
    # was not attempted or failed.
    embed_ran: bool
    # Message of the ``QMDCommandError`` raised while embedding, if any.
    embed_error: str | None


class SyncService:
    """Coordinate git workspace syncing, QMD index maintenance and state files."""

    def __init__(
        self,
        settings: Settings,
        workspace_manager: WorkspaceManager,
        git_manager: GitManager,
        qmd_client: QMDClient,
    ) -> None:
        """Store the collaborators and make sure the state directory exists."""
        self.settings = settings
        self.workspace_manager = workspace_manager
        self.git_manager = git_manager
        self.qmd_client = qmd_client
        self._mirror_lock = threading.Lock()
        self.settings.workspace_state_dir.mkdir(parents=True, exist_ok=True)

    def sync_for_query(
        self,
        *,
        branch: str | None,
        memory_profile: str | None,
        require_latest: bool,
    ) -> SyncMetadata:
        """Resolve the workspace for a query and bring it up to date.

        Args:
            branch: Branch to resolve; passed through to the workspace manager.
            memory_profile: Profile to resolve; passed through as well.
            require_latest: When true, origin is fetched before the branch
                commit is resolved.

        Returns:
            Metadata describing the sync that was performed.
        """
        resolved = self.workspace_manager.resolve_workspace(
            branch=branch,
            memory_profile=memory_profile,
        )
        return self._sync_resolved_workspace(resolved=resolved, require_latest=require_latest)

    def sync_explicit(
        self,
        *,
        branch: str | None,
        memory_profile: str | None,
        require_latest: bool,
    ) -> SyncMetadata:
        """Resolve the workspace for an explicit sync request and sync it.

        Currently performs exactly the same steps as ``sync_for_query``; it is
        kept as a separate entry point.

        Args:
            branch: Branch to resolve; passed through to the workspace manager.
            memory_profile: Profile to resolve; passed through as well.
            require_latest: When true, origin is fetched before the branch
                commit is resolved.

        Returns:
            Metadata describing the sync that was performed.
        """
        resolved = self.workspace_manager.resolve_workspace(
            branch=branch,
            memory_profile=memory_profile,
        )
        return self._sync_resolved_workspace(resolved=resolved, require_latest=require_latest)

    def list_workspace_states(self) -> list[dict[str, Any]]:
        """Return the persisted state payloads, ordered by state file path.

        Files that cannot be read, are not valid UTF-8, are not valid JSON, or
        do not contain a JSON object are skipped rather than aborting the
        whole listing.
        """
        states: list[dict[str, Any]] = []
        for path in sorted(self.settings.workspace_state_dir.glob("*.json")):
            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
            except (ValueError, OSError):
                # ValueError covers both json.JSONDecodeError and the
                # UnicodeDecodeError raised for bytes that are not UTF-8.
                continue
            if not isinstance(payload, dict):
                # Valid JSON, but not the object shape this method promises.
                continue
            states.append(payload)
        return states

    def _sync_resolved_workspace(self, *, resolved: ResolvedWorkspace, require_latest: bool) -> SyncMetadata:
        """Sync one resolved workspace, refresh its QMD index and record state.

        Args:
            resolved: Workspace to sync.
            require_latest: When true, fetch origin up front and (if enabled in
                settings) run a QMD update afterwards.

        Returns:
            Metadata describing what happened; the same data is written to the
            workspace state file.

        Raises:
            GitCommandError: If the branch commit cannot be resolved.
        """
        # Serialise the git steps across threads.
        # NOTE(review): sync_workspace is kept under the lock on the assumption
        # that it also works against the shared mirror — confirm lock scope.
        with self._mirror_lock:
            self.git_manager.ensure_mirror()
            if require_latest:
                self.git_manager.fetch_origin()
            # When origin was just fetched there is no point retrying with
            # another fetch, hence force_fetch is the inverse of require_latest.
            commit_hash = self._resolve_branch_commit(resolved.branch, force_fetch=not require_latest)
            git_sync = self.git_manager.sync_workspace(
                workspace_path=resolved.workspace_path,
                branch=resolved.branch,
                commit_hash=commit_hash,
            )

        created_collection = self.qmd_client.ensure_collection(
            index_name=resolved.qmd_index,
            collection_name=resolved.qmd_collection,
            workspace_path=resolved.workspace_path,
        )

        update_ran = require_latest and self.settings.qmd_update_on_latest_query
        if update_ran:
            self.qmd_client.update_workspace(index_name=resolved.qmd_index)

        should_embed = self.settings.qmd_embed_on_change and (git_sync.changed or created_collection)
        embed_ran = False
        embed_error: str | None = None
        # Embedding is only attempted after an update has run.
        if update_ran:
            try:
                embed_ran = self.qmd_client.embed_workspace_if_needed(
                    index_name=resolved.qmd_index,
                    should_embed=should_embed,
                )
            except QMDCommandError as exc:
                # Best effort: an embedding failure is reported in the
                # metadata instead of failing the whole sync.
                embed_ran = False
                embed_error = str(exc)

        synced_at = datetime.now(timezone.utc)
        metadata = SyncMetadata(
            branch=resolved.branch,
            workspace_name=resolved.workspace_name,
            workspace_path=resolved.workspace_path,
            workspace_relative_path=self._to_project_relative_path(resolved.workspace_path),
            qmd_collection=resolved.qmd_collection,
            qmd_index=resolved.qmd_index,
            commit_hash=git_sync.commit_hash,
            previous_commit=git_sync.previous_commit,
            changed=git_sync.changed,
            synced_at=synced_at,
            created_collection=created_collection,
            update_ran=update_ran,
            embed_ran=embed_ran,
            embed_error=embed_error,
        )
        self._write_state(metadata)
        return metadata

    def _resolve_branch_commit(self, branch: str, force_fetch: bool) -> str:
        """Return the commit for ``branch``, optionally retrying after a fetch.

        Args:
            branch: Branch name to look up.
            force_fetch: When true, a failed lookup triggers one fetch of
                origin followed by a second lookup.

        Raises:
            GitCommandError: If the lookup fails and ``force_fetch`` is false,
                or if the retry after fetching fails too.
        """
        try:
            return self.git_manager.get_branch_commit(branch)
        except GitCommandError:
            if not force_fetch:
                raise
            self.git_manager.fetch_origin()
            return self.git_manager.get_branch_commit(branch)

    def _write_state(self, metadata: SyncMetadata) -> None:
        """Persist ``metadata`` as ``<workspace_name>.json`` in the state directory.

        The payload is written to a temporary sibling file and then renamed
        over the target, so readers never observe a partially written file.

        Raises:
            OSError: If the state file cannot be written or renamed.
        """
        path = self.settings.workspace_state_dir / f"{metadata.workspace_name}.json"
        payload = {
            "branch": metadata.branch,
            "workspace": metadata.workspace_name,
            "workspace_path": str(metadata.workspace_path),
            "workspace_relative_path": metadata.workspace_relative_path,
            "qmd_collection": metadata.qmd_collection,
            "qmd_index": metadata.qmd_index,
            "commit_hash": metadata.commit_hash,
            "previous_commit": metadata.previous_commit,
            "changed": metadata.changed,
            "created_collection": metadata.created_collection,
            "update_ran": metadata.update_ran,
            "embed_ran": metadata.embed_ran,
            "embed_error": metadata.embed_error,
            "synced_at": metadata.synced_at.isoformat(),
        }
        serialized = json.dumps(payload, ensure_ascii=True, indent=2)

        # The ".tmp" suffix keeps the temporary file out of the "*.json" glob
        # used by list_workspace_states; pid and thread id keep concurrent
        # writers from sharing a temporary file.
        tmp_path = path.with_name(f".{path.name}.{os.getpid()}.{threading.get_ident()}.tmp")
        try:
            tmp_path.write_text(serialized, encoding="utf-8")
            tmp_path.replace(path)
        except OSError:
            # Best-effort cleanup of the temporary file; the original error is
            # what the caller needs to see.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    def _to_project_relative_path(self, path: Path) -> str:
        """Map a container path to its project-relative form.

        Paths under ``settings.container_data_root`` are re-rooted under
        ``settings.project_data_relative_root``; any other path is returned
        unchanged as a string.
        """
        try:
            rel = path.relative_to(self.settings.container_data_root)
        except ValueError:
            # Not located under the container data root.
            return str(path)
        return str(Path(self.settings.project_data_relative_root) / rel)