Files
bttoxin-pipeline/backend/app/api/v1/results.py
zly c75c85c53b Refactor: Unified pipeline execution, simplified UI, and fixed Docker config
- Backend: Refactored tasks.py to directly invoke run_single_fna_pipeline.py for consistency.
- Backend: Changed output format to ZIP and added auto-cleanup of intermediate files.
- Backend: Fixed language parameter passing in API and tasks.
- Frontend: Removed CRISPR Fusion UI elements from Submit and Monitor views.
- Frontend: Implemented simulated progress bar for better UX.
- Frontend: Restored One-click load button and added result file structure documentation.
- Docker: Fixed critical container restart loop by removing an incorrect image directive in docker-compose.yml.
- Docker: Optimized Dockerfile to correct .pixi environment path issues and prevent accidental deletion of frontend assets.
2026-01-20 20:25:25 +08:00

124 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Result download API: endpoints to download, inspect, and delete job results."""
import io
import json
import logging
import shutil
import tarfile
from pathlib import Path

from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi.responses import FileResponse, JSONResponse
from sqlalchemy.orm import Session

from ...config import settings
from ...core.i18n import I18n, get_i18n
from ...database import get_db
from ...models.job import Job, JobStatus
# Router collecting the result-related endpoints of this module.
router = APIRouter()
# Base directory holding one result sub-directory per job id.
RESULTS_DIR = Path(settings.RESULTS_DIR)
@router.get("/{job_id}/download")
async def download_results(job_id: str, db: Session = Depends(get_db), i18n: I18n = Depends(get_i18n)):
    """Download a completed job's results packaged as a .tar.gz archive.

    Args:
        job_id: Identifier of the job whose results are requested.
        db: Database session (injected).
        i18n: Translator for localized error messages (injected).

    Returns:
        A ``Response`` carrying the gzip'd tar archive as an attachment.

    Raises:
        HTTPException: 404 if the job or its result directory does not exist,
            400 if the job has not completed yet.
    """
    job = db.query(Job).filter(Job.id == job_id).first()
    if not job:
        raise HTTPException(status_code=404, detail=i18n.t("job_not_found"))
    if job.status != JobStatus.COMPLETED:
        raise HTTPException(
            status_code=400,
            detail=i18n.t("job_not_completed", status=job.status),
        )
    job_output_dir = RESULTS_DIR / job_id
    if not job_output_dir.exists():
        raise HTTPException(status_code=404, detail=i18n.t("results_not_found"))
    # Build the tar.gz archive in memory.
    # NOTE(review): the whole archive is buffered in RAM; for very large
    # result sets consider streaming to a temporary file instead.
    tar_buffer = io.BytesIO()
    with tarfile.open(fileobj=tar_buffer, mode="w:gz") as tar:
        # sorted() makes the member order (and thus the archive bytes)
        # deterministic; rglob order is filesystem-dependent otherwise.
        for file_path in sorted(job_output_dir.rglob("*")):
            if file_path.is_file():
                arcname = file_path.relative_to(job_output_dir)
                tar.add(file_path, arcname=arcname)
    tar_buffer.seek(0)
    return Response(
        content=tar_buffer.read(),
        media_type="application/gzip",
        headers={
            "Content-Disposition": f"attachment; filename=bttoxin_{job_id}.tar.gz"
        },
    )
@router.get("/{job_id}/crispr")
async def get_crispr_results(job_id: str, db: Session = Depends(get_db), i18n: I18n = Depends(get_i18n)):
    """Return the CRISPR analysis results for a job.

    Args:
        job_id: Identifier of the job to inspect.
        db: Database session (injected).
        i18n: Translator for localized error messages (injected).

    Returns:
        The parsed JSON result file when one exists, otherwise a small
        status dict (``disabled`` / ``empty`` / ``pending``).

    Raises:
        HTTPException: 404 if the job does not exist.
    """
    job = db.query(Job).filter(Job.id == job_id).first()
    if job is None:
        raise HTTPException(status_code=404, detail=i18n.t("job_not_found"))
    if not job.crispr_fusion:
        # CRISPR analysis was never enabled for this job; report that rather
        # than erroring out.
        return {"status": "disabled", "message": "CRISPR analysis disabled"}
    crispr_dir = RESULTS_DIR / job_id / "crispr"
    # Prefer the fusion-analysis output; fall back to the raw detection output.
    for candidate in ("fusion_analysis.json", "results.json"):
        result_path = crispr_dir / candidate
        if result_path.exists():
            with open(result_path) as fh:
                return json.load(fh)
    # No result file on disk: distinguish "finished with nothing found"
    # from "still running".
    if job.status == JobStatus.COMPLETED:
        return {"status": "empty", "message": "No CRISPR elements detected"}
    return {"status": "pending", "message": "Analysis in progress"}
@router.delete("/{job_id}")
async def delete_job(job_id: str, db: Session = Depends(get_db), i18n: I18n = Depends(get_i18n)):
    """Delete a job: cancel it if still active, then remove its files and DB row.

    Args:
        job_id: Identifier of the job to delete.
        db: Database session (injected).
        i18n: Translator for localized messages (injected).

    Returns:
        A dict with a localized confirmation message.

    Raises:
        HTTPException: 404 if the job does not exist.
    """
    job = db.query(Job).filter(Job.id == job_id).first()
    if not job:
        raise HTTPException(status_code=404, detail=i18n.t("job_not_found"))
    # If the job is still queued or running, try to revoke the Celery task
    # before removing anything.
    if job.status in [JobStatus.PENDING, JobStatus.QUEUED, JobStatus.RUNNING] and job.celery_task_id:
        try:
            from ...core.celery_app import celery_app
            celery_app.control.revoke(job.celery_task_id, terminate=True)
            # Mark as cancelled for logical completeness, even though the row
            # is deleted just below.
            job.status = JobStatus.FAILED
            job.error_message = "Task cancelled by user"
            db.commit()
        except Exception as e:
            # Best-effort: a failed revoke must not block deletion. Log it
            # (was a bare print(), invisible in most deployments).
            logging.getLogger(__name__).warning(
                "Failed to revoke task %s: %s", job.celery_task_id, e
            )
    # Remove the on-disk input and output directories.
    job_input_dir = Path(settings.UPLOAD_DIR) / job_id
    job_output_dir = RESULTS_DIR / job_id
    if job_input_dir.exists():
        shutil.rmtree(job_input_dir)
    if job_output_dir.exists():
        shutil.rmtree(job_output_dir)
    # Finally drop the database record.
    db.delete(job)
    db.commit()
    return {"message": i18n.t("job_deleted", job_id=job_id)}