from __future__ import annotations from contextlib import contextmanager from dataclasses import dataclass import threading import time @dataclass class LockStats: key: str wait_ms: int class WorkspaceLockManager: def __init__(self) -> None: self._locks: dict[str, threading.Lock] = {} self._guard = threading.Lock() def _get_lock(self, key: str) -> threading.Lock: with self._guard: lock = self._locks.get(key) if lock is None: lock = threading.Lock() self._locks[key] = lock return lock @contextmanager def acquire(self, key: str): lock = self._get_lock(key) started = time.perf_counter() lock.acquire() wait_ms = int((time.perf_counter() - started) * 1000) try: yield LockStats(key=key, wait_ms=wait_ms) finally: lock.release()