#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@file        : analysis_pdb.py
@Description : Analyze PDB structures.
@Date        : 2023/11/27 09:57:00
@Author      : lyzeng
@Email       : pylyzeng@gmail.com
@version     : 1.0

Example:
    # Strip heteroatoms and initialise a PDBAnalyzer
    analyzer = PDBAnalyzer.cleanATOM(pdb_file)
    print(analyzer.pdb_file)
'''
# micromamba create -n modeller modeller biopython pymol-open-source biopandas requests -y -c conda-forge -c salilab
# Modeller licence key: MODELIRANJE (<conda_env>//lib/modeller-10.4/modlib/modeller/config.py)
|
||
# Standard library
import logging
from copy import deepcopy
from dataclasses import dataclass, field
from functools import reduce, partial
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Union

# Third-party
import numpy as np
import requests
from Bio import SeqIO
from Bio.Align import PairwiseAligner
from Bio.Data import IUPACData
from Bio.PDB import PDBParser, MMCIFIO, PDBIO, Chain, Structure
from Bio.SeqRecord import SeqRecord
from Bio.SeqUtils import seq1
from biopandas.pdb import PandasPdb
import pymol
from pymol import cmd
from pymol import CmdException

# One-letter amino-acid codes taken from BioPython's IUPAC data.
AMINO_ACIDS = set(IUPACData.protein_letters)
|
||
|
||
class PyMOLInstance:
    """
    Class to handle a separate PyMOL instance.
    """
    def __init__(self):
        # Bind the module-level PyMOL command interface and wipe any
        # previously loaded state so this instance starts clean.
        self.cmd = pymol.cmd
        self.cmd.reinitialize()

    @classmethod
    def change_chain_identifier(cls, pdb_file: Path, old_chain_id: str, new_chain_id: str, output_file: Path):
        """
        Change the identifier of a specific chain in a PDB file using PyMOL.

        Args:
            pdb_file (Path): Path of the PDB file.
            old_chain_id (str): Original identifier of the chain.
            new_chain_id (str): New identifier to be assigned to the chain.
            output_file (Path): Path to save the modified PDB file.

        NOTE(review): this classmethod writes the result itself and returns
        None; callers must not expect an instance back (the usage example
        elsewhere in this file calls ``.save()`` on the return value, which
        would fail).
        """
        cmd = pymol.cmd
        cmd.reinitialize()
        try:
            cmd.load(pdb_file.as_posix(), "temp_structure")
            # alter() rewrites the chain attribute on every atom in the selection.
            cmd.alter(f'temp_structure and chain {old_chain_id}', f'chain="{new_chain_id}"')
            cmd.save(output_file.as_posix(), "temp_structure")
        except CmdException as e:
            print(f"Error in PyMOL command: {e}")
        finally:
            # Always drop the temporary object so repeated calls do not leak state.
            cmd.delete("temp_structure")

    def save(self, output_file: Path, format: str = 'pdb'):
        """
        Save the current state of the PyMOL instance to a file.

        Args:
            output_file (Path): Path to save the modified PDB file.

        NOTE(review): this saves a selection named "temp_structure", which
        __init__ never loads — it only exists if other code created it.
        Confirm intended usage before relying on this method.
        """
        try:
            self.cmd.save(filename=output_file.as_posix(), selection="temp_structure", format=format)
        finally:
            self.cmd.delete("temp_structure")
|
||
|
||
# 使用示例
|
||
def PyMOLInstance_change_chain_identifier(pdb_file_path: Path, old_chain_id: str, new_chain_id: str, output_file_path: Path):
    """
    Rename a chain in a PDB file and write the result to ``output_file_path``.

    Args:
        pdb_file_path (Path): Input PDB file, e.g. Path("path/to/file.pdb").
        old_chain_id (str): Chain to rename, e.g. "A".
        new_chain_id (str): New identifier, e.g. "D".
        output_file_path (Path): Where the modified PDB file is saved.
    """
    # Bug fix: PyMOLInstance.change_chain_identifier() already writes the
    # modified structure to output_file_path and returns None, so the previous
    # follow-up call ``a.save(output_file_path)`` raised AttributeError.
    PyMOLInstance.change_chain_identifier(pdb_file_path, old_chain_id, new_chain_id, output_file_path)
|
||
|
||
@dataclass
class PDBAnalyzer:
    """
    PDBAnalyzer class for analyzing protein structures from PDB files.

    Attributes:
        pdb_file (Path): Path to the PDB file to be analyzed.
        structure (object): The parsed PDB structure, initialized in __post_init__.
    """
    pdb_file: Path
    # Lower-cased 4-letter PDB ID derived from the file stem (None otherwise).
    pid: Optional[str] = field(default=None, init=False)
    structure: object = field(init=False)
    # Alias of `structure` (set in _initialize_structure).
    protein_structure: object = field(init=False)
    # NOTE(review): declared as `biodf` but every method below assigns/reads
    # `self.biodata` instead — this field is never populated. Confirm intent.
    biodf: PandasPdb = field(init=False)
    protein_state: str = field(init=False)  # Apo or Holo
    chain_id_list: List[str] = field(init=False)
    log_file: Path = field(init=False)
    logger: logging.Logger = field(init=False)
    # Cα-Cα distances gathered by check_and_log_sequence_issues().
    ca_distances: List[float] = field(init=False)
    renumber_errors: List[Dict[str, Tuple[int, int, int]]] = field(default_factory=list)
|
||
|
||
    def __post_init__(self):
        """
        Initialize the PDB structure after the object is created.
        """
        self.log_file = self.pdb_file.with_suffix('.log')  # log file next to the PDB
        self._configure_logging()
        self._initialize_structure()
        self._initialize_properties()
        self.check_and_log_sequence_issues()  # detect and log numbering problems
|
||
|
||
    def _initialize_structure(self):
        # NOTE(review): dead code — a second `_initialize_structure` defined
        # later in this class overrides this one at class-creation time.
        self.structure = PDBParser(QUIET=True).get_structure('PDB_structure', self.pdb_file.as_posix())
        self.protein_structure = self.structure
|
||
|
||
def _initialize_properties(self):
|
||
"""Initialize properties based on the pdb_file."""
|
||
self.pdb_file_stem = self.pdb_file.stem.split('.')[0]
|
||
self.pid = self.pdb_file_stem.lower() if len(self.pdb_file_stem) == 4 else None
|
||
self.biodata = PandasPdb().read_pdb(self.pdb_file.as_posix())
|
||
self.protein_state = 'Holo' if 'HETATM' in self.biodata.df.keys() else 'Apo'
|
||
self.chain_id_list = self.biodata.df['ATOM']['chain_id'].drop_duplicates().to_list()
|
||
self.ca_distances = []
|
||
|
||
|
||
def _configure_logging(self):
|
||
"""配置日志系统。"""
|
||
self.logger = logging.getLogger(f"PDBAnalyzer_{self.pdb_file.stem}")
|
||
self.logger.setLevel(logging.DEBUG) # 设置日志级别
|
||
# 创建一个文件处理器,并设置级别和格式
|
||
file_handler = RotatingFileHandler(self.log_file, maxBytes=1024*1024*5, backupCount=5)
|
||
file_handler.setLevel(logging.DEBUG)
|
||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||
file_handler.setFormatter(formatter)
|
||
# 添加处理器到logger
|
||
self.logger.addHandler(file_handler)
|
||
|
||
    def _initialize_structure(self):
        """Parse the PDB file with Bio.PDB and set file-derived metadata.

        NOTE(review): this redefinition shadows the earlier
        `_initialize_structure` above, and it duplicates almost all of
        `_initialize_properties` (which __post_init__ calls right after this
        method) — the pandas-side initialisation therefore runs twice.
        """
        self.pdb_file_stem = self.pdb_file.stem.split('.')[0]
        # 4-character stems are treated as RCSB PDB identifiers.
        self.pid = self.pdb_file_stem.lower() if len(self.pdb_file_stem) == 4 else None

        parser = PDBParser(QUIET=True)
        self.structure = parser.get_structure(self.pdb_file_stem, self.pdb_file)
        self.protein_structure = self.structure # Assuming identical for simplified reference

        self.biodata = PandasPdb().read_pdb(self.pdb_file.as_posix())
        # NOTE(review): biopandas may expose a (possibly empty) HETATM frame for
        # every file, in which case this key test would label everything 'Holo' — confirm.
        self.protein_state = 'Holo' if 'HETATM' in self.biodata.df.keys() else 'Apo'
        self.chain_id_list = self.biodata.df['ATOM']['chain_id'].drop_duplicates().to_list()

        self.logger.info("Structure and properties initialized.")
|
||
|
||
def calculate_distance(self, chain_id, res1, res2):
|
||
chain = self.structure[0][chain_id] # 假设只有一个模型
|
||
try:
|
||
res1_ca = chain[res1]['CA']
|
||
res2_ca = chain[res2]['CA']
|
||
distance = res1_ca - res2_ca
|
||
return distance
|
||
except KeyError:
|
||
# 如果没有CA原子,返回一个很大的值表示距离无穷大
|
||
return float('inf')
|
||
|
||
@classmethod
|
||
def cleanATOM(cls, input_file: Path, out_file: Path = None, ext: str = ".clean.pdb") -> 'PDBAnalyzer':
|
||
"""
|
||
Class method to clean PDB file by extracting all ATOM and TER records and write them to a new file.
|
||
|
||
Args:
|
||
input_file (Path): Path of the PDB file to be cleaned.
|
||
out_file (Path): Optional; output filename. Defaults to None, which will create <input_file>_clean.pdb.
|
||
ext (str): Extension for the output file if out_file is not specified. Defaults to "_clean.pdb".
|
||
|
||
Returns:
|
||
PDBAnalyzer: An instance of PDBAnalyzer pointing to the cleaned PDB file.
|
||
"""
|
||
# Define the output file name if not provided
|
||
if out_file is None:
|
||
out_file = input_file.with_suffix(ext)
|
||
|
||
# Extract ATOM and TER lines
|
||
with open(input_file, "r") as fid:
|
||
good_lines = [line for line in fid if line.startswith(("ATOM", "TER"))]
|
||
|
||
# Write the selected records to the new file
|
||
with open(out_file, "w") as fid:
|
||
fid.writelines(good_lines)
|
||
|
||
# Return a new instance of PDBAnalyzer pointing to the cleaned file
|
||
return cls(out_file)
|
||
|
||
def collect_renumbering_info(self):
|
||
"""
|
||
收集需要重新编号的残基信息。
|
||
"""
|
||
self.renumbering_info = {} # Chain ID as key, list of (original_res_num, new_res_num) tuples as value
|
||
for chain_id in self.chain_id_list:
|
||
residues = self.biodata.df['ATOM'][self.biodata.df['ATOM']['chain_id'] == chain_id]
|
||
unique_residues = residues['residue_number'].unique()
|
||
sorted_residues = sorted(unique_residues)
|
||
|
||
new_res_num = 1
|
||
self.renumbering_info[chain_id] = []
|
||
for orig_res_num in sorted_residues:
|
||
self.renumbering_info[chain_id].append((orig_res_num, new_res_num))
|
||
new_res_num += 1
|
||
|
||
def log_numbering_error(self, chain_id: str, start_residue: int, end_residue: int, estimated_missing: int):
|
||
"""
|
||
记录编号错误的信息。
|
||
|
||
Args:
|
||
chain_id (str): 链的ID。
|
||
start_residue (int): 错误开始的残基编号。
|
||
end_residue (int): 错误结束的残基编号。
|
||
estimated_missing (int): 估计的缺失残基数量。
|
||
"""
|
||
self.renumber_errors.append({
|
||
"chain_id": chain_id,
|
||
"start_residue": start_residue,
|
||
"end_residue": end_residue,
|
||
"estimated_missing": estimated_missing
|
||
})
|
||
|
||
    def check_and_log_sequence_issues(self):
        """
        Detect and log residue-numbering issues for every chain, and collect
        the Cα-Cα distances between consecutive residues into self.ca_distances.
        """
        for chain_id, detailed_seq in self.extract_sequences(detailed=True).items():
            # detailed_seq is a list of (residue_number, one_letter_code) tuples.
            if detailed_seq[0][0] != 1:
                self.logger.warning(f"Chain {chain_id} does not start with residue number 1.")

            for i in range(len(detailed_seq) - 1):
                prev_res, next_res = detailed_seq[i], detailed_seq[i + 1]
                diff = next_res[0] - prev_res[0]
                distance = self.calculate_distance(chain_id, prev_res[0], next_res[0])
                self.ca_distances.append(distance)

                # Distance window compatible with `diff` peptide bonds:
                # ~3.3-4.3 Å per bond, widened by half a bond on each side.
                min_expected_dist = 3.3 * (diff - 0.5)
                max_expected_dist = 4.3 * (diff + 0.5)

                # Simplified decision tree
                if diff == 1:
                    # Consecutive numbering: distance should look like one peptide bond.
                    if not (3.3 <= distance <= 4.3):
                        self.logger.warning(f"Potential numbering error or unexpected distance in chain {chain_id} between residues {prev_res[0]} ({prev_res[1]}) and {next_res[0]} ({next_res[1]}). Distance: {distance:.2f} Å")
                elif diff > 1:
                    if (min_expected_dist <= distance <= max_expected_dist):
                        # Gap size and spatial distance agree: genuinely missing residues.
                        self.logger.info(f"Potential missing residue in chain {chain_id} between residues {prev_res[0]} and {next_res[0]}")
                    else:
                        # Distance incompatible with the gap: assume mis-numbering and
                        # estimate the missing count from ~3.8 Å per residue.
                        missing_number = int(np.round(distance / 3.8 ) - 1)
                        self.logger.warning(f"Wrong sequence numbering in chain {chain_id} between residues {prev_res[0]} and {next_res[0]}, distance: {distance:.2f} Å, missing residue number: {missing_number}")
                        self.log_numbering_error(chain_id, prev_res[0], next_res[0], missing_number)
|
||
|
||
@classmethod
|
||
def renumber_residues_based_on_issues_and_clean(cls, input_file: Path, out_ext: str = ".renumbered.pdb", chains: Union[List[str], str, None] = None) -> 'PDBAnalyzer':
|
||
"""
|
||
先清洗PDB文件,再重新编号,并返回处理后的cls实例。
|
||
|
||
Args:
|
||
input_file (Path): PDB文件路径。
|
||
out_ext (str): 重新编号后的输出文件扩展名,默认为".renumbered.pdb"。
|
||
chains (Union[List[str], str, None]): 可选;指定要重新编号的链。
|
||
|
||
Returns:
|
||
PDBAnalyzer: 经过清洗和重新编号的PDBAnalyzer实例。
|
||
"""
|
||
cleaned_analyzer = cls.cleanATOM(input_file)
|
||
modifications = cleaned_analyzer.find_issues_and_generate_modifications(chains)
|
||
cleaned_analyzer.batch_modify_residues(modifications)
|
||
output_file = input_file.with_suffix(out_ext)
|
||
cleaned_analyzer.save_biodata(output_file) # 假设这个方法可以将修改后的数据写入文件
|
||
|
||
# 返回一个新的PDBAnalyzer实例,指向重新编号后的文件
|
||
return cls(output_file)
|
||
|
||
def find_issues_and_generate_modifications(self, specified_chains=None) -> dict:
|
||
"""
|
||
根据实际情况生成modifications字典,用于重新编号。
|
||
|
||
Args:
|
||
specified_chains (Union[List[str], str, None]): 指定的链。
|
||
|
||
Returns:
|
||
dict: 修改方案字典。
|
||
"""
|
||
modifications = {}
|
||
|
||
# 示例逻辑:检查每条链的编号问题并生成修改方案
|
||
for chain_id in self.chain_id_list:
|
||
if specified_chains and chain_id not in specified_chains:
|
||
continue # 如果指定了链且当前链不在指定列表中,则跳过
|
||
|
||
# 生成连续的新编号方案
|
||
residues = self.biodata.df['ATOM'][self.biodata.df['ATOM']['chain_id'] == chain_id]
|
||
original_res_nums = residues['residue_number'].unique()
|
||
sorted_original_res_nums = sorted(original_res_nums)
|
||
|
||
# 检查并生成修改方案
|
||
new_res_nums = range(1, len(sorted_original_res_nums) + 1)
|
||
changes = zip(sorted_original_res_nums, new_res_nums)
|
||
modifications[chain_id] = list(changes)
|
||
|
||
return modifications
|
||
|
||
def modify_residue_number(self, chain_id: str, original_res_num: int, new_res_num: int, return_pandas: bool = False):
|
||
mask = (self.biodata.df['ATOM']['chain_id'] == chain_id) & \
|
||
(self.biodata.df['ATOM']['residue_number'] == original_res_num)
|
||
if not mask.any():
|
||
self.logger.warning(f"Residue number {original_res_num} in chain {chain_id} does not exist in the PDB file.")
|
||
return
|
||
self.biodata.df['ATOM'].loc[mask, 'residue_number'] = new_res_num
|
||
if return_pandas:
|
||
return self.biodata
|
||
|
||
def batch_modify_residues(self, modifications: dict, return_pandas: bool = False):
|
||
for chain_id, changes in modifications.items():
|
||
for original_res_num, new_res_num in changes:
|
||
self.modify_residue_number(chain_id, original_res_num, new_res_num)
|
||
if return_pandas:
|
||
return self.biodata
|
||
|
||
def extract_chains_to_new_pdb(self, chains: List[str]) -> PandasPdb:
|
||
"""
|
||
Extract specified chains into a new PandasPdb object.
|
||
|
||
Args:
|
||
chains (List[str]): List of chain IDs to be extracted.
|
||
|
||
Returns:
|
||
PandasPdb: A new PandasPdb object containing only the specified chains.
|
||
|
||
Raises:
|
||
ValueError: If any of the specified chains are not found in the PDB file.
|
||
"""
|
||
# Check if all specified chains exist in the PDB file
|
||
if not all(chain in self.chain_id_list for chain in chains):
|
||
missing_chains = [chain for chain in chains if chain not in self.chain_id_list]
|
||
raise ValueError(f"Chains {missing_chains} not found in the PDB file. {self.pdb_file.as_posix()}")
|
||
|
||
# Create a new PandasPdb object for the specified chains
|
||
new_ppdb = PandasPdb()
|
||
|
||
# Extract ATOM records for specified chains
|
||
new_ppdb.df['ATOM'] = self.biodata.df['ATOM'][self.biodata.df['ATOM']['chain_id'].isin(chains)]
|
||
|
||
# Extract HETATM records for specified chains, if needed
|
||
if 'HETATM' in self.biodata.df:
|
||
new_ppdb.df['HETATM'] = self.biodata.df['HETATM'][self.biodata.df['HETATM']['chain_id'].isin(chains)]
|
||
|
||
return new_ppdb
|
||
|
||
def sequence_similarity(self, seq1: str, seq2: str) -> float:
|
||
"""
|
||
Calculate the similarity between two sequences.
|
||
|
||
Args:
|
||
seq1 (str): First sequence.
|
||
seq2 (str): Second sequence.
|
||
|
||
Returns:
|
||
float: Similarity score between 0 and 1, where 1 is identical.
|
||
"""
|
||
aligner = PairwiseAligner()
|
||
aligner.mode = 'global'
|
||
score = aligner.score(seq1, seq2)
|
||
max_score = min(len(seq1), len(seq2)) * aligner.match_score
|
||
return score / max_score
|
||
|
||
def check_continuity(self, chain, missing_char):
|
||
"""
|
||
Check the continuity of residues in a protein chain.
|
||
|
||
Args:
|
||
chain (Chain): A Chain object from Bio.PDB.
|
||
missing_char (str): Character to denote missing residues.
|
||
|
||
Returns:
|
||
List[Tuple[str, int]]: List of tuples containing residue names and their sequence numbers.
|
||
"""
|
||
def reducer(acc, residue):
|
||
# Accumulate residues, filling in gaps with the specified missing character
|
||
expected_resseq = acc[-1][1] + 1 if acc else residue.id[1]
|
||
while expected_resseq < residue.id[1]:
|
||
acc.append((missing_char, expected_resseq))
|
||
expected_resseq += 1
|
||
acc.append((residue.resname, residue.id[1]))
|
||
return acc
|
||
return reduce(reducer, chain, [])
|
||
|
||
def residue_to_single_letter(self, residue_name):
|
||
"""
|
||
Convert a three-letter residue name to a single-letter code.
|
||
|
||
Args:
|
||
residue_name (str): Three-letter residue name.
|
||
|
||
Returns:
|
||
str: Single-letter residue code or the original string if it's a special character.
|
||
"""
|
||
# Handle special characters separately
|
||
if residue_name in {'-', 'X', ''}:
|
||
return residue_name
|
||
# Convert standard residue names to single-letter codes
|
||
return seq1(residue_name)
|
||
|
||
    def extract_sequences(self, missing_char: str = '-', detailed: bool = False) -> Dict[str, Union[str, List[Tuple[int, str]]]]:
        """
        Extract amino acid sequences from the structure, with gaps denoted by the specified character.

        Args:
            missing_char (str): Character to use for missing residues (default is '-').
            detailed (bool): If True, each chain maps to a list of
                (residue_number, one_letter_code) tuples instead of a string.

        Returns:
            Dict[str, Union[str, List[Tuple[int, str]]]]: Dictionary of chain IDs mapped to their amino acid sequences
            or detailed sequence info depending on the detailed flag.
        """
        sequences = {}
        for model in self.structure:
            for chain in model:
                chain_seq = []
                for residue in chain:
                    # Keep standard residues (hetero flag ' ') and selenomethionine
                    # ('H_MSE'), which SEQRES treats as MET.
                    if residue.id[0] == ' ' or residue.id[0] == 'H_MSE': # MSE is treated as MET in SEQRES
                        res_id = residue.id[1]
                        try:
                            # Convert three-letter code to one-letter code, "-" for gaps
                            res_letter = seq1(residue.resname, undef_code=missing_char)
                        except KeyError:
                            res_letter = missing_char
                        if detailed:
                            chain_seq.append((res_id, res_letter))
                        else:
                            chain_seq.append(res_letter)
                if detailed:
                    sequences[chain.id] = chain_seq
                else:
                    sequences[chain.id] = ''.join(chain_seq)
        return sequences
|
||
|
||
def extract_sequences_info(self) -> Dict[str, List[int]]:
|
||
"""
|
||
Get missing residues information for each chain in the structure.
|
||
Utilizes map and filter for efficient processing.
|
||
|
||
Returns:
|
||
Dict[str, List[int]]: Dictionary mapping each chain ID to a list of missing residue numbers.
|
||
"""
|
||
def find_missing(chain):
|
||
observed = [residue.get_id()[1] for residue in chain if residue.id[0] == ' ']
|
||
if observed:
|
||
full_range = range(min(observed), max(observed) + 1)
|
||
return chain.get_id(), sorted(set(full_range) - set(observed))
|
||
return chain.get_id(), []
|
||
|
||
chains = [chain for model in self.protein_structure for chain in model]
|
||
missing_info = map(find_missing, chains)
|
||
return dict(filter(lambda x: x[1], missing_info))
|
||
|
||
    def extract_chain(self, chain_id: str):
        """
        Extract a specific chain from the structure. use biopython

        Args:
            chain_id (str): ID of the chain to be extracted.

        Returns:
            object: An object containing the extracted chain with a save method.

        Note:
            The return value is a dynamically created *class* (not an
            instance); its `save` attribute is the local closure below, so
            callers invoke it as ``extractor.save(filename)`` — no implicit
            `self` is passed when accessing the function through the class.
        """
        # First chain with a matching id across all models, or None.
        target_chain = next((chain for model in self.structure for chain in model if chain.id == chain_id), None)
        if target_chain is None:
            raise ValueError(f"Chain {chain_id} not found in the structure.")

        def save_to_file(filename: str, file_format: str = 'pdb'):
            """
            Save the extracted chain to a file in the specified format.

            Args:
                filename (str): Name of the file to save.
                file_format (str): Format of the file ('pdb' or 'cif'). Default is 'pdb'.
            """
            if file_format not in ['pdb', 'cif']:
                raise ValueError("Unsupported file format. Use 'pdb' or 'cif'.")

            # Choose the Bio.PDB writer matching the requested format.
            if file_format == 'pdb':
                io = PDBIO()
            elif file_format == 'cif':
                io = MMCIFIO()

            io.set_structure(target_chain)
            io.save(filename)

        return type('ChainExtractor', (object,), {'save': save_to_file})
|
||
|
||
def get_missing_residues_as_loops(self) -> Dict[str, List[Tuple[int, int, int]]]:
|
||
"""
|
||
Get missing residues information formatted as loops for PyRosetta, along with chain identifiers.
|
||
|
||
Returns:
|
||
Dict[str, List[Tuple[int, int, int]]]: Dictionary mapping chain IDs to lists of tuples representing loop regions (start, end, cut).
|
||
"""
|
||
missing_residues = self.extract_sequences_info()
|
||
loops_by_chain = {}
|
||
for chain_id, missing_res_nums in missing_residues.items():
|
||
loops = []
|
||
for num in missing_res_nums:
|
||
# Define the loop region, assuming simple start and end points
|
||
loops.append((num - 1, num + 1, num)) # Example loop definition
|
||
loops_by_chain[chain_id] = loops
|
||
return loops_by_chain
|
||
|
||
def change_chain_identifier(self, old_chain_id: str, new_chain_id: str, return_pandas: bool = False):
|
||
self.biodata.df['ATOM']['chain_id'] = self.biodata.df['ATOM']['chain_id'].replace(old_chain_id, new_chain_id)
|
||
if return_pandas:
|
||
return self.biodata
|
||
|
||
    def change_chain_identifier_biopython(self, old_chain_id: str, new_chain_id: str, split: bool = False) -> PDBIO:
        """Rename a chain using Bio.PDB objects and return a PDBIO ready to save.

        Args:
            old_chain_id (str): Chain to rename.
            new_chain_id (str): New identifier.
            split (bool): When True, the returned PDBIO holds a new structure
                containing only the renamed chain(s); otherwise the whole
                (mutated in place) structure.

        Returns:
            PDBIO: io object with the structure already set; call .save(path).
        """
        io = PDBIO()

        def change_id(chain):
            # Mutates the chain in place; the rename is a side effect.
            if chain.id == old_chain_id:
                chain.id = new_chain_id
            return chain

        if split:
            # Build a new structure containing only the renamed target chain(s).
            new_structure = Structure.Structure("modified_structure")
            for model in self.structure:
                filtered_chains = filter(lambda c: c.id == old_chain_id, model)
                changed_chains = map(change_id, filtered_chains)
                # Model.add returns None, so `m.add(c) or m` threads the model
                # through the reduce while adding each chain.
                new_model = reduce(lambda m, c: m.add(c) or m, changed_chains, Structure.Model.Model(model.id))
                new_structure.add(new_model)
            io.set_structure(new_structure)
        else:
            # Rename in place across the whole structure.
            all_chains = (chain for model in self.structure for chain in model)
            # list() forces the lazy map so the renames actually execute.
            list(map(change_id, all_chains))
            io.set_structure(self.structure)

        return io
|
||
|
||
def split_chain(self, chain_id: str) -> PandasPdb:
|
||
biodata = deepcopy(self.biodata)
|
||
df = biodata.df['ATOM'] # get ATOM colum
|
||
tmp = PandasPdb() # create empty pdb file object
|
||
tmp.df['ATOM'] = df[df['chain_id'] == chain_id.strip().upper()]
|
||
return tmp # use to_pdb method save
|
||
|
||
def save_biodata(self, out_file: Path):
|
||
self.biodata.to_pdb(out_file.as_posix())
|
||
|
||
@staticmethod
|
||
def write_seq_to_fasta_single_line(seq_record: SeqRecord, output_file: Path):
|
||
with open(output_file, 'w') as output_handle:
|
||
output_handle.write(f">{seq_record.description}\n")
|
||
output_handle.write(str(seq_record.seq) + "\n")
|
||
|
||
def download_fasta(self, pdb_id: str = None):
|
||
if pdb_id == None:
|
||
pdb_id = self.pid
|
||
if pdb_id == None:
|
||
raise ValueError("PDB ID not found.")
|
||
url = f"https://www.rcsb.org/fasta/entry/{pdb_id.upper()}"
|
||
response = requests.get(url)
|
||
if response.status_code == 200:
|
||
output_file = self.pdb_file.parent / f"{pdb_id}.fasta"
|
||
with open(output_file, 'w') as file:
|
||
file.write(response.text)
|
||
return output_file
|
||
else:
|
||
raise Exception(f"Failed to download FASTA file for PDB ID {pdb_id}")
|
||
|
||
def filter_sequences(self, chain_id: str) -> List[SeqRecord]:
|
||
"""
|
||
Filter sequences from a FASTA file based on a specific chain ID.
|
||
|
||
This function is designed to work with FASTA files containing single polypeptide chains (monomers).
|
||
It filters the sequences by matching the specified chain ID with the descriptions in the FASTA file.
|
||
|
||
Args:
|
||
chain_id (str): The chain ID to be used for filtering the sequences.
|
||
|
||
Returns:
|
||
List[SeqRecord]: A list of SeqRecord objects corresponding to the specified chain ID.
|
||
|
||
Note:
|
||
This method assumes that the FASTA file contains sequences of individual chains (monomers) only.
|
||
It may not work correctly if the FASTA file contains sequences from multimeric proteins (with multiple chains combined).
|
||
"""
|
||
return list(filter(lambda x: f"Chain {chain_id}" in x.description, self.read_fasta()))
|
||
|
||
|
||
def find_most_similar(self, input_seq: str) -> str:
|
||
"""
|
||
Find the most similar sequence in the FASTA file to the given input sequence.
|
||
|
||
Args:
|
||
input_seq (str): The protein sequence to compare against sequences in the FASTA file.
|
||
|
||
Returns:
|
||
str: The most similar sequence found in the FASTA file.
|
||
"""
|
||
aligner = PairwiseAligner()
|
||
max_score = -1
|
||
most_similar_seq = None
|
||
|
||
for record in self.read_fasta():
|
||
score = aligner.score(input_seq, str(record.seq))
|
||
if score > max_score:
|
||
max_score = score
|
||
most_similar_seq = record.seq
|
||
|
||
return most_similar_seq
|
||
|
||
def read_fasta(self) -> SeqRecord:
|
||
fasta_file = self.pdb_file.parent / f"{self.pid}.fasta"
|
||
if not fasta_file.exists():
|
||
self.download_fasta()
|
||
return SeqIO.parse(fasta_file.as_posix(), "fasta")
|
||
|
||
@property
|
||
def ca_distances_median(self):
|
||
"""
|
||
计算整个蛋白质中所有氨基酸Cα原子之间距离的中位数。
|
||
|
||
Returns:
|
||
float: 所有Cα原子间距离的中位数。
|
||
"""
|
||
if self.ca_distances:
|
||
return np.median(self.ca_distances)
|
||
else:
|
||
return None
|
||
|
||
@dataclass
class PDBAlign:
    """Align a target PDB structure onto a template using PyMOL.

    Attributes:
        template_file: Reference structure.
        target_file: Structure to be aligned onto the template.
        pymol_instance: Bound PyMOL command interface (set in __post_init__).
        out_file: Output file path — NOTE(review): align() never writes it;
            callers receive the aligned structure as a PDB string instead.
    """
    template_file: Path
    target_file: Path
    pymol_instance: object = field(init=False)
    out_file: Path = field(default=None)  # output file path (unused by align())

    def __post_init__(self):
        self.initialize_pymol()

    def initialize_pymol(self):
        # Bind the shared pymol.cmd API and clear any previous session state.
        self.pymol_instance = pymol.cmd
        self.pymol_instance.reinitialize()

    def align(self):
        """Align target onto template; return the aligned target as a PDB string."""
        self.pymol_instance.reinitialize()
        # Load the template structure first,
        self.pymol_instance.load(self.template_file.as_posix(), "template")

        # then load the target and superpose it onto the template.
        self.pymol_instance.load(self.target_file.as_posix(), "target")
        self.pymol_instance.align("target", "template")

        return self.pymol_instance.get_pdbstr('target')
|
||
|
||
def main(PDB_ID, PDB_file_path):
    """Demo pipeline: analyze a PDB entry, split its chains, rebuild missing
    loops with Modeller, align the fixed chains back, and merge everything.

    Args:
        PDB_ID: 4-letter PDB identifier, e.g. '5sws'.
        PDB_file_path: Path to the corresponding .pdb file.
    """
    analyzer = PDBAnalyzer(PDB_file_path)
    sequences = analyzer.extract_sequences(missing_char='-')  # or 'X', or ''
    print(f'Residues info for {PDB_ID}: \n', sequences)
    missing_info = analyzer.extract_sequences_info()
    print(f'Missing residues info for {PDB_ID}:\n {missing_info}')

    # Example: extract chain A with biopython (keeps HETATM records)
    chain_extractor = analyzer.extract_chain('A')
    chain_extractor.save('biopython_extracted_chain_A.pdb')

    # Example: extract chain A with biopandas (drops HETATM records)
    chain_extractor = analyzer.split_chain('A')
    chain_extractor.to_pdb('biopandas_extracted_chain_A.pdb')

    # Rename chain A to B and save it as a separate file.
    # Bug fix: change_chain_identifier() has no `split` keyword (that belongs
    # to change_chain_identifier_biopython); use return_pandas=True so the
    # returned PandasPdb exposes .to_pdb().
    analyzer.change_chain_identifier('A', 'B', return_pandas=True).to_pdb(f'{PDB_ID}_B.pdb')

    # Split every chain into its own file.
    split_dict = {}
    for j in analyzer.chain_id_list:
        fn = Path(f'{PDB_ID}_{j}.pdb')
        analyzer.split_chain(j).to_pdb(fn.as_posix())
        split_dict[j] = fn.read_text()

    # Rebuild missing loop regions with Modeller.
    from build_modeller import PDBModeler
    from modeller import ModellerError
    mc_dict = {}
    for mc in missing_info:
        # Bug fix: the chain file name was hard-coded to '5sws_...' and ignored PDB_ID.
        out_file = f'{PDB_ID}_{mc}.pdb'
        analyzer.split_chain(mc).to_pdb(out_file)  # write the gapped chain
        # Monomer FASTA records for this chain.
        mc_fasta = analyzer.filter_sequences(mc)
        if len(mc_fasta) == 1:
            mc_fasta = mc_fasta[0]
            out_fasta_file = Path(f'{PDB_ID}_{mc}.fasta')
            analyzer.write_seq_to_fasta_single_line(mc_fasta, out_fasta_file)
            print(f'>{mc_fasta.description}')
            print(mc_fasta.seq)
            modeller = PDBModeler(PDB_file_path, out_fasta_file, Path('.'), mc, 1, 'refine.very_fast')
            try:
                modeller_results = modeller.make_model()
            except ModellerError:
                print(f'Failed to build model for chain {mc}')
                print(f'No loops detected in {out_fasta_file.name}')
                print(f'may pdb file sequence is not correct')
                continue
            except Exception as e:
                raise e
            print(f'Model files: {[file.name for file in modeller_results]}')
            # Restore the original chain identifier on every model file.
            for i in modeller_results:
                manalyzer = PDBAnalyzer(i)
                # Bug fix: `split` is not a parameter of change_chain_identifier;
                # return_pandas=True yields a PandasPdb with .to_pdb().
                manalyzer.change_chain_identifier('A', mc, return_pandas=True).to_pdb(i.as_posix())
            if len(modeller_results) == 1:
                # Align the rebuilt chain back onto the original structure with PyMOL.
                aligner = PDBAlign(PDB_file_path, modeller_results[0], Path(f'{PDB_ID}_merge_model.pdb'))
                pdbstr = aligner.align()
                mc_dict[mc] = pdbstr
            else:
                print('more than one model file, please set num_loop to 1')
        else:
            raise ValueError(f'only can fix one chain content: {mc_fasta}')

    # Merge the untouched chains with the rebuilt ones and write the result.
    split_dict.update(mc_dict)
    import_and_merge_pdb_strings(split_dict, "merged_object", f'{PDB_ID}.modellerfix.pdb')
|
||
|
||
def import_and_merge_pdb_strings(pdb_strings, merged_object_name, output_file):
    """
    Load several PDB strings into PyMOL, merge them into one object and save it.

    :param pdb_strings: dict mapping object name (chain id) -> PDB data string.
    :param merged_object_name: name of the merged PyMOL object.
    :param output_file: path of the output file.
    """
    # Reset PyMOL (needed when used from a script / non-interactive session).
    cmd.reinitialize()

    # Import every chain under its own unique object name.
    object_names = []
    for chain_id, pdb_str in pdb_strings.items():
        object_name = f"chain_{chain_id}"
        cmd.read_pdbstr(pdb_str, object_name)
        object_names.append(object_name)

    # Combine every imported object into a single one.
    cmd.create(merged_object_name, ' or '.join(object_names))

    # Write the merged object out.
    cmd.save(output_file, merged_object_name)
|
||
|
||
if __name__ == "__main__":
    # Intentionally a no-op: import this module and call main(pdb_id, pdb_path).
    pass
|
||
|
||
|
||
|