Files
bttoxin-pipeline/backend/app/services/workspace_service.py
2025-10-13 21:05:00 +08:00

46 lines
1.6 KiB
Python

from pathlib import Path
from typing import List, Dict
import shutil
from fastapi import UploadFile
from ..core.config import settings
class WorkspaceManager:
    """Manage per-job workspace directories under a common base path.

    Each job gets ``<base_path>/<job_id>/`` containing ``inputs``, ``logs``
    and ``results`` sub-directories.
    """

    def __init__(self) -> None:
        """Resolve the base path from settings and make sure it exists."""
        self.base_path = Path(settings.WORKSPACE_BASE_PATH)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def _job_root(self, job_id: str) -> Path:
        """Return the workspace root for *job_id*, rejecting unsafe ids.

        Raises:
            ValueError: if *job_id* is empty, ``.``/``..``, or contains a
                path separator. Without this check an empty id would
                resolve to the base path itself and ``..`` to its parent,
                so cleanup could delete far more than one workspace.
        """
        if not job_id or job_id in {".", ".."} or Path(job_id).name != job_id:
            raise ValueError(f"invalid job_id: {job_id!r}")
        return self.base_path / job_id

    @staticmethod
    def _safe_filename(raw_name) -> str:
        """Reduce a client-supplied upload name to a bare file name.

        Directory components (with either ``/`` or ``\\`` separators) are
        dropped so the upload cannot be written outside the inputs
        directory.

        Raises:
            ValueError: if nothing usable remains (missing/empty name, or
                a name that is only ``.``/``..``).
        """
        name = Path((raw_name or "").replace("\\", "/")).name
        if not name or name in {".", ".."}:
            raise ValueError(f"invalid upload filename: {raw_name!r}")
        return name

    def create_workspace(self, job_id: str) -> Dict[str, Path]:
        """Create (idempotently) the directory layout for a job.

        Args:
            job_id: identifier used as the workspace directory name.

        Returns:
            Mapping with the keys ``root``, ``inputs``, ``logs`` and
            ``results`` pointing at the created directories.

        Raises:
            ValueError: if *job_id* is not a single safe path component.
        """
        root = self._job_root(job_id)
        input_dir = root / "inputs"
        logs_dir = root / "logs"
        results_dir = root / "results"
        for d in [root, input_dir, logs_dir, results_dir]:
            d.mkdir(parents=True, exist_ok=True)
        return {"root": root, "inputs": input_dir, "logs": logs_dir, "results": results_dir}

    def save_input_files(self, job_id: str, files: List["UploadFile"]) -> List[dict]:
        """Store uploaded files in the job's ``inputs`` directory.

        Args:
            job_id: identifier of the job whose workspace receives the files.
            files: uploads exposing ``filename`` and a binary ``file`` object.

        Returns:
            One ``{"filename": ..., "path": ...}`` dict per upload, where
            ``filename`` is the bare name the file was stored under and
            ``path`` is the destination as a string.

        Raises:
            ValueError: if *job_id* or any upload filename is unsafe.
        """
        ws = self.create_workspace(job_id)
        saved: List[dict] = []
        for f in files:
            # The filename comes from the client; never trust its directories.
            safe_name = self._safe_filename(f.filename)
            dst = ws["inputs"] / safe_name
            with dst.open("wb") as out:
                # Stream in chunks instead of loading the whole upload in memory.
                shutil.copyfileobj(f.file, out)
            saved.append({"filename": safe_name, "path": str(dst)})
        return saved

    def cleanup_workspace(self, job_id: str, keep_results: bool = False) -> None:
        """Delete a job's workspace, optionally preserving its results.

        Args:
            job_id: identifier of the job to clean up.
            keep_results: when true, remove only ``inputs`` and ``logs``;
                otherwise remove the whole workspace directory.

        Raises:
            ValueError: if *job_id* is not a single safe path component.

        A missing workspace is a no-op; removal errors are ignored.
        """
        root = self._job_root(job_id)
        if not root.exists():
            return
        if keep_results:
            # Only clean up inputs and logs.
            for name in ["inputs", "logs"]:
                p = root / name
                if p.exists():
                    shutil.rmtree(p, ignore_errors=True)
        else:
            shutil.rmtree(root, ignore_errors=True)