Files
bttoxin-pipeline/backend/app/schemas/job.py
2025-10-13 21:05:00 +08:00

154 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""任务相关的 Pydantic Schema"""
from typing import Optional, List
from pydantic import BaseModel, Field, field_validator, model_validator
from enum import Enum
class SequenceType(str, Enum):
    # Closed set of input sequence kinds accepted by a job request.
    # Mixing in ``str`` makes every member compare equal to its raw value.
    NUCL = "nucl"    # selected by default in JobCreateRequest
    PROT = "prot"
    ORFS = "orfs"
    READS = "reads"  # the only kind that additionally requires a platform
class PlatformType(str, Enum):
    # Closed set of sequencing platforms that a reads-type job may name.
    # Members are ``str`` instances, so they compare equal to their values.
    ILLUMINA = "illumina"
    PACBIO = "pacbio"
    OXFORD = "oxford"
    HYBRID = "hybrid"
class JobCreateRequest(BaseModel):
    """创建任务请求(包含各序列类型的参数)"""

    # Request body for creating a job. It carries the parameters for every
    # sequence type; ``validate_by_type`` enforces the ones that apply to the
    # selected ``sequence_type`` (and, for reads, the selected ``platform``).
    # NOTE: the docstring above is kept verbatim because pydantic exposes a
    # model's docstring as its JSON-schema description.

    # --- basic information ---
    name: str = Field(..., min_length=1, max_length=255, description="任务名称")
    description: Optional[str] = Field(None, max_length=1000, description="任务描述")

    # --- sequence type ---
    sequence_type: SequenceType = Field(default=SequenceType.NUCL, description="输入序列类型")

    # --- nucl ---
    scaf_suffix: Optional[str] = Field(
        None, pattern=r"^\.\w+$", description="基因组文件后缀nucl", examples=[".fna", ".fasta", ".fa"]
    )

    # --- orfs ---
    orfs_suffix: Optional[str] = Field(None, pattern=r"^\.\w+$", description="ORF 文件后缀orfs")

    # --- prot ---
    prot_suffix: Optional[str] = Field(None, pattern=r"^\.\w+$", description="蛋白文件后缀prot")

    # --- reads ---
    platform: Optional[PlatformType] = Field(None, description="测序平台reads")
    reads1_suffix: Optional[str] = Field(None, description="Reads1 后缀illumina/hybrid")
    reads2_suffix: Optional[str] = Field(None, description="Reads2 后缀illumina/hybrid")
    genome_size: Optional[str] = Field(
        None, pattern=r"^\d+(\.\d+)?[mMgG]?$", description="基因组大小估计pacbio/oxford"
    )
    suffix_len: Optional[int] = Field(None, ge=0, description="reads 文件后缀长度")

    # --- hybrid: full file names are required ---
    short1: Optional[str] = Field(None, description="短 reads 1 文件名(完整文件名)")
    short2: Optional[str] = Field(None, description="短 reads 2 文件名(完整文件名)")
    long: Optional[str] = Field(None, description="长 reads 文件名(完整文件名)")

    # --- execution parameters ---
    threads: int = Field(default=4, ge=1, le=32, description="线程数")
    update_db: bool = Field(default=False, description="是否更新数据库")
    assemble_only: bool = Field(default=False, description="仅执行组装")

    @field_validator("scaf_suffix", "orfs_suffix", "prot_suffix")
    @classmethod
    def validate_suffix(cls, v: Optional[str]) -> Optional[str]:
        """Reject a provided suffix that does not start with a dot.

        ``None`` (suffix omitted) passes through unchanged.

        Raises:
            ValueError: if ``v`` is given and does not begin with ``"."``.
        """
        if v is not None and not v.startswith("."):
            raise ValueError("文件后缀必须以 . 开头")
        return v

    @model_validator(mode="after")
    def validate_by_type(self) -> "JobCreateRequest":
        """Apply the cross-field rules that depend on ``sequence_type``.

        * nucl / orfs / prot: a missing suffix is filled with the default
          (``.fna`` / ``.ffn`` / ``.faa`` respectively).
        * reads: ``platform`` is mandatory, and each platform requires its
          own set of fields (see the branches below).

        Returns:
            The validated (and possibly default-filled) model instance.

        Raises:
            ValueError: if a field required by the selected type/platform
                is missing or empty.
        """
        if self.sequence_type == SequenceType.NUCL:
            if not self.scaf_suffix:
                self.scaf_suffix = ".fna"
        elif self.sequence_type == SequenceType.ORFS:
            if not self.orfs_suffix:
                self.orfs_suffix = ".ffn"
        elif self.sequence_type == SequenceType.PROT:
            if not self.prot_suffix:
                self.prot_suffix = ".faa"
        elif self.sequence_type == SequenceType.READS:
            if not self.platform:
                raise ValueError("reads 类型必须指定 platform")
            if self.platform == PlatformType.ILLUMINA:
                if not self.reads1_suffix or not self.reads2_suffix:
                    raise ValueError("illumina 平台必须指定 reads1_suffix 和 reads2_suffix")
            elif self.platform in (PlatformType.PACBIO, PlatformType.OXFORD):
                # Interpolate ``.value`` explicitly: formatting a str-mixin
                # Enum member directly yields "pacbio" on older Pythons but
                # "PlatformType.PACBIO" on newer ones.
                platform_name = self.platform.value
                if not self.reads1_suffix:
                    raise ValueError(f"{platform_name} 平台必须指定 reads1_suffix")
                if not self.genome_size:
                    raise ValueError(f"{platform_name} 平台必须指定 genome_size")
            elif self.platform == PlatformType.HYBRID:
                if not all([self.short1, self.short2, self.long]):
                    raise ValueError("hybrid 平台必须指定 short1, short2, long")
        return self
class FileUploadInfo(BaseModel):
    # Describes a single uploaded file; used as the element type of
    # JobCreateResponse.uploaded_files.
    filename: str
    size: int  # presumably a byte count -- confirm against the upload handler
    content_type: Optional[str] = None  # may be omitted
    path: str
class JobCreateResponse(BaseModel):
    # Response model for job creation: the job identifier, a message, the
    # uploaded files and the workspace path, plus two optional extras.
    job_id: str
    message: str
    uploaded_files: List[FileUploadInfo]
    workspace_path: str
    celery_task_id: Optional[str] = None  # optional; defaults to None
    warnings: Optional[List[str]] = None  # optional; defaults to None
class JobStatusResponse(BaseModel):
    # Response model reporting a job's status and progress.
    # All timestamps are carried as plain strings, not datetime objects.
    job_id: str
    name: str
    status: str    # free-form string here, not an enum
    progress: int  # NOTE(review): range/unit not enforced here -- confirm with producer
    current_step: Optional[str] = None
    error_message: Optional[str] = None
    created_at: str
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
"""任务 Schema"""
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
from enum import Enum
class JobStatus(str, Enum):
    # Closed set of job states used by JobResponse.status.
    # Members are ``str`` instances, so they compare equal to their values.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
class JobCreate(BaseModel):
    # Minimal job-creation payload: the input files are required, every
    # other field falls back to the default given below.
    input_files: List[str]
    sequence_type: str = "nucl"  # plain string here, not an enum
    scaf_suffix: str = ".fna"
    threads: int = 4
class JobResponse(BaseModel):
    # Job record returned to clients.
    #
    # ``from_attributes`` is a pydantic v2 setting, so it is declared through
    # ``model_config`` rather than the deprecated v1-style nested ``Config``
    # class. A plain dict is used so no extra import is needed.
    model_config = {"from_attributes": True}

    id: str
    status: JobStatus
    input_files: List[str]
    sequence_type: str
    threads: int
    result_url: Optional[str] = None
    error_message: Optional[str] = None
    created_at: datetime
    completed_at: Optional[datetime] = None