Files
qmd-memory-gateway/gateway/app/git_manager.py

154 lines
5.5 KiB
Python

from __future__ import annotations

import re
import shlex
import subprocess
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import unquote, urlparse
class GitCommandError(RuntimeError):
    """Raised when a git subprocess exits with a non-zero status.

    The exception message contains the shell-quoted command line, the exit
    status and the captured stdout/stderr.

    Attributes:
        args_list: The argument vector that was executed. Stored under this
            name because ``BaseException.args`` already holds the message.
        returncode: Exit status of the process.
        stdout: Captured standard output.
        stderr: Captured standard error.
    """

    def __init__(self, args: list[str], returncode: int, stdout: str, stderr: str) -> None:
        """Build the message from the command and its captured output.

        Args:
            args: Argument vector of the failed command.
            returncode: Exit status reported by the process.
            stdout: Text captured from standard output.
            stderr: Text captured from standard error.
        """
        # Quote each argument so the command can be pasted into a shell as-is.
        cmd = " ".join(shlex.quote(x) for x in args)
        message = f"git command failed ({returncode}): {cmd}\nstdout:\n{stdout}\nstderr:\n{stderr}"
        super().__init__(message)
        self.args_list = args
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr

    def __reduce__(self) -> tuple:
        """Support ``copy`` and ``pickle`` by replaying the real constructor arguments.

        The inherited implementation re-invokes the class with ``self.args``
        (only the formatted message), which does not match this four-argument
        constructor and fails with ``TypeError``.
        """
        return (type(self), (self.args_list, self.returncode, self.stdout, self.stderr))
@dataclass(frozen=True)
class SyncResult:
    """Immutable record describing the outcome of a workspace sync."""

    # Workspace HEAD read before the sync; None when it could not be read.
    previous_commit: str | None
    # Workspace HEAD read after the sync.
    commit_hash: str
    # True when the HEAD before and after the sync differ.
    changed: bool
class GitManager:
    """Maintain a mirror clone of a remote repository and workspaces cloned from it.

    Every git operation is executed through the ``git`` command-line client
    via :meth:`_run`; a non-zero exit status surfaces as ``GitCommandError``.
    """

    def __init__(self, remote_url: str, mirror_path: Path) -> None:
        """Remember the remote and mirror locations; no git command is run here.

        Args:
            remote_url: URL or local path of the upstream repository.
            mirror_path: Directory that holds (or will hold) the mirror clone.
        """
        self.remote_url = remote_url
        self.mirror_path = mirror_path

    def ensure_mirror(self) -> None:
        """Create the mirror clone if missing, else point ``origin`` at ``remote_url``.

        Raises:
            ValueError: If ``remote_url`` is empty, or if the mirror path has
                no ``HEAD`` file but exists and is not empty.
            GitCommandError: If a git command fails.
        """
        if not self.remote_url:
            raise ValueError("git_remote_url is required")
        self._allow_local_remote_if_needed()

        # The presence of a top-level HEAD file is used as the marker for
        # "the mirror has already been cloned".
        if (self.mirror_path / "HEAD").exists():
            self._run(
                [
                    "git",
                    f"--git-dir={self.mirror_path}",
                    "remote",
                    "set-url",
                    "origin",
                    self.remote_url,
                ]
            )
            return

        self.mirror_path.parent.mkdir(parents=True, exist_ok=True)
        if self.mirror_path.exists() and any(self.mirror_path.iterdir()):
            raise ValueError(f"mirror path exists and is not empty: {self.mirror_path}")
        self._run(["git", "clone", "--mirror", self.remote_url, str(self.mirror_path)])

    def fetch_origin(self) -> None:
        """Fetch ``origin`` into the mirror, pruning refs that vanished upstream."""
        self._run(["git", f"--git-dir={self.mirror_path}", "fetch", "--prune", "origin"])

    def get_branch_commit(self, branch: str) -> str:
        """Return the commit hash the mirror records for ``branch``.

        ``refs/heads/<branch>`` is tried first, then
        ``refs/remotes/origin/<branch>``.

        Raises:
            GitCommandError: The error from the last lookup when neither ref
                resolves.
        """
        refs_to_try = (
            f"refs/heads/{branch}",
            f"refs/remotes/origin/{branch}",
        )
        last_error: GitCommandError | None = None
        for ref in refs_to_try:
            try:
                output = self._run(
                    ["git", f"--git-dir={self.mirror_path}", "rev-parse", "--verify", ref]
                )
            except GitCommandError as exc:
                last_error = exc
            else:
                return output.strip()
        # Defensive guard: unreachable while refs_to_try is non-empty.
        if last_error is None:
            raise RuntimeError("unexpected branch lookup failure without error")
        raise last_error

    def get_workspace_head(self, workspace_path: Path) -> str | None:
        """Return the workspace's ``HEAD`` commit hash.

        Returns ``None`` when ``workspace_path`` has no ``.git`` entry or when
        ``git rev-parse HEAD`` fails.
        """
        if not (workspace_path / ".git").exists():
            return None
        try:
            return self._run(["git", "-C", str(workspace_path), "rev-parse", "HEAD"]).strip()
        except GitCommandError:
            return None

    def sync_workspace(self, workspace_path: Path, branch: str, commit_hash: str) -> SyncResult:
        """Bring ``workspace_path`` to exactly ``commit_hash``, discarding local state.

        The workspace is cloned from the mirror when needed, fetched, and then
        force-checked-out onto a local ``gateway-*`` branch derived from
        ``branch``. Local modifications and untracked files are removed.

        Args:
            workspace_path: Working tree to synchronise.
            branch: Upstream branch name; only used to derive the local branch name.
            commit_hash: Commit the workspace must end up on.

        Returns:
            A ``SyncResult`` with the HEAD before and after the sync.

        Raises:
            ValueError: If the workspace exists, is not empty and is not a git repo.
            GitCommandError: If a git command fails.
            RuntimeError: If HEAD cannot be read after the sync.
        """
        previous = self.get_workspace_head(workspace_path)
        self._ensure_workspace_repo(workspace_path)
        self._run(["git", "-C", str(workspace_path), "fetch", "--prune", "origin"])
        self._force_checkout(workspace_path, self._local_branch_name(branch), commit_hash)
        current = self.get_workspace_head(workspace_path)
        if not current:
            raise RuntimeError(f"failed to read workspace HEAD after sync: {workspace_path}")
        return SyncResult(previous_commit=previous, commit_hash=current, changed=(previous != current))

    def _force_checkout(self, workspace_path: Path, local_branch: str, commit_hash: str) -> None:
        """Point ``local_branch`` at ``commit_hash`` and drop all local changes."""
        git = ["git", "-C", str(workspace_path)]
        # --force: a plain checkout aborts when local modifications overlap
        # with the files that differ between HEAD and the target, which would
        # stop the sync before the reset/clean below could discard them.
        self._run([*git, "checkout", "--force", "-B", local_branch, commit_hash])
        self._run([*git, "reset", "--hard", commit_hash])
        self._run([*git, "clean", "-fd"])

    def _ensure_workspace_repo(self, workspace_path: Path) -> None:
        """Clone the workspace from the mirror if needed and set its ``origin`` to the mirror.

        Raises:
            ValueError: If ``workspace_path`` has no ``.git`` entry but exists
                and is not empty.
        """
        if not (workspace_path / ".git").exists():
            workspace_path.parent.mkdir(parents=True, exist_ok=True)
            if workspace_path.exists() and any(workspace_path.iterdir()):
                raise ValueError(f"workspace exists and is not a git repo: {workspace_path}")
            self._run(["git", "clone", str(self.mirror_path), str(workspace_path)])
        # Re-applied on every call so an existing workspace follows a changed mirror path.
        self._run(
            [
                "git",
                "-C",
                str(workspace_path),
                "remote",
                "set-url",
                "origin",
                str(self.mirror_path),
            ]
        )

    def _local_branch_name(self, branch: str) -> str:
        """Derive the local branch name ``gateway-<sanitised branch>``.

        Runs of characters outside ``A-Za-z0-9_.-`` collapse to a single ``-``;
        an empty result becomes ``gateway-default``.
        """
        cleaned = re.sub(r"[^A-Za-z0-9_.-]+", "-", branch).strip("-")
        return f"gateway-{cleaned or 'default'}"

    def _run(self, args: list[str]) -> str:
        """Execute ``args`` as a subprocess (no shell) and return its stdout text.

        Raises:
            GitCommandError: If the process exits with a non-zero status.
        """
        proc = subprocess.run(args, check=False, capture_output=True, text=True)
        if proc.returncode != 0:
            raise GitCommandError(args=args, returncode=proc.returncode, stdout=proc.stdout, stderr=proc.stderr)
        return proc.stdout

    def _allow_local_remote_if_needed(self) -> None:
        """Register a local remote as a git ``safe.directory`` in the global config.

        Only applies when ``remote_url`` is an absolute path or a ``file://``
        URL whose path exists on disk. Nothing is written when the path (or
        the ``*`` wildcard) is already registered, so repeated calls do not
        keep appending duplicate entries to the global config.
        """
        candidate: str | None = None
        if self.remote_url.startswith("/"):
            candidate = self.remote_url
        elif self.remote_url.startswith("file://"):
            # Percent-escapes belong to the URL, not to the on-disk path.
            candidate = unquote(urlparse(self.remote_url).path)
        if not candidate or not Path(candidate).exists():
            return

        try:
            registered = self._run(
                ["git", "config", "--global", "--get-all", "safe.directory"]
            ).splitlines()
        except GitCommandError:
            # Best-effort lookup: git exits non-zero when the key has no value
            # yet; fall through and add the entry as before.
            registered = []
        if candidate in registered or "*" in registered:
            return

        # Avoid git safety checks blocking local bind-mounted repositories.
        self._run(["git", "config", "--global", "--add", "safe.directory", candidate])