"""Helpers for maintaining a git mirror clone and syncing workspaces from it."""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
import re
import shlex
import subprocess
from urllib.parse import unquote, urlparse


class GitCommandError(RuntimeError):
    """Raised when a git subprocess exits with a non-zero status.

    Attributes:
        args_list: The argv that was executed.
        returncode: The exit status of the process.
        stdout: Captured standard output.
        stderr: Captured standard error.
    """

    def __init__(self, args: list[str], returncode: int, stdout: str, stderr: str) -> None:
        cmd = " ".join(shlex.quote(x) for x in args)
        message = f"git command failed ({returncode}): {cmd}\nstdout:\n{stdout}\nstderr:\n{stderr}"
        super().__init__(message)
        # Stored as ``args_list`` so the exception's own ``args`` tuple
        # (set by ``super().__init__``) is left untouched.
        self.args_list = args
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr


@dataclass(frozen=True)
class SyncResult:
    """Outcome of :meth:`GitManager.sync_workspace`."""

    # Workspace HEAD before the sync, or None if it could not be read.
    previous_commit: str | None
    # Workspace HEAD after the sync.
    commit_hash: str
    # True when the HEAD before and after the sync differ.
    changed: bool


class GitManager:
    """Manage a mirror clone of ``remote_url`` and workspaces cloned from it."""

    def __init__(self, remote_url: str, mirror_path: Path) -> None:
        self.remote_url = remote_url
        self.mirror_path = mirror_path

    def ensure_mirror(self) -> None:
        """Create the mirror clone, or re-point an existing one at ``remote_url``.

        Raises:
            ValueError: If ``remote_url`` is empty, or if ``mirror_path``
                exists, is non-empty and has no ``HEAD`` file.
            GitCommandError: If a git command fails.
        """
        if not self.remote_url:
            raise ValueError("git_remote_url is required")

        self._allow_local_remote_if_needed()

        head_file = self.mirror_path / "HEAD"
        if head_file.exists():
            # Mirror already present: just make sure origin matches the
            # currently configured remote.
            self._run(
                [
                    "git",
                    f"--git-dir={self.mirror_path}",
                    "remote",
                    "set-url",
                    "origin",
                    self.remote_url,
                ]
            )
            return

        self.mirror_path.parent.mkdir(parents=True, exist_ok=True)
        if self.mirror_path.exists() and any(self.mirror_path.iterdir()):
            raise ValueError(f"mirror path exists and is not empty: {self.mirror_path}")
        self._run(["git", "clone", "--mirror", self.remote_url, str(self.mirror_path)])

    def fetch_origin(self) -> None:
        """Fetch from ``origin`` into the mirror, pruning deleted refs."""
        self._run(["git", f"--git-dir={self.mirror_path}", "fetch", "--prune", "origin"])

    def get_branch_commit(self, branch: str) -> str:
        """Return the commit hash ``branch`` points at in the mirror.

        ``refs/heads/<branch>`` is tried first, then
        ``refs/remotes/origin/<branch>``.

        Raises:
            GitCommandError: The error from the last attempted ref when
                neither ref can be resolved.
        """
        refs_to_try = [
            f"refs/heads/{branch}",
            f"refs/remotes/origin/{branch}",
        ]
        last_error: GitCommandError | None = None
        for ref in refs_to_try:
            try:
                return self._run(
                    ["git", f"--git-dir={self.mirror_path}", "rev-parse", "--verify", ref]
                ).strip()
            except GitCommandError as exc:
                last_error = exc
        if last_error is None:
            raise RuntimeError("unexpected branch lookup failure without error")
        raise last_error

    def get_workspace_head(self, workspace_path: Path) -> str | None:
        """Return the workspace's ``HEAD`` commit hash.

        Returns ``None`` when ``workspace_path`` has no ``.git`` entry or
        when ``git rev-parse HEAD`` fails.
        """
        if not (workspace_path / ".git").exists():
            return None
        try:
            return self._run(["git", "-C", str(workspace_path), "rev-parse", "HEAD"]).strip()
        except GitCommandError:
            return None

    def sync_workspace(self, workspace_path: Path, branch: str, commit_hash: str) -> SyncResult:
        """Force the workspace onto ``commit_hash``, discarding local state.

        Args:
            workspace_path: Directory of the working clone (created from the
                mirror if it does not exist yet).
            branch: Only used to derive the name of the local branch that is
                (re)created at ``commit_hash``.
            commit_hash: Commit to check out.

        Returns:
            A :class:`SyncResult` with the HEAD before and after the sync.

        Raises:
            ValueError: If the workspace exists, is non-empty and is not a
                git repository.
            GitCommandError: If a git command fails.
            RuntimeError: If HEAD cannot be read after the sync.
        """
        previous = self.get_workspace_head(workspace_path)
        self._ensure_workspace_repo(workspace_path)

        workspace = str(workspace_path)
        self._run(["git", "-C", workspace, "fetch", "--prune", "origin"])
        local_branch = self._local_branch_name(branch)
        # --force: without it the checkout aborts when local modifications
        # conflict with the target commit, before the reset/clean below get a
        # chance to discard them.
        self._run(
            [
                "git",
                "-C",
                workspace,
                "checkout",
                "--force",
                "-B",
                local_branch,
                commit_hash,
            ]
        )
        self._run(["git", "-C", workspace, "reset", "--hard", commit_hash])
        self._run(["git", "-C", workspace, "clean", "-fd"])

        current = self.get_workspace_head(workspace_path)
        if not current:
            raise RuntimeError(f"failed to read workspace HEAD after sync: {workspace_path}")
        return SyncResult(previous_commit=previous, commit_hash=current, changed=(previous != current))

    def _ensure_workspace_repo(self, workspace_path: Path) -> None:
        """Clone the workspace from the mirror if needed and point origin at it."""
        if not (workspace_path / ".git").exists():
            workspace_path.parent.mkdir(parents=True, exist_ok=True)
            if workspace_path.exists() and any(workspace_path.iterdir()):
                raise ValueError(f"workspace exists and is not a git repo: {workspace_path}")
            self._run(["git", "clone", str(self.mirror_path), str(workspace_path)])
        self._run(
            [
                "git",
                "-C",
                str(workspace_path),
                "remote",
                "set-url",
                "origin",
                str(self.mirror_path),
            ]
        )

    def _local_branch_name(self, branch: str) -> str:
        """Return a ``gateway-``-prefixed local branch name derived from ``branch``.

        Runs of characters outside ``[A-Za-z0-9_.-]`` collapse to a single
        ``-``; leading/trailing ``-`` are stripped; an empty result becomes
        ``default``.
        """
        cleaned = re.sub(r"[^A-Za-z0-9_.-]+", "-", branch).strip("-")
        return f"gateway-{cleaned or 'default'}"

    def _run(self, args: list[str]) -> str:
        """Run ``args`` and return its stdout.

        Raises:
            GitCommandError: If the process exits with a non-zero status.
        """
        proc = subprocess.run(args, check=False, capture_output=True, text=True)
        if proc.returncode != 0:
            raise GitCommandError(
                args=args,
                returncode=proc.returncode,
                stdout=proc.stdout,
                stderr=proc.stderr,
            )
        return proc.stdout

    def _local_remote_path(self) -> str | None:
        """Return the filesystem path ``remote_url`` refers to, if it is local.

        Absolute paths (leading ``/``) are returned unchanged. For ``file://``
        URLs the percent-decoded path component is returned. Any other remote,
        or a ``file://`` URL with an empty path, yields ``None``.
        """
        if self.remote_url.startswith("/"):
            return self.remote_url
        if self.remote_url.startswith("file://"):
            # URL paths are percent-encoded ("%20" for a space); decode so the
            # result names the real directory on disk.
            return unquote(urlparse(self.remote_url).path) or None
        return None

    def _allow_local_remote_if_needed(self) -> None:
        """Register a local remote as a git ``safe.directory`` (global config).

        Does nothing when the remote is not a local path, when that path does
        not exist, or when the path is already listed.
        """
        candidate = self._local_remote_path()
        if not candidate:
            return
        if not Path(candidate).exists():
            return

        try:
            configured = self._run(
                ["git", "config", "--global", "--get-all", "safe.directory"]
            ).splitlines()
        except GitCommandError:
            # ``git config --get-all`` exits non-zero when the key is unset.
            configured = []
        if candidate in configured:
            # Already registered; adding again would only pile up duplicates
            # in the global config on every call.
            return

        # Avoid git safety checks blocking local bind-mounted repositories.
        self._run(["git", "config", "--global", "--add", "safe.directory", candidate])