451 lines
20 KiB
Python
Executable File
451 lines
20 KiB
Python
Executable File
#!/usr/bin/env python
|
||
# -*- encoding: utf-8 -*-
|
||
'''
|
||
@file :analysis_pdb.py
|
||
@Description: : 分析PDB结构
|
||
@Date :2023/11/27 09:57:00
|
||
@Author :lyzeng
|
||
@Email :pylyzeng@gmail.com
|
||
@version :1.0
|
||
'''
|
||
# micromamba create -n modeller modeller biopython pymol-open-source biopandas requests -y -c conda-forge -c salilab
|
||
# modeller注册码:MODELIRANJE
|
||
from dataclasses import dataclass, field
|
||
from Bio.PDB import PDBParser
|
||
from Bio.SeqUtils import seq1
|
||
from typing import List, Dict, Tuple, Optional
|
||
from functools import reduce, partial
|
||
from Bio.PDB import MMCIFIO, PDBIO, Chain, Structure
|
||
from biopandas.pdb import PandasPdb
|
||
from pathlib import Path
|
||
from Bio.SeqRecord import SeqRecord
|
||
from Bio import SeqIO
|
||
import requests
|
||
from copy import deepcopy
|
||
from pymol import cmd
|
||
|
||
@dataclass
|
||
class PDBAnalyzer:
|
||
"""
|
||
PDBAnalyzer class for analyzing protein structures from PDB files.
|
||
|
||
Attributes:
|
||
pdb_file (Path): Path to the PDB file to be analyzed.
|
||
structure (object): The parsed PDB structure, initialized in __post_init__.
|
||
"""
|
||
pdb_file: Path
|
||
pid: Optional[str] = field(default=None, init=False)
|
||
structure: object = field(init=False)
|
||
biodf: PandasPdb = field(init=False)
|
||
protein_state: str = field(init=False) # Apo or Holo
|
||
chain_id_list: List[str] = field(init=False)
|
||
|
||
def __post_init__(self):
|
||
"""
|
||
Initialize the PDB structure after the object is created.
|
||
"""
|
||
parser = PDBParser(QUIET=True)
|
||
self.pid = self.pdb_file.stem.lower() if len(self.pdb_file.stem) == 4 else None
|
||
self.structure = parser.get_structure('PDB_structure', self.pdb_file.as_posix())
|
||
self.biodata = PandasPdb().read_pdb(self.pdb_file.as_posix())
|
||
self.protein_state = 'Holo' if 'HETATM' in self.biodata.df.keys() else 'Apo'
|
||
self.chain_id_list = self.biodata.df['ATOM']['chain_id'].drop_duplicates().to_list()
|
||
|
||
def check_continuity(self, chain, missing_char):
|
||
"""
|
||
Check the continuity of residues in a protein chain.
|
||
|
||
Args:
|
||
chain (Chain): A Chain object from Bio.PDB.
|
||
missing_char (str): Character to denote missing residues.
|
||
|
||
Returns:
|
||
List[Tuple[str, int]]: List of tuples containing residue names and their sequence numbers.
|
||
"""
|
||
def reducer(acc, residue):
|
||
# Accumulate residues, filling in gaps with the specified missing character
|
||
expected_resseq = acc[-1][1] + 1 if acc else residue.id[1]
|
||
while expected_resseq < residue.id[1]:
|
||
acc.append((missing_char, expected_resseq))
|
||
expected_resseq += 1
|
||
acc.append((residue.resname, residue.id[1]))
|
||
return acc
|
||
return reduce(reducer, chain, [])
|
||
|
||
def residue_to_single_letter(self, residue_name):
|
||
"""
|
||
Convert a three-letter residue name to a single-letter code.
|
||
|
||
Args:
|
||
residue_name (str): Three-letter residue name.
|
||
|
||
Returns:
|
||
str: Single-letter residue code or the original string if it's a special character.
|
||
"""
|
||
# Handle special characters separately
|
||
if residue_name in {'-', 'X', ''}:
|
||
return residue_name
|
||
# Convert standard residue names to single-letter codes
|
||
return seq1(residue_name)
|
||
|
||
def extract_sequences(self, missing_char='-') -> Dict[str, str]:
|
||
"""
|
||
Extract amino acid sequences from the structure, with gaps denoted by the specified character.
|
||
|
||
Args:
|
||
missing_char (str): Character to use for missing residues (default is '-').
|
||
|
||
Returns:
|
||
Dict[str, str]: Dictionary of chain IDs mapped to their amino acid sequences.
|
||
"""
|
||
# Create a continuity check function with the specified missing character
|
||
check_continuity_new = partial(self.check_continuity, missing_char=missing_char)
|
||
|
||
sequences = {}
|
||
# Process each chain in the structure
|
||
for model in self.structure:
|
||
chains = model.get_list()
|
||
for chain in chains:
|
||
# Check continuity and get the sequence of residues
|
||
chain_sequence = check_continuity_new(chain)
|
||
# Convert to single-letter sequence
|
||
single_letter_sequence = ''.join(self.residue_to_single_letter(res[0]) for res in chain_sequence)
|
||
sequences[chain.id] = single_letter_sequence
|
||
|
||
return sequences
|
||
|
||
def extract_sequences_info(self) -> Dict[str, List[int]]:
|
||
"""
|
||
Get missing residues information for each chain in the structure.
|
||
Utilizes map and filter for efficient processing.
|
||
|
||
Returns:
|
||
Dict[str, List[int]]: Dictionary mapping each chain ID to a list of missing residue numbers.
|
||
"""
|
||
def find_missing(chain):
|
||
observed = [residue.get_id()[1] for residue in chain if residue.id[0] == ' ']
|
||
if observed:
|
||
full_range = range(min(observed), max(observed) + 1)
|
||
return chain.get_id(), sorted(set(full_range) - set(observed))
|
||
return chain.get_id(), []
|
||
|
||
chains = [chain for model in self.structure for chain in model]
|
||
missing_info = map(find_missing, chains)
|
||
return dict(filter(lambda x: x[1], missing_info))
|
||
|
||
def extract_chain(self, chain_id: str):
|
||
"""
|
||
Extract a specific chain from the structure. use biopython
|
||
|
||
Args:
|
||
chain_id (str): ID of the chain to be extracted.
|
||
|
||
Returns:
|
||
object: An object containing the extracted chain with a save method.
|
||
"""
|
||
target_chain = next((chain for model in self.structure for chain in model if chain.id == chain_id), None)
|
||
if target_chain is None:
|
||
raise ValueError(f"Chain {chain_id} not found in the structure.")
|
||
|
||
def save_to_file(filename: str, file_format: str = 'pdb'):
|
||
"""
|
||
Save the extracted chain to a file in the specified format.
|
||
|
||
Args:
|
||
filename (str): Name of the file to save.
|
||
file_format (str): Format of the file ('pdb' or 'cif'). Default is 'pdb'.
|
||
"""
|
||
if file_format not in ['pdb', 'cif']:
|
||
raise ValueError("Unsupported file format. Use 'pdb' or 'cif'.")
|
||
|
||
if file_format == 'pdb':
|
||
io = PDBIO()
|
||
elif file_format == 'cif':
|
||
io = MMCIFIO()
|
||
|
||
io.set_structure(target_chain)
|
||
io.save(filename)
|
||
|
||
return type('ChainExtractor', (object,), {'save': save_to_file})
|
||
|
||
def get_missing_residues_as_loops(self) -> Dict[str, List[Tuple[int, int, int]]]:
|
||
"""
|
||
Get missing residues information formatted as loops for PyRosetta, along with chain identifiers.
|
||
|
||
Returns:
|
||
Dict[str, List[Tuple[int, int, int]]]: Dictionary mapping chain IDs to lists of tuples representing loop regions (start, end, cut).
|
||
"""
|
||
missing_residues = self.extract_sequences_info()
|
||
loops_by_chain = {}
|
||
for chain_id, missing_res_nums in missing_residues.items():
|
||
loops = []
|
||
for num in missing_res_nums:
|
||
# Define the loop region, assuming simple start and end points
|
||
loops.append((num - 1, num + 1, num)) # Example loop definition
|
||
loops_by_chain[chain_id] = loops
|
||
return loops_by_chain
|
||
|
||
def change_chain_identifier(self, old_chain_id: str, new_chain_id: str, split: bool = False) -> PandasPdb:
|
||
"""
|
||
Change the identifier of a specific chain. default use biopandas , not test yet.
|
||
|
||
Args:
|
||
old_chain_id (str): Original identifier of the chain.
|
||
new_chain_id (str): New identifier to be assigned to the chain.
|
||
return:
|
||
object: An new object,which not influce original self.biodata, containing the changed chain with a save method.
|
||
"""
|
||
biodata = self.split_chain(old_chain_id) if split else deepcopy(self.biodata) # get ATOM colum
|
||
biodata.df['ATOM']['chain_id'] = biodata.df['ATOM']['chain_id'].replace(old_chain_id.strip().upper(), new_chain_id.strip().upper())
|
||
return biodata # use to_pdb to save
|
||
|
||
def change_chain_identifier_biopython(self, old_chain_id: str, new_chain_id: str, split: bool = False) -> PDBIO:
|
||
io = PDBIO()
|
||
|
||
def change_id(chain):
|
||
if chain.id == old_chain_id:
|
||
chain.id = new_chain_id
|
||
return chain
|
||
|
||
if split:
|
||
# 使用 filter 和 map 创建一个只包含已更改的特定链的新结构
|
||
new_structure = Structure.Structure("modified_structure")
|
||
for model in self.structure:
|
||
filtered_chains = filter(lambda c: c.id == old_chain_id, model)
|
||
changed_chains = map(change_id, filtered_chains)
|
||
new_model = reduce(lambda m, c: m.add(c) or m, changed_chains, Structure.Model.Model(model.id))
|
||
new_structure.add(new_model)
|
||
io.set_structure(new_structure)
|
||
else:
|
||
# 直接在整个结构上更改ID
|
||
all_chains = (chain for model in self.structure for chain in model)
|
||
list(map(change_id, all_chains))
|
||
io.set_structure(self.structure)
|
||
|
||
return io
|
||
|
||
def split_chain(self, chain_id: str) -> PandasPdb:
|
||
biodata = deepcopy(self.biodata)
|
||
df = biodata.df['ATOM'] # get ATOM colum
|
||
tmp = PandasPdb() # create empty pdb file object
|
||
tmp.df['ATOM'] = df[df['chain_id'] == chain_id.strip().upper()]
|
||
return tmp # use to_pdb method save
|
||
|
||
def save_biodata(self, out_file: Path):
|
||
self.biodata.to_pdb(out_file.as_posix())
|
||
|
||
@staticmethod
|
||
def write_seq_to_fasta_single_line(seq_record: SeqRecord, output_file: Path):
|
||
with open(output_file, 'w') as output_handle:
|
||
output_handle.write(f">{seq_record.description}\n")
|
||
output_handle.write(str(seq_record.seq) + "\n")
|
||
|
||
def download_fasta(self, pdb_id: str = None):
|
||
if pdb_id == None:
|
||
pdb_id = self.pid
|
||
if pdb_id == None:
|
||
raise ValueError("PDB ID not found.")
|
||
url = f"https://www.rcsb.org/fasta/entry/{pdb_id.upper()}"
|
||
response = requests.get(url)
|
||
if response.status_code == 200:
|
||
output_file = self.pdb_file.parent / f"{pdb_id}.fasta"
|
||
with open(output_file, 'w') as file:
|
||
file.write(response.text)
|
||
return output_file
|
||
else:
|
||
raise Exception(f"Failed to download FASTA file for PDB ID {pdb_id}")
|
||
|
||
def filter_sequences(self, chain_id: str) -> List[SeqRecord]:
|
||
return list(filter(lambda x: f"Chain {chain_id}" in x.description, self.read_fasta()))
|
||
|
||
def read_fasta(self) -> SeqRecord:
|
||
fasta_file = self.pdb_file.parent / f"{self.pid}.fasta"
|
||
if not fasta_file.exists():
|
||
self.download_fasta()
|
||
return SeqIO.parse(fasta_file.as_posix(), "fasta")
|
||
|
||
@dataclass
|
||
class PDBAlign:
|
||
template_file: Path
|
||
target_file: Path
|
||
out_file: Path = field(default=None) # 输出文件路径
|
||
|
||
def align(self):
|
||
cmd.reinitialize()
|
||
# 首先,加载模板结构
|
||
cmd.load(self.template_file.as_posix(), "template")
|
||
|
||
# 加载并对齐所有目标结构
|
||
cmd.load(self.target_file.as_posix(), "target")
|
||
cmd.align("target", "template")
|
||
|
||
return cmd.get_pdbstr('target')
|
||
|
||
def main(PDB_ID, PDB_file_path):
|
||
# 示例
|
||
# PDB_ID = '5sws'
|
||
# PDB_file_path = Path(f'{PDB_ID}.pdb')
|
||
analyzer = PDBAnalyzer(PDB_file_path)
|
||
sequences = analyzer.extract_sequences(missing_char='-') # 或者 'X', 或者 ''
|
||
print(f'Residues info for {PDB_ID}: \n',sequences)
|
||
missing_info = analyzer.extract_sequences_info()
|
||
print(f'Missing residues info for {PDB_ID}:\n {missing_info}')
|
||
# 示例: 使用biopython提取A链(将会保留HETATM)
|
||
chain_extractor = analyzer.extract_chain('A') # 假设要提取的链ID是 'A'
|
||
chain_extractor.save('biopython_extracted_chain_A.pdb') # 保存为PDB文件
|
||
# 示例: 使用biopandas提取A链(将不会保留HETATM)
|
||
chain_extractor = analyzer.split_chain('A') # 假设要提取的链ID是 'A'
|
||
chain_extractor.to_pdb('biopandas_extracted_chain_A.pdb') # 保存为PDB文件
|
||
# A链改B链, 并分割保存为单独文件
|
||
analyzer.change_chain_identifier('A', 'B', split=True).to_pdb(f'{PDB_ID}_B.pdb')
|
||
# 分割所有的链
|
||
split_dict = {}
|
||
for j in analyzer.chain_id_list:
|
||
fn = Path(f'{PDB_ID}_{j}.pdb')
|
||
analyzer.split_chain(j).to_pdb(fn.as_posix())
|
||
split_dict[j]=fn.read_text()
|
||
# 修复loop区域
|
||
from build_modeller import PDBModeler
|
||
from modeller import ModellerError
|
||
mc_dict = {}
|
||
for mc in missing_info:
|
||
out_file = f'5sws_{mc}.pdb'
|
||
analyzer.split_chain(mc).to_pdb(out_file) # get misschain pdb file
|
||
mc_fasta = analyzer.filter_sequences(mc) # get misschain fasta file
|
||
if len(mc_fasta) == 1:
|
||
mc_fasta = mc_fasta[0]
|
||
out_fasta_file = Path(f'{PDB_ID}_{mc}.fasta')
|
||
analyzer.write_seq_to_fasta_single_line(mc_fasta, out_fasta_file)
|
||
print(f'>{mc_fasta.description}')
|
||
print(mc_fasta.seq)
|
||
modeller = PDBModeler(PDB_file_path, out_fasta_file, Path('.'), mc, 1, 'refine.very_fast')
|
||
try:
|
||
modeller_results = modeller.make_model()
|
||
except ModellerError:
|
||
print(f'Failed to build model for chain {mc}')
|
||
print(f'No loops detected in {out_fasta_file.name}')
|
||
print(f'may pdb file sequence is not correct')
|
||
continue
|
||
except Exception as e:
|
||
raise e
|
||
print(f'Model files: {[file.name for file in modeller_results]}')
|
||
# change id to original
|
||
for i in modeller_results:
|
||
manalyzer = PDBAnalyzer(i)
|
||
manalyzer.change_chain_identifier('A', mc, split=False).to_pdb(i)
|
||
if len(modeller_results) == 1:
|
||
# use pymol to align
|
||
aligner = PDBAlign(PDB_file_path, modeller_results[0],Path(f'{PDB_ID}_merge_model.pdb'))
|
||
pdbstr = aligner.align()
|
||
mc_dict[mc] = pdbstr
|
||
else:
|
||
print('more than one model file, please set num_loop to 1')
|
||
else:
|
||
raise ValueError(f'only can fix one chain content: {mc_fasta}')
|
||
|
||
# 使用示例
|
||
split_dict.update(mc_dict) # 更新 split_dict
|
||
import_and_merge_pdb_strings(split_dict, "merged_object", f'{PDB_ID}.modellerfix.pdb')
|
||
|
||
def import_and_merge_pdb_strings(pdb_strings, merged_object_name, output_file):
|
||
"""
|
||
从 PDB 字符串导入多个对象,将它们合并,并保存为一个 PDB 文件。
|
||
|
||
:param pdb_strings: 字典,键为对象名称,值为 PDB 数据字符串。
|
||
:param merged_object_name: 合并后的对象名称。
|
||
:param output_file: 输出文件的路径。
|
||
"""
|
||
# 初始化 PyMOL(如果在脚本或非交互式环境中使用)
|
||
cmd.reinitialize()
|
||
|
||
# 从字符串导入每个对象
|
||
for chain_id, pdb_str in pdb_strings.items():
|
||
object_name = f"chain_{chain_id}" # 创建唯一的对象名称
|
||
cmd.read_pdbstr(pdb_str, object_name)
|
||
|
||
# 合并所有对象为一个
|
||
object_names = [f"chain_{chain_id}" for chain_id in pdb_strings.keys()]
|
||
cmd.create(merged_object_name, ' or '.join(object_names))
|
||
|
||
# 保存合并后的对象
|
||
cmd.save(output_file, merged_object_name)
|
||
|
||
if __name__ == "__main__":
|
||
# import argparse
|
||
# parser = argparse.ArgumentParser(description="Build model by Modeller")
|
||
# parser.add_argument("-s", "--structure", help="Structure file")
|
||
# parser.add_argument("-o", "--outdir", help="Output directory")
|
||
# parser.add_argument("-f", "--fasta", help="Fasta file")
|
||
# parser.add_argument("-n", "--num_loop", help="Number of loop model")
|
||
# parser.add_argument("-m", "--md_level", help="MD level")
|
||
# parser.add_argument("-c", "--chain", help="Chain ID")
|
||
# args = parser.parse_args()
|
||
pdbfiles = [i for i in Path('../PDBfile').glob('*.pdb')]
|
||
for i in pdbfiles:
|
||
PDB_file_path = i
|
||
PDB_ID = i.stem
|
||
analyzer = PDBAnalyzer(PDB_file_path)
|
||
sequences = analyzer.extract_sequences(missing_char='-') # 或者 'X', 或者 ''
|
||
print(f'Residues info for {PDB_ID}: \n',sequences)
|
||
missing_info = analyzer.extract_sequences_info()
|
||
print(f'Missing residues info for {PDB_ID}:\n {missing_info}')
|
||
# 示例: 使用biopython提取A链(将会保留HETATM)
|
||
chain_extractor = analyzer.extract_chain('A') # 假设要提取的链ID是 'A'
|
||
chain_extractor.save('biopython_extracted_chain_A.pdb') # 保存为PDB文件
|
||
# 示例: 使用biopandas提取A链(将不会保留HETATM)
|
||
chain_extractor = analyzer.split_chain('A') # 假设要提取的链ID是 'A'
|
||
chain_extractor.to_pdb('biopandas_extracted_chain_A.pdb') # 保存为PDB文件
|
||
# A链改B链, 并分割保存为单独文件
|
||
analyzer.change_chain_identifier('A', 'B', split=True).to_pdb(f'{PDB_ID}_B.pdb')
|
||
# 分割所有的链
|
||
split_dict = {}
|
||
for j in analyzer.chain_id_list:
|
||
fn = Path(f'{PDB_ID}_{j}.pdb')
|
||
analyzer.split_chain(j).to_pdb(fn.as_posix())
|
||
split_dict[j]=fn.read_text()
|
||
# 修复loop区域
|
||
from build_modeller import PDBModeler
|
||
from modeller import ModellerError
|
||
mc_dict = {}
|
||
for mc in missing_info:
|
||
out_file = f'5sws_{mc}.pdb'
|
||
analyzer.split_chain(mc).to_pdb(out_file) # get misschain pdb file
|
||
mc_fasta = analyzer.filter_sequences(mc) # get misschain fasta file
|
||
if len(mc_fasta) == 1:
|
||
mc_fasta = mc_fasta[0]
|
||
out_fasta_file = Path(f'{PDB_ID}_{mc}.fasta')
|
||
analyzer.write_seq_to_fasta_single_line(mc_fasta, out_fasta_file)
|
||
print(f'>{mc_fasta.description}')
|
||
print(mc_fasta.seq)
|
||
modeller = PDBModeler(PDB_file_path, out_fasta_file, Path('.'), mc, 1, 'refine.very_fast')
|
||
try:
|
||
modeller_results = modeller.make_model()
|
||
except ModellerError:
|
||
print(f'Failed to build model for chain {mc}')
|
||
print(f'No loops detected in {out_fasta_file.name}')
|
||
print(f'may pdb file sequence is not correct')
|
||
continue
|
||
except Exception as e:
|
||
raise e
|
||
print(f'Model files: {[file.name for file in modeller_results]}')
|
||
# change id to original
|
||
for i in modeller_results:
|
||
manalyzer = PDBAnalyzer(i)
|
||
manalyzer.change_chain_identifier('A', mc, split=False).to_pdb(i)
|
||
if len(modeller_results) == 1:
|
||
# use pymol to align
|
||
aligner = PDBAlign(PDB_file_path, modeller_results[0],Path(f'{PDB_ID}_merge_model.pdb'))
|
||
pdbstr = aligner.align()
|
||
mc_dict[mc] = pdbstr
|
||
else:
|
||
print('more than one model file, please set num_loop to 1')
|
||
elif len(mc_fasta) == 0:
|
||
continue
|
||
else:
|
||
raise ValueError(f'only can fix one chain content: {mc_fasta}')
|
||
|
||
# 使用示例
|
||
split_dict.update(mc_dict) # 更新 split_dict
|
||
import_and_merge_pdb_strings(split_dict, "merged_object", f'{PDB_ID}.modellerfix.pdb')
|
||
|