Files
bttoxin-pipeline/backend/app/api/v1/results.py

77 lines
2.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Result download API."""
import io
import shutil
import tarfile
from pathlib import Path

from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session

from ...database import get_db
from ...models.job import Job, JobStatus
from ...config import settings
# Router that the result endpoints in this module are registered on.
router = APIRouter()
# Root directory for job results; each job's files are looked up under
# RESULTS_DIR / <job_id>.
RESULTS_DIR = Path(settings.RESULTS_DIR)
def _build_results_archive(job_output_dir: Path) -> bytes:
    """Pack every regular file under *job_output_dir* into an in-memory .tar.gz.

    Member names are stored relative to *job_output_dir*, so the archive does
    not expose the server-side directory layout.
    """
    tar_buffer = io.BytesIO()
    with tarfile.open(fileobj=tar_buffer, mode="w:gz") as tar:
        for file_path in job_output_dir.rglob("*"):
            if file_path.is_file():
                arcname = file_path.relative_to(job_output_dir).as_posix()
                tar.add(file_path, arcname=arcname)
    # Read the buffer only after the tar is closed, so the gzip stream is
    # complete.
    return tar_buffer.getvalue()


@router.get("/{job_id}/download")
async def download_results(job_id: str, db: Session = Depends(get_db)):
    """Download the results of a completed job as a .tar.gz archive.

    Args:
        job_id: Identifier of the job, taken from the URL path.
        db: Database session provided through the ``get_db`` dependency.

    Returns:
        A ``Response`` with media type ``application/gzip`` whose body is the
        archive of all files under the job's results directory, sent as an
        attachment named ``bttoxin_<job_id>.tar.gz``.

    Raises:
        HTTPException: 404 if no job with this id exists or its results
            directory is missing on disk; 400 if the job is not completed.
    """
    job = db.query(Job).filter(Job.id == job_id).first()
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status != JobStatus.COMPLETED:
        raise HTTPException(
            status_code=400,
            detail=f"Job not completed. Current status: {job.status}",
        )

    job_output_dir = RESULTS_DIR / job_id
    # is_dir() rather than exists(): a stray regular file at this path holds
    # no results to pack and must be reported as missing.
    if not job_output_dir.is_dir():
        raise HTTPException(status_code=404, detail="Results not found on disk")

    # Archiving is blocking disk I/O plus gzip compression; run it in the
    # thread pool so this coroutine does not stall the event loop.
    archive = await run_in_threadpool(_build_results_archive, job_output_dir)

    return Response(
        content=archive,
        media_type="application/gzip",
        headers={
            "Content-Disposition": f"attachment; filename=bttoxin_{job_id}.tar.gz"
        },
    )
@router.delete("/{job_id}")
async def delete_job(job_id: str, db: Session = Depends(get_db)):
    """Delete a job together with its uploaded inputs and its results.

    Args:
        job_id: Identifier of the job, taken from the URL path.
        db: Database session provided through the ``get_db`` dependency.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 404 if no job with this id exists.
    """
    record = db.query(Job).filter(Job.id == job_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="Job not found")

    # Remove on-disk data first: the upload directory, then the results
    # directory. Either may be absent.
    job_dirs = (Path(settings.UPLOAD_DIR) / job_id, RESULTS_DIR / job_id)
    for job_dir in job_dirs:
        if job_dir.exists():
            shutil.rmtree(job_dir)

    # Then remove the database record.
    db.delete(record)
    db.commit()

    return {"message": f"Job {job_id} deleted successfully"}