"""任务相关的 Pydantic Schema"""
|
||
from typing import Optional, List
|
||
from pydantic import BaseModel, Field, field_validator, model_validator
|
||
from enum import Enum
|
||
|
||
|
||
class SequenceType(str, Enum):
    """Kinds of input sequence data a job can be created from."""

    NUCL = "nucl"    # nucleotide genome/scaffold files
    PROT = "prot"    # protein sequence files
    ORFS = "orfs"    # predicted ORF files
    READS = "reads"  # raw sequencing reads (requires a platform)


class PlatformType(str, Enum):
    """Sequencing platforms accepted when the input type is reads."""

    ILLUMINA = "illumina"  # paired short reads (reads1 + reads2)
    PACBIO = "pacbio"      # long reads, needs a genome-size estimate
    OXFORD = "oxford"      # long reads, needs a genome-size estimate
    HYBRID = "hybrid"      # mixed short + long reads (full file names)


class JobCreateRequest(BaseModel):
    """Request schema for creating a job.

    Carries the common job metadata plus the parameters specific to each
    ``sequence_type``; ``validate_by_type`` fills in per-type default
    suffixes and enforces the fields required by the chosen
    type/platform combination.
    """

    # --- basic info ---
    name: str = Field(..., min_length=1, max_length=255, description="任务名称")
    description: Optional[str] = Field(None, max_length=1000, description="任务描述")

    # --- input sequence type ---
    sequence_type: SequenceType = Field(default=SequenceType.NUCL, description="输入序列类型")

    # --- nucl: genome scaffold file suffix ---
    scaf_suffix: Optional[str] = Field(
        None, pattern=r"^\.\w+$", description="基因组文件后缀(nucl)", examples=[".fna", ".fasta", ".fa"]
    )

    # --- orfs: ORF file suffix ---
    orfs_suffix: Optional[str] = Field(None, pattern=r"^\.\w+$", description="ORF 文件后缀(orfs)")

    # --- prot: protein file suffix ---
    prot_suffix: Optional[str] = Field(None, pattern=r"^\.\w+$", description="蛋白文件后缀(prot)")

    # --- reads: raw sequencing reads ---
    platform: Optional[PlatformType] = Field(None, description="测序平台(reads)")
    reads1_suffix: Optional[str] = Field(None, description="Reads1 后缀(illumina/hybrid)")
    reads2_suffix: Optional[str] = Field(None, description="Reads2 后缀(illumina/hybrid)")
    genome_size: Optional[str] = Field(
        None, pattern=r"^\d+(\.\d+)?[mMgG]?$", description="基因组大小估计(pacbio/oxford)"
    )
    suffix_len: Optional[int] = Field(None, ge=0, description="reads 文件后缀长度")

    # hybrid assembly identifies inputs by full file names, not suffixes
    short1: Optional[str] = Field(None, description="短 reads 1 文件名(完整文件名)")
    short2: Optional[str] = Field(None, description="短 reads 2 文件名(完整文件名)")
    long: Optional[str] = Field(None, description="长 reads 文件名(完整文件名)")

    # --- execution parameters ---
    threads: int = Field(default=4, ge=1, le=32, description="线程数")
    update_db: bool = Field(default=False, description="是否更新数据库")
    assemble_only: bool = Field(default=False, description="仅执行组装")

    @field_validator("scaf_suffix", "orfs_suffix", "prot_suffix")
    @classmethod
    def validate_suffix(cls, v: Optional[str]) -> Optional[str]:
        """Require suffixes to start with a dot.

        Belt-and-braces next to the ``pattern=r"^\\.\\w+$"`` constraint on
        the same fields; gives a clearer error message than the regex.
        """
        if v is not None and not v.startswith("."):
            raise ValueError("文件后缀必须以 . 开头")
        return v

    @model_validator(mode="after")
    def validate_by_type(self):
        """Apply per-type default suffixes and cross-field requirements.

        Raises:
            ValueError: when a field required by the selected
                ``sequence_type``/``platform`` combination is missing.
        """
        if self.sequence_type == SequenceType.NUCL:
            if not self.scaf_suffix:
                self.scaf_suffix = ".fna"
        elif self.sequence_type == SequenceType.ORFS:
            if not self.orfs_suffix:
                self.orfs_suffix = ".ffn"
        elif self.sequence_type == SequenceType.PROT:
            if not self.prot_suffix:
                self.prot_suffix = ".faa"
        elif self.sequence_type == SequenceType.READS:
            if not self.platform:
                raise ValueError("reads 类型必须指定 platform")
            if self.platform == PlatformType.ILLUMINA:
                if not self.reads1_suffix or not self.reads2_suffix:
                    raise ValueError("illumina 平台必须指定 reads1_suffix 和 reads2_suffix")
            elif self.platform in (PlatformType.PACBIO, PlatformType.OXFORD):
                # Use .value: since Python 3.11, f-string formatting of a
                # (str, Enum) member renders "PlatformType.PACBIO" instead
                # of "pacbio", which would leak into the user-facing error.
                if not self.reads1_suffix:
                    raise ValueError(f"{self.platform.value} 平台必须指定 reads1_suffix")
                if not self.genome_size:
                    raise ValueError(f"{self.platform.value} 平台必须指定 genome_size")
            elif self.platform == PlatformType.HYBRID:
                if not all([self.short1, self.short2, self.long]):
                    raise ValueError("hybrid 平台必须指定 short1, short2, long")
        return self


class FileUploadInfo(BaseModel):
    """Metadata describing one uploaded input file."""

    # Original file name as supplied by the client.
    filename: str
    # File size — presumably bytes; TODO confirm against the upload handler.
    size: int
    # MIME type reported at upload time, if any.
    content_type: Optional[str] = None
    # Storage path of the uploaded file — presumably server-side; verify.
    path: str


class JobCreateResponse(BaseModel):
    """Response returned after a job has been created."""

    # Identifier of the newly created job.
    job_id: str
    # Human-readable status message.
    message: str
    # Files uploaded as part of the creation request.
    uploaded_files: List[FileUploadInfo]
    # Path of the workspace prepared for this job.
    workspace_path: str
    # Celery task id — presumably set when the job is dispatched
    # asynchronously; TODO confirm against the job service.
    celery_task_id: Optional[str] = None
    # Non-fatal issues encountered during creation, if any.
    warnings: Optional[List[str]] = None


class JobStatusResponse(BaseModel):
    """Response describing the current state of a job."""

    job_id: str
    name: str
    # Job state as a plain string (see JobStatus for the known values).
    status: str
    # Progress indicator — presumably a 0-100 percentage; TODO confirm.
    progress: int
    # Pipeline step currently executing, when running.
    current_step: Optional[str] = None
    # Failure details, when the job has failed.
    error_message: Optional[str] = None
    # Timestamps serialized as strings; format is set by the producer.
    created_at: str
    started_at: Optional[str] = None
    completed_at: Optional[str] = None




"""任务 Schema"""
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime
from enum import Enum


class JobStatus(str, Enum):
    """Lifecycle states of a job."""

    PENDING = "pending"      # created, not yet started
    RUNNING = "running"      # currently executing
    COMPLETED = "completed"  # finished successfully
    FAILED = "failed"        # finished with an error


class JobCreate(BaseModel):
    """Minimal job-creation payload.

    NOTE(review): overlaps with the richer JobCreateRequest earlier in
    this file — confirm which one the API actually uses.
    """

    # Input files for the job — presumably paths or file names; verify.
    input_files: List[str]
    # Input sequence type; a plain str here, not the SequenceType enum.
    sequence_type: str = "nucl"
    # Genome file suffix (used for the default "nucl" type).
    scaf_suffix: str = ".fna"
    # Worker thread count.
    threads: int = 4


class JobResponse(BaseModel):
    """API representation of a stored job."""

    id: str
    status: JobStatus
    input_files: List[str]
    sequence_type: str
    threads: int
    # Location of the results once completed — presumably a download URL.
    result_url: Optional[str] = None
    # Failure details, when status is FAILED.
    error_message: Optional[str] = None
    created_at: datetime
    completed_at: Optional[datetime] = None

    class Config:
        # Allow constructing the model from attribute-bearing (ORM) objects.
        # NOTE(review): pydantic v2 deprecates inner `class Config` in favour
        # of `model_config = ConfigDict(from_attributes=True)`.
        from_attributes = True