46 lines
1.6 KiB
Python
46 lines
1.6 KiB
Python
from pathlib import Path
|
|
from typing import List, Dict
|
|
import shutil
|
|
from fastapi import UploadFile
|
|
from ..core.config import settings
|
|
|
|
|
|
class WorkspaceManager:
    """Create, populate and remove per-job working directories.

    Each job gets ``<base_path>/<job_id>`` containing ``inputs``, ``logs``
    and ``results`` sub-directories. ``base_path`` is read from
    ``settings.WORKSPACE_BASE_PATH`` and created when the manager is built.
    """

    def __init__(self) -> None:
        """Read the base directory from settings and make sure it exists."""
        self.base_path = Path(settings.WORKSPACE_BASE_PATH)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def _workspace_root(self, job_id: str) -> Path:
        """Return ``base_path / job_id`` after checking it stays inside ``base_path``.

        Raises:
            ValueError: if ``job_id`` resolves to ``base_path`` itself or to a
                location outside it (e.g. ``""``, ``".."``, an absolute path).
        """
        root = self.base_path / job_id
        base_resolved = self.base_path.resolve()
        root_resolved = root.resolve()
        # The root must be a strict descendant of the base directory;
        # otherwise cleanup could delete the base itself or an unrelated tree.
        if base_resolved not in root_resolved.parents:
            raise ValueError(f"invalid job_id: {job_id!r}")
        # Hand back the unresolved path so callers see the same values as before.
        return root

    @staticmethod
    def _safe_filename(filename) -> str:
        """Reduce a client-supplied file name to a bare, safe base name.

        Raises:
            ValueError: if nothing usable is left (``None``, empty, ``.``, ``..``).
        """
        # Treat backslashes as separators too, so Windows-style paths sent by
        # a client are stripped down to their last component on any platform.
        name = Path(str(filename or "").replace("\\", "/")).name
        if name in ("", ".", ".."):
            raise ValueError(f"invalid upload filename: {filename!r}")
        return name

    def create_workspace(self, job_id: str) -> Dict[str, Path]:
        """Create the directory layout for ``job_id`` (idempotent).

        Returns:
            A dict with the keys ``"root"``, ``"inputs"``, ``"logs"`` and
            ``"results"`` mapping to the corresponding directories.

        Raises:
            ValueError: if ``job_id`` does not name a location strictly inside
                the base directory.
        """
        root = self._workspace_root(job_id)
        input_dir = root / "inputs"
        logs_dir = root / "logs"
        results_dir = root / "results"
        for directory in (root, input_dir, logs_dir, results_dir):
            directory.mkdir(parents=True, exist_ok=True)
        return {"root": root, "inputs": input_dir, "logs": logs_dir, "results": results_dir}

    def save_input_files(self, job_id: str, files: List[UploadFile]) -> List[dict]:
        """Store uploaded files in the job's ``inputs`` directory.

        Only the base name of each upload's ``filename`` is used, so a
        client cannot direct a write outside the ``inputs`` directory. An
        existing file with the same name is overwritten.

        Returns:
            One ``{"filename": <stored name>, "path": <str path>}`` dict per
            upload, in input order.

        Raises:
            ValueError: if ``job_id`` is invalid or any upload has no usable
                file name. Names are validated before anything is written.
        """
        # Validate every name up front so a bad upload does not leave a
        # half-populated workspace behind.
        names = [self._safe_filename(f.filename) for f in files]
        ws = self.create_workspace(job_id)
        saved: List[dict] = []
        for name, f in zip(names, files):
            dst = ws["inputs"] / name
            with dst.open("wb") as out:
                # Stream in chunks instead of loading the whole upload in memory.
                shutil.copyfileobj(f.file, out)
            saved.append({"filename": name, "path": str(dst)})
        return saved

    def cleanup_workspace(self, job_id: str, keep_results: bool = False) -> None:
        """Delete the job's workspace, optionally keeping its ``results``.

        Does nothing when the workspace does not exist. Removal errors are
        ignored (best-effort cleanup).

        Raises:
            ValueError: if ``job_id`` does not name a location strictly inside
                the base directory.
        """
        root = self._workspace_root(job_id)
        if not root.exists():
            return
        if not keep_results:
            shutil.rmtree(root, ignore_errors=True)
            return
        # Only remove inputs and logs; results stay in place.
        for name in ("inputs", "logs"):
            p = root / name
            if p.exists():
                shutil.rmtree(p, ignore_errors=True)
|
|
|
|
|