230 lines
10 KiB
Python
Executable File
230 lines
10 KiB
Python
Executable File
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from modeller import *
|
||
from modeller.automodel import *
|
||
import time
|
||
from typing import List, Tuple
|
||
import sys
|
||
import glob
|
||
import pyfastx
|
||
from Bio import SeqIO
|
||
from Bio.Seq import Seq
|
||
from Bio.SeqRecord import SeqRecord
|
||
|
||
@dataclass
|
||
class PDBModeler:
|
||
structure_file: Path
|
||
fasta_file: Path
|
||
outdir: Path
|
||
chain: str
|
||
num_loop: int = 2
|
||
md_level: str = 'refine.fast' # refine.very_fast or refine.slow optional
|
||
|
||
def __post_init__(self):
|
||
self.structure = self.structure_file.stem
|
||
self.ali_file = self.fasta_to_ali()
|
||
|
||
@staticmethod
|
||
def find_non_dash_indices(seq):
|
||
start = next((i for i, c in enumerate(seq) if c != '-'), None)
|
||
end = next((i for i, c in enumerate(reversed(seq)) if c != '-'), None)
|
||
if start is not None and end is not None:
|
||
end = len(seq) - end
|
||
return start, end
|
||
|
||
@staticmethod
|
||
def align_sequences(file: Path) -> Path:
|
||
fx = pyfastx.Fasta(file.as_posix(), build_index=True)
|
||
assert len(fx) == 2
|
||
seqs = [seq for seq in fx]
|
||
|
||
# 确定哪条链需要裁剪
|
||
if seqs[0].seq.startswith('-') or seqs[0].seq.endswith('-'):
|
||
trim_index = 0
|
||
elif seqs[1].seq.startswith('-') or seqs[1].seq.endswith('-'):
|
||
trim_index = 1
|
||
else:
|
||
# 如果两条链都不需要裁剪,就直接返回原文件
|
||
return file
|
||
|
||
start, end = PDBModeler.find_non_dash_indices(seqs[trim_index].seq)
|
||
|
||
# 根据确定的裁剪位置裁剪两条链
|
||
trimmed_seqs = []
|
||
for seq in seqs:
|
||
trimmed_seq = seq.seq[start:end]
|
||
trimmed_seqs.append(SeqRecord(Seq(trimmed_seq), id=seq.name, description=""))
|
||
|
||
# 选择没有'-'的序列
|
||
selected_seq = None
|
||
for seq_record in trimmed_seqs:
|
||
if '-' not in seq_record.seq:
|
||
selected_seq = seq_record
|
||
break
|
||
|
||
assert selected_seq is not None, "no sequence without '-' found"
|
||
assert not selected_seq.seq.startswith('-') and not selected_seq.seq.endswith('-'), "selected sequence should not start or end with '-'"
|
||
|
||
# Write the selected sequence to a new FASTA file using Biopython
|
||
new_fasta_file = file.with_suffix('.selected.fasta')
|
||
with open(new_fasta_file, 'w') as output_handle:
|
||
SeqIO.write([selected_seq], output_handle, "fasta")
|
||
|
||
return new_fasta_file
|
||
|
||
def _make_model_env(self, ali_file: Path) -> Tuple[Path, Path]:
|
||
'''
|
||
ali_file: 需要和PDB文件中对齐的 ali文件,这里在类初始化的时候就将对应的fasta文件转化了ali文件格式(self.fasta_to_ali()),这里是对齐操作
|
||
'''
|
||
env = Environ()
|
||
mdl = Model(env, file=self.structure_file.as_posix(), model_segment=(f'FIRST:{self.chain}', f'LAST:{self.chain}'))
|
||
aln = Alignment(env)
|
||
aln.append_model(mdl, align_codes=self.structure, atom_files=self.structure_file.as_posix())
|
||
aln.append(file=ali_file.as_posix(), align_codes=self.structure)
|
||
aln.align2d()
|
||
new_ali_file = self.outdir / f'alignment_{self.chain}.ali'
|
||
aln.write(file=new_ali_file.as_posix(), alignment_format='PIR')
|
||
aln.write(file=(self.outdir / f'alignment_{self.chain}.pap').as_posix(), alignment_format='PAP')
|
||
fasta_ali_file = self.outdir / f'alignment_{self.chain}.fasta'
|
||
aln.write(file=fasta_ali_file.as_posix(), alignment_format='FASTA')
|
||
return fasta_ali_file, new_ali_file
|
||
|
||
def make_model(self): # 单模板建模
|
||
print("***************************************************")
|
||
print("md_level ====", self.md_level)
|
||
print("***************************************************")
|
||
|
||
time_start = time.time()
|
||
|
||
# env1 = Environ()
|
||
# mdl = Model(env1, file=self.structure_file.as_posix(), model_segment=(f'FIRST:{self.chain}', f'LAST:{self.chain}'))
|
||
# aln = Alignment(env1)
|
||
# aln.append_model(mdl, align_codes=f'{self.structure}_{self.chain}', atom_files=self.structure_file.as_posix())
|
||
# aln.append(file=self.ali_file.as_posix(), align_codes=self.fasta_file.stem)
|
||
# aln.align2d()
|
||
# aln.write(file=(self.outdir / 'alignment1.ali').as_posix(), alignment_format='PIR')
|
||
# aln.write(file=(self.outdir / 'alignment1.pap').as_posix(), alignment_format='PAP')
|
||
|
||
# save alignment in FASTA format
|
||
# aln.write(file=(self.outdir / 'alignment1.fasta').as_posix(), alignment_format='FASTA')
|
||
fasta_align1, align1 = self._make_model_env(self.ali_file) # 第一次对齐
|
||
slice_fasta = PDBModeler.align_sequences(fasta_align1) # 首尾部对齐剔除
|
||
fx = pyfastx.Fasta(slice_fasta.as_posix(), build_index=True)
|
||
if len(fx) == 2 and (slice_fasta == fasta_align1): # 针对首位都没有'-'的序列处理,直接使用'alignment1.fasta'进行同源建模
|
||
fix_ali_file = slice_fasta
|
||
elif len(fx) == 1:
|
||
slice_ali = self.outdir / 'alignment_slice.ali'
|
||
PDBModeler.write_ali(slice_ali, f'{fx[0].name}', fx[0].seq)
|
||
fasta_align2, align2 = self._make_model_env(slice_ali) # 第2次对齐
|
||
# env2 = Environ()
|
||
# mdl2 = Model(env2, file=self.structure_file.as_posix(), model_segment=(f'FIRST:{self.chain}', f'LAST:{self.chain}'))
|
||
# aln2 = Alignment(env2)
|
||
# aln2.append_model(mdl2, align_codes=f'{self.structure}_{self.chain}', atom_files=self.structure_file.as_posix())
|
||
# aln2.append(file=slice_ali.as_posix(), align_codes=self.fasta_file.stem)
|
||
# aln2.align2d()
|
||
# aln2.write(file=(self.outdir / 'alignment2.ali').as_posix(), alignment_format='PIR')
|
||
# aln2.write(file=(self.outdir / 'alignment2.pap').as_posix(), alignment_format='PAP')
|
||
# choose ali file
|
||
fix_ali_file = align2
|
||
else:
|
||
raise Exception(f"Unexpected FASTA file {slice_fasta.as_posix()} in errror number more than 2.")
|
||
|
||
env = Environ()
|
||
env.io.atom_files_directory = ['.']
|
||
loop_model = LoopModel(env,
|
||
alnfile=fix_ali_file.as_posix(),
|
||
knowns=self.structure,
|
||
sequence=self.fasta_file.stem,
|
||
loop_assess_methods=(assess.DOPE, assess.GA341))
|
||
# 设置循环模型数量
|
||
# 数量规则:(end - start) + 1
|
||
loop_model.loop.starting_model = 1
|
||
loop_model.loop.ending_model = self.num_loop
|
||
# 设置 MD 优化函数为 "refine.slow" 或 "refine.fast
|
||
if self.md_level.strip() == 'refine.slow':
|
||
loop_model.loop.md_level = refine.slow
|
||
elif self.md_level.strip() == 'refine.very_fast':
|
||
loop_model.loop.md_level = refine.very_fast
|
||
elif self.md_level.strip() == 'refine.fast':
|
||
loop_model.loop.md_level = refine.fast
|
||
|
||
# 调用 LoopModel 的 make 方法
|
||
loop_model.make()
|
||
end_time = time.time()
|
||
print(f"Time cost: {end_time - time_start}s")
|
||
|
||
# 获取所有成功生成的模型文件的路径
|
||
model_files = self.get_model_files(loop_model)
|
||
if model_files:
|
||
print(f"Model files: {[file.name for file in model_files]}")
|
||
else:
|
||
print("No model files found.")
|
||
|
||
return model_files
|
||
|
||
def find_pdb95_fsa_file(self) -> Path:
|
||
"""在 Conda 环境中查找 pdb95.fsa 文件的路径。"""
|
||
# 获取当前 Python 解释器的路径
|
||
python_executable_path = Path(sys.executable)
|
||
|
||
# 获取 Conda 环境的根目录
|
||
conda_env_root = python_executable_path.parent.parent
|
||
|
||
# 获取可能的 Modeller 目录
|
||
modeller_dirs = list(conda_env_root.glob("lib/modeller-*/examples/commands"))
|
||
|
||
# 选择最新版本的 Modeller 目录
|
||
modeller_dirs.sort(reverse=True)
|
||
if modeller_dirs:
|
||
latest_modeller_dir = modeller_dirs[0]
|
||
pdb95_fsa_path = latest_modeller_dir / "pdb95.fsa"
|
||
return pdb95_fsa_path
|
||
else:
|
||
raise FileNotFoundError("Modeller directory not found.")
|
||
|
||
def get_model_files(self, loop_model) -> List[Path]:
|
||
# 检查 loop_model.loop.outputs 列表,收集所有成功生成的模型文件
|
||
model_files = []
|
||
for output in loop_model.loop.outputs:
|
||
if output.get('failure') is None:
|
||
model_files.append(Path(output.get('name')))
|
||
return model_files
|
||
|
||
@staticmethod
|
||
def write_ali(ali_file: Path, description: str, sequence: str):
|
||
with open(ali_file, 'w') as f:
|
||
f.write(f'>P1;{description}\n')
|
||
f.write(f'sequence:{description}:::::::0.00: 0.00\n')
|
||
f.write(f'{sequence}*')
|
||
|
||
def fasta_to_ali(self) -> Path:
|
||
if not self.outdir.exists():
|
||
self.outdir.mkdir(parents=True, exist_ok=True)
|
||
|
||
ali_file = self.outdir / f'{self.fasta_file.stem}.ali'
|
||
if ali_file.exists():
|
||
ali_file.unlink()
|
||
|
||
fx = pyfastx.Fasta(self.fasta_file.as_posix(), build_index=True)
|
||
assert len(fx) == 1, "FASTA file should contain only one sequence"
|
||
PDBModeler.write_ali(ali_file, self.structure, fx[0].seq)
|
||
return ali_file
|
||
|
||
if __name__ == "__main__":
|
||
import argparse
|
||
parser = argparse.ArgumentParser(description="Build model by Modeller")
|
||
parser.add_argument("-s", "--structure", help="Structure file")
|
||
parser.add_argument("-o", "--outdir", help="Output directory")
|
||
parser.add_argument("-f", "--fasta", help="Fasta file")
|
||
parser.add_argument("-n", "--num_loop", help="Number of loop model")
|
||
parser.add_argument("-m", "--md_level", help="MD level")
|
||
parser.add_argument("-c", "--chain", help="Chain ID")
|
||
args = parser.parse_args()
|
||
|
||
modeler = PDBModeler(Path(args.structure), Path(args.fasta), Path(args.outdir),
|
||
args.chain, int(args.num_loop), args.md_level)
|
||
modeler.make_model()
|
||
|
||
# test command
|
||
# python build_modellel.py -s ../5sws_fixer.pdb -o ./5swsmodellerfix -f ../rcsb_pdb_5SWS.fasta -n 1 -m refine.very_fast -c D
|
||
# python build_modeller.py -s 1j8h.pdb -o ./1j8hmodellerfix -f ./1j8h_D.fasta -n 1 -m refine.very_fast -c D |