165 lines
6.5 KiB
Python
165 lines
6.5 KiB
Python
#!/usr/bin/env python
|
||
# -*- encoding: utf-8 -*-
|
||
'''
|
||
@file :tcr_pmhc_complexes.py
|
||
@Description :Build datasets from FASTA files
|
||
@Date :2024/01/17 10:24:08
|
||
@Author :lyzeng
|
||
@Email :pylyzeng@gmail.com
|
||
@version :1.0
|
||
'''
|
||
from Bio.Data import IUPACData
|
||
from pydantic import BaseModel, Field, FilePath, field_validator
|
||
from typing import Optional, Dict, List, Any, Union
|
||
from pathlib import Path
|
||
from sequence_base import ProteinSequence, BaseProteinSequence
|
||
# 使用 BioPython 导入氨基酸缩写
|
||
AMINO_ACIDS = set(IUPACData.protein_letters)
|
||
|
||
class ProteinComplex(BaseModel):
    """One TCR-pMHC complex record, keyed by its PDB identifier.

    Every chain is optional so that partially annotated complexes can still
    be represented; a missing chain is stored as ``None``.
    """

    pdb_id: str
    tcr_alpha: Optional[ProteinSequence] = None
    tcr_beta: Optional[ProteinSequence] = None
    mhc_alpha: Optional[ProteinSequence] = None
    mhc_beta: Optional[ProteinSequence] = None
    peptide: Optional[ProteinSequence] = None
    # MHC class: 1, 2, or "unknown".
    mhc_class: str
    # Where the record came from, e.g. the DOI of a paper.
    data_source: str
    # Free-form annotations that have no dedicated field.
    extra_data: Dict[str, Any] = {}

    def add_extra_data(self, **kwargs):
        """Store each keyword argument in ``extra_data``.

        An existing key is overwritten by the new value.
        """
        for key, value in kwargs.items():
            self.extra_data[key] = value
|
||
|
||
class FastaHeaderInfo(BaseModel):
    """Structured view of a single FASTA header line.

    Two header shapes are recognised:

    * pipe-delimited headers of the form ``>ID|Chains A, B[auth C]|description``,
      which are split into an identifier, chain IDs and a description;
    * any other header, whose text after the leading character is kept
      verbatim (stripped) as the description.
    """

    pdb_id: Optional[str] = None
    chain_ids: List[str] = []
    # Maps each chain ID to its author-assigned ID; a chain without an
    # ``[auth X]`` suffix maps to itself.
    auth_chain_ids: Dict[str, str] = {}
    description: str = ''
    # "Yes" (more than one chain), "No" (exactly one chain) or "Unknown"
    # (no chain information available).
    is_polymeric: Optional[str] = None

    @classmethod
    def from_header_line(cls, header_line: str):
        """Build a ``FastaHeaderInfo`` from a raw header line.

        Args:
            header_line: The header text, normally starting with ``'>'``.

        Returns:
            A populated ``FastaHeaderInfo``. When the header has no ``'|'``
            or no usable chain segment, ``chain_ids`` is empty and
            ``is_polymeric`` is ``"Unknown"``.
        """
        if '|' not in header_line:
            # No pipe-delimited fields: keep everything after the first
            # character (the '>') as the description.
            return cls(
                pdb_id=None,
                chain_ids=[],
                auth_chain_ids={},
                description=header_line[1:].strip(),
                is_polymeric="Unknown",
            )

        parts = header_line.split('|')
        pdb_id = parts[0].strip('>')
        # A '|' is present, so split() always yields at least two parts.
        chain_info = parts[1]
        description = parts[2] if len(parts) > 2 else ''

        chain_field = chain_info.replace('Chain ', '').replace('Chains ', '')
        chain_ids = []
        auth_chain_ids = {}

        for raw_part in chain_field.split(','):
            part = raw_part.strip()
            if not part:
                # An empty segment (empty chain field, trailing or doubled
                # comma) is not a chain; do not record '' as a chain ID.
                continue

            # partition() splits on the first '[' only, so a malformed
            # segment with several brackets cannot break tuple unpacking.
            chain_id, bracket, auth_text = part.partition('[')
            chain_id = chain_id.strip()
            if bracket:
                auth_chain_id = auth_text.strip(']').replace('auth ', '').strip()
            else:
                auth_chain_id = chain_id

            chain_ids.append(chain_id)
            auth_chain_ids[chain_id] = auth_chain_id

        if not chain_ids:
            # Same convention as headers without '|': without chain
            # information the polymeric state cannot be determined.
            is_polymeric = "Unknown"
        elif len(chain_ids) > 1:
            is_polymeric = "Yes"
        else:
            is_polymeric = "No"

        return cls(
            pdb_id=pdb_id,
            chain_ids=chain_ids,
            auth_chain_ids=auth_chain_ids,
            description=description,
            is_polymeric=is_polymeric,
        )
|
||
|
||
# Deprecated: an earlier attempt to also support standard (non-PDB) FASTA headers. Kept for reference only.
|
||
# class FastaHeaderInfo(BaseModel):
|
||
# info: Dict[str, Union[str, List[str], Dict[str, str]]] = {}
|
||
# is_polymeric: Optional[str] = None # 新增属性
|
||
|
||
# @classmethod
|
||
# def from_header_line(cls, header_line: str):
|
||
# header_info = {}
|
||
# if '|' in header_line and 'Chain' in header_line:
|
||
# # PDB FASTA格式处理
|
||
# parts = header_line.split('|')
|
||
# header_info['identifier'] = parts[0].strip('>')
|
||
# chain_info = parts[1] if len(parts) > 1 else ''
|
||
# header_info['description'] = parts[2] if len(parts) > 2 else ''
|
||
|
||
# chain_parts = chain_info.replace('Chain ', '').replace('Chains ', '').split(',')
|
||
# chain_ids = []
|
||
# auth_chain_ids = {}
|
||
|
||
# for part in chain_parts:
|
||
# part = part.strip()
|
||
# if '[' in part:
|
||
# chain_id, auth_chain_id = part.split('[')
|
||
# chain_id = chain_id.strip()
|
||
# auth_chain_id = auth_chain_id.strip(']').strip()
|
||
# chain_ids.append(chain_id)
|
||
# auth_chain_ids[chain_id] = auth_chain_id
|
||
# else:
|
||
# chain_ids.append(part)
|
||
# auth_chain_ids[part] = part
|
||
|
||
# header_info['chain_ids'] = chain_ids
|
||
# header_info['auth_chain_ids'] = auth_chain_ids
|
||
# header_info['is_polymeric'] = "Yes" if len(chain_ids) > 1 else "No"
|
||
# else:
|
||
# # 处理非PDB或非标准FASTA头信息
|
||
# identifier = header_line[1:].split()[0] # 取第一个空格前的文本作为标识符
|
||
# description = ' '.join(header_line[1:].split()[1:]) # 剩余的文本作为描述
|
||
# header_info['identifier'] = identifier
|
||
# header_info['description'] = description
|
||
# header_info['is_polymeric'] = "Unknown"
|
||
|
||
# return cls(info=header_info)
|
||
|
||
|
||
class FastaSequence(BaseModel):
    """A single FASTA record: the parsed header plus its sequence."""

    header_info: FastaHeaderInfo
    sequence: BaseProteinSequence

    @property
    def sequence_length(self) -> int:
        """Length of the wrapped sequence.

        ``self.sequence`` is a ``BaseProteinSequence`` instance, which keeps
        the actual residues in its own ``sequence`` field, hence the double
        attribute access.
        """
        residues = self.sequence.sequence
        return len(residues)
|
||
|
||
class FastaFile(BaseModel):
    """A FASTA file on disk together with the records parsed from it.

    The file is read once when the model is constructed; ``read_sequence``
    can be called again later to re-read it.
    """

    file: FilePath
    sequences: List[FastaSequence] = []

    def __init__(self, *args, **kwargs):
        """Validate the fields, then parse ``file`` into ``sequences``."""
        super().__init__(*args, **kwargs)
        self.read_sequence()

    @property
    def sequence_num(self) -> int:
        """Number of records currently held in ``sequences``."""
        return len(self.sequences)

    def read_sequence(self):
        """Parse ``self.file`` and store its records in ``self.sequences``.

        A record is emitted for every run of non-empty sequence lines,
        paired with the most recent header line (an empty header if the
        sequence text precedes the first ``'>'`` line). Headers that are not
        followed by any sequence text produce no record.

        The contents of ``self.sequences`` are replaced, not extended, so
        calling this method repeatedly does not duplicate records. The list
        is only updated after the whole file has been parsed successfully.
        """
        records = []
        header_line = ''
        chunks = []

        def flush():
            # Turn the accumulated sequence lines into a record, if any.
            if not chunks:
                return
            base_sequence = BaseProteinSequence(sequence=''.join(chunks))
            header_info = FastaHeaderInfo.from_header_line(header_line)
            records.append(
                FastaSequence(header_info=header_info, sequence=base_sequence)
            )
            chunks.clear()

        with open(self.file, 'r') as fasta_file:
            for raw_line in fasta_file:
                line = raw_line.strip()
                if line.startswith('>'):
                    # A new header closes the record collected so far.
                    flush()
                    header_line = line
                elif line:
                    # Only non-empty lines count, so ``chunks`` is truthy
                    # exactly when there is sequence text to emit.
                    chunks.append(line)

        # The last record has no following header to trigger a flush.
        flush()

        # In-place replacement keeps the same list object on the model.
        self.sequences[:] = records
|
||
|