Initial commit: BtToxin Pipeline project structure

This commit is contained in:
2025-10-13 19:22:56 +08:00
commit c7744836e9
37 changed files with 1146 additions and 0 deletions

2
backend/app/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""BtToxin Pipeline Backend Application"""
__version__ = "1.0.0"

View File

View File

@@ -0,0 +1,98 @@
"""任务管理 API"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from sqlalchemy.orm import Session
from typing import List
from pathlib import Path
import uuid
import shutil
from ...database import get_db
from ...models.job import Job, JobStatus
from ...schemas.job import JobResponse
from ...workers.tasks import run_bttoxin_analysis
from ...config import settings
router = APIRouter()

# Storage locations come from configuration (relative paths by default).
UPLOAD_DIR = Path(settings.UPLOAD_DIR)
RESULTS_DIR = Path(settings.RESULTS_DIR)
# parents=True so a missing parent directory does not crash at import time;
# the original exist_ok-only call raises FileNotFoundError in that case.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
@router.post("/create", response_model=JobResponse)
async def create_job(
files: List[UploadFile] = File(...),
sequence_type: str = "nucl",
scaf_suffix: str = ".fna",
threads: int = 4,
db: Session = Depends(get_db)
):
"""创建新任务"""
job_id = str(uuid.uuid4())
job_input_dir = UPLOAD_DIR / job_id
job_output_dir = RESULTS_DIR / job_id
job_input_dir.mkdir(parents=True)
job_output_dir.mkdir(parents=True)
uploaded_files = []
for file in files:
file_path = job_input_dir / file.filename
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
uploaded_files.append(file.filename)
job = Job(
id=job_id,
status=JobStatus.PENDING,
input_files=uploaded_files,
sequence_type=sequence_type,
scaf_suffix=scaf_suffix,
threads=threads
)
db.add(job)
db.commit()
db.refresh(job)
task = run_bttoxin_analysis.delay(
job_id=job_id,
input_dir=str(job_input_dir),
output_dir=str(job_output_dir),
sequence_type=sequence_type,
scaf_suffix=scaf_suffix,
threads=threads
)
job.celery_task_id = task.id
db.commit()
return job
@router.get("/{job_id}", response_model=JobResponse)
async def get_job(job_id: str, db: Session = Depends(get_db)):
"""获取任务详情"""
job = db.query(Job).filter(Job.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return job
@router.get("/{job_id}/progress")
async def get_job_progress(job_id: str, db: Session = Depends(get_db)):
"""获取任务进度"""
job = db.query(Job).filter(Job.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Job not found")
if job.celery_task_id:
from ...core.celery_app import celery_app
task = celery_app.AsyncResult(job.celery_task_id)
return {
'job_id': job_id,
'status': job.status,
'celery_state': task.state,
'progress': task.info if task.state == 'PROGRESS' else None
}
return {'job_id': job_id, 'status': job.status}

View File

@@ -0,0 +1,8 @@
"""结果查询 API"""
from fastapi import APIRouter
router = APIRouter()
@router.get("/")
async def results_info():
return {"message": "Results endpoint"}

View File

@@ -0,0 +1,8 @@
"""文件上传 API"""
from fastapi import APIRouter
router = APIRouter()
@router.get("/")
async def upload_info():
return {"message": "Upload endpoint"}

47
backend/app/config.py Normal file
View File

@@ -0,0 +1,47 @@
"""应用配置"""
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
"""应用设置"""
# 应用基础配置
APP_NAME: str = "BtToxin Pipeline"
APP_VERSION: str = "1.0.0"
DEBUG: bool = False
# API 配置
API_V1_STR: str = "/api/v1"
# 数据库
DATABASE_URL: str = "postgresql://postgres:password@localhost:5432/bttoxin"
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
# S3/MinIO
S3_ENDPOINT: Optional[str] = None
S3_ACCESS_KEY: str = ""
S3_SECRET_KEY: str = ""
S3_BUCKET: str = "bttoxin-results"
S3_REGION: str = "us-east-1"
# Docker
DOCKER_IMAGE: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0"
# 文件路径
UPLOAD_DIR: str = "uploads"
RESULTS_DIR: str = "results"
# Celery
CELERY_BROKER_URL: str = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"
# CORS
CORS_ORIGINS: list = ["http://localhost:3000", "http://localhost:5173"]
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()

View File

@@ -0,0 +1,21 @@
"""Celery 配置"""
from celery import Celery
from ..config import settings
celery_app = Celery(
"bttoxin_worker",
broker=settings.CELERY_BROKER_URL,
backend=settings.CELERY_RESULT_BACKEND,
include=['app.workers.tasks']
)
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_time_limit=7200,
worker_prefetch_multiplier=1,
)

View File

@@ -0,0 +1,72 @@
"""Docker 客户端管理"""
import docker
from typing import Dict, Any
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class DockerManager:
    """Docker container manager for running the BtToxin_Digger image."""

    def __init__(self, image: str = None):
        # Imported lazily to avoid pulling in config at module import time.
        from ..config import settings
        self.client = docker.from_env()
        self.image = image or settings.DOCKER_IMAGE

    def ensure_image(self) -> bool:
        """Ensure the analysis image exists locally, pulling it if missing."""
        try:
            self.client.images.get(self.image)
            return True
        except docker.errors.ImageNotFound:
            logger.info(f"Pulling image {self.image}...")
            self.client.images.pull(self.image)
            return True

    def run_bttoxin_digger(
        self,
        input_dir: Path,
        output_dir: Path,
        sequence_type: str = "nucl",
        scaf_suffix: str = ".fna",
        threads: int = 4
    ) -> Dict[str, Any]:
        """Run BtToxin_Digger in a container.

        Mounts *input_dir* read-only at /data and *output_dir* read-write at
        /results, then waits for the container to finish.

        Returns:
            dict with 'success' plus 'logs'/'exit_code' for a completed run,
            or 'error' when the container could not be run.
        """
        self.ensure_image()
        volumes = {
            str(input_dir.absolute()): {'bind': '/data', 'mode': 'ro'},
            str(output_dir.absolute()): {'bind': '/results', 'mode': 'rw'}
        }
        command = [
            "/usr/local/env-execute", "BtToxin_Digger",
            "--SeqPath", "/data",
            "--SequenceType", sequence_type,
            "--Scaf_suffix", scaf_suffix,
            "--threads", str(threads)
        ]
        container = None
        try:
            container = self.client.containers.run(
                self.image,
                command=command,
                volumes=volumes,
                platform="linux/amd64",
                detach=True,
                remove=False
            )
            result = container.wait()
            logs = container.logs().decode('utf-8')
            return {
                'success': result['StatusCode'] == 0,
                'logs': logs,
                'exit_code': result['StatusCode']
            }
        except Exception as e:
            logger.error(f"Error: {e}")
            return {'success': False, 'error': str(e)}
        finally:
            # Bug fix: the original removed the container only on the success
            # path, leaking a stopped container whenever wait()/logs() raised.
            if container is not None:
                try:
                    container.remove(force=True)
                except Exception:
                    logger.warning("Failed to remove container for image %s", self.image)

24
backend/app/database.py Normal file
View File

@@ -0,0 +1,24 @@
"""数据库连接"""
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from .config import settings
engine = create_engine(
settings.DATABASE_URL,
pool_pre_ping=True,
echo=settings.DEBUG
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
"""获取数据库会话"""
db = SessionLocal()
try:
yield db
finally:
db.close()

49
backend/app/main.py Normal file
View File

@@ -0,0 +1,49 @@
"""FastAPI 主应用"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from .config import settings
from .api.v1 import jobs, upload, results
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Announce application startup and shutdown around the serving phase."""
    print("🚀 Starting BtToxin Pipeline API...")  # startup side
    yield
    print("👋 Shutting down BtToxin Pipeline API...")  # shutdown side
app = FastAPI(
    title=settings.APP_NAME,
    version=settings.APP_VERSION,
    description="Automated Bacillus thuringiensis toxin mining pipeline",
    lifespan=lifespan
)

# CORS — allowed origins come from configuration (local dev front-ends by default).
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routes — each sub-API is mounted under the versioned API prefix.
app.include_router(jobs.router, prefix=f"{settings.API_V1_STR}/jobs", tags=["jobs"])
app.include_router(upload.router, prefix=f"{settings.API_V1_STR}/upload", tags=["upload"])
app.include_router(results.router, prefix=f"{settings.API_V1_STR}/results", tags=["results"])
@app.get("/")
async def root():
return {
"name": settings.APP_NAME,
"version": settings.APP_VERSION,
"status": "healthy"
}
@app.get("/health")
async def health():
return {"status": "ok"}

32
backend/app/models/job.py Normal file
View File

@@ -0,0 +1,32 @@
"""任务模型"""
from sqlalchemy import Column, String, Integer, DateTime, JSON, Enum, Text
from sqlalchemy.sql import func
import enum
from ..database import Base
class JobStatus(str, enum.Enum):
    """Lifecycle states of an analysis job (str subclass → stored/serialized as plain strings)."""
    PENDING = "pending"      # created, waiting for a worker
    RUNNING = "running"      # worker picked the job up
    COMPLETED = "completed"  # analysis finished successfully
    FAILED = "failed"        # analysis errored or exited non-zero
class Job(Base):
    """Persistent record of one analysis job."""
    __tablename__ = "jobs"

    id = Column(String, primary_key=True, index=True)  # UUID string assigned by the API
    celery_task_id = Column(String, nullable=True)     # set once the Celery task is dispatched
    status = Column(Enum(JobStatus), default=JobStatus.PENDING)
    input_files = Column(JSON)                         # list of uploaded file names
    sequence_type = Column(String, default="nucl")
    scaf_suffix = Column(String, default=".fna")
    threads = Column(Integer, default=4)
    result_url = Column(String, nullable=True)
    logs = Column(Text, nullable=True)                 # container output, recorded on success
    error_message = Column(Text, nullable=True)        # populated when the task fails
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # NOTE(review): no default here, so updated_at stays NULL until the first
    # UPDATE — confirm that is intended.
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    completed_at = Column(DateTime(timezone=True), nullable=True)

View File

@@ -0,0 +1,31 @@
"""任务 Schema"""
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
from enum import Enum
class JobStatus(str, Enum):
    """Job lifecycle states — duplicates models.job.JobStatus; keep the two in sync."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
class JobCreate(BaseModel):
    """Request payload describing a job to create."""
    input_files: List[str]       # names of the uploaded input files
    sequence_type: str = "nucl"  # passed through to BtToxin_Digger --SequenceType
    scaf_suffix: str = ".fna"    # passed through to --Scaf_suffix
    threads: int = 4             # worker thread count for the analysis
class JobResponse(BaseModel):
    """API representation of a job, built from the ORM model via from_attributes."""
    id: str
    status: JobStatus
    input_files: List[str]
    sequence_type: str
    # Consistency fix: the ORM model stores scaf_suffix and the create
    # endpoint accepts it, but the response omitted it; defaulted so the
    # addition stays backward-compatible.
    scaf_suffix: str = ".fna"
    threads: int
    result_url: Optional[str] = None
    error_message: Optional[str] = None
    created_at: datetime
    completed_at: Optional[datetime] = None

    class Config:
        from_attributes = True

View File

@@ -0,0 +1,64 @@
"""Celery 任务"""
from celery import Task
from pathlib import Path
import shutil
import logging
from ..core.celery_app import celery_app
from ..core.docker_client import DockerManager
from ..database import SessionLocal
from ..models.job import Job, JobStatus
logger = logging.getLogger(__name__)
@celery_app.task(bind=True)
def run_bttoxin_analysis(
    self,
    job_id: str,
    input_dir: str,
    output_dir: str,
    sequence_type: str = "nucl",
    scaf_suffix: str = ".fna",
    threads: int = 4
):
    """Run the BtToxin_Digger analysis for one job.

    Marks the job RUNNING, launches the Docker-based pipeline, then records
    either the container logs (COMPLETED) or an error message (FAILED).

    Raises:
        ValueError: when no job row exists for *job_id*.
    """
    db = SessionLocal()
    job = None  # bug fix: defined before the try so the except block is safe
    try:
        job = db.query(Job).filter(Job.id == job_id).first()
        if job is None:
            # The original dereferenced a possibly-None query result, turning
            # a missing row into an opaque AttributeError.
            raise ValueError(f"Job {job_id} not found")
        job.status = JobStatus.RUNNING
        db.commit()

        self.update_state(
            state='PROGRESS',
            meta={'current': 20, 'total': 100, 'status': 'Running analysis...'}
        )

        docker_manager = DockerManager()
        result = docker_manager.run_bttoxin_digger(
            input_dir=Path(input_dir),
            output_dir=Path(output_dir),
            sequence_type=sequence_type,
            scaf_suffix=scaf_suffix,
            threads=threads
        )

        if result['success']:
            job.status = JobStatus.COMPLETED
            job.logs = result.get('logs', '')
        else:
            job.status = JobStatus.FAILED
            job.error_message = result.get('error', 'Analysis failed')
        db.commit()
        return {'job_id': job_id, 'status': job.status}
    except Exception as e:
        logger.error(f"Task failed: {e}")
        # Only record the failure when the job row was actually loaded.
        if job is not None:
            job.status = JobStatus.FAILED
            job.error_message = str(e)
            db.commit()
        raise
    finally:
        db.close()

38
backend/requirements.txt Normal file
View File

@@ -0,0 +1,38 @@
# Web 框架
fastapi==0.115.5
uvicorn[standard]==0.32.1
python-multipart==0.0.20
# 任务队列
celery==5.4.0
redis==5.2.1
flower==2.0.1
# 容器管理
docker==7.1.0
# 数据库
sqlalchemy==2.0.36
alembic==1.14.0
psycopg2-binary==2.9.10
# 对象存储
boto3==1.35.78
minio==7.2.11
# 数据处理
biopython==1.84
pandas==2.2.3
# 工具
pydantic==2.10.4
pydantic-settings==2.6.1
python-dotenv==1.0.1
aiofiles==24.1.0
# 监控
prometheus-client==0.21.0
# 测试
pytest==8.3.4
httpx==0.28.1