Files
analysis_pdb/runner.py
2023-12-19 12:50:56 +08:00

155 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@file :runner.py
@Description: :
@Date :2023/12/04 14:34:36
@Author :lyzeng
@Email :pylyzeng@gmail.com
@version :1.0
'''
import time
import logging
from dataclasses import dataclass, field
from pathlib import Path
import subprocess
import shutil
import os
# 设置日志记录
logging.basicConfig(level=logging.INFO, filename='simulation_log.log', filemode='a',
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
class SimulationRunner:
pdb_file: Path
nsteps: int
dt: float
base_folder: Path
bash_script: Path = None # Bash脚本路径作为可选参数
gmxrc_path: Path = None # GMXRC文件路径作为可选参数
temp_folder: Path # 用于存储临时数据和结果数据的路径(轨迹数据处理中间数据等)
runner_folder: Path = field(init=False)
tpr_file: Path = field(init=False)
xtc_file: Path = field(init=False)
def __post_init__(self):
# 初始化文件夹和文件路径
self.runner_folder = self.base_folder / f"runner_{self.pdb_file.stem}"
self.tpr_file = self.runner_folder / "md.tpr"
self.xtc_file = self.runner_folder / "md.xtc"
self.runner_folder.mkdir(exist_ok=True)
self.temp_folder = self.runner_folder / "temp"
self.temp_folder.mkdir(exist_ok=True)
self.bash_script = self.bash_script.absolute() if self.bash_script else Path(__file__).resolve().parent / "md_gromacs.sh"
# 设置 GMXRC_PATH 环境变量
self.gmxrc_path = self.gmxrc_path or Path("/home/lingyuzeng/software/gmx2023.2/bin/GMXRC")
def copy_pdb(self):
shutil.copy(self.pdb_file, self.runner_folder / self.pdb_file.name)
@staticmethod
def read_ndx_file(filename):
ndx_dict = {}
current_section = None
with open(filename, 'r') as file:
for line in file:
line = line.strip()
if line.startswith('[') and line.endswith(']'):
current_section = line[1:-1].strip()
ndx_dict[current_section] = []
else:
if current_section is not None:
ndx_dict[current_section].extend(map(int, line.split()))
return ndx_dict
@staticmethod
echo "Protein" | gmx_mpi trjconv -dt {extract_interval} -s {tpr_file} -f {xtc_file} -n {temp_folder}/tarj_show.ndx -pbc mol -o {temp_folder}/temp.xtc
# 新增处理轨迹的方法
def process_trajectory(self, extract_interval):
# 根据提供的脚本逻辑读取和保存索引文件
ndx_dict = self.read_ndx_file(f'{self.runner_folder}/index.ndx')
# 根据索引文件内容决定如何处理轨迹
if any(key.startswith("LG") for key in ndx_dict):
# 处理含有LG组的情况
new_ndx_dict = {key: value for key, value in ndx_dict.items() if key.startswith("LG") or key in ["Protein", "Protein_LIG"]}
self.save_ndx_file(f"{self.temp_folder}/tarj_show.ndx", new_ndx_dict)
# 构建处理轨迹的命令
command_1 = f'echo "Protein_LIG" | gmx trjconv -dt {extract_interval} -s {self.tpr_file} -f {self.xtc_file} -n {self.temp_folder}/tarj_show.ndx -pbc mol -o {self.temp_folder}/temp.xtc'
command_2 = f'echo "Protein\nProtein\nProtein_LIG" | gmx trjconv -s {self.tpr_file} -f {self.temp_folder}/temp.xtc -n {self.temp_folder}/tarj_show.ndx -center -fit rot+trans -o {self.output_folder}/traj_show.xtc'
command_3 = f'echo "Protein\nProtein\nProtein_LIG" | gmx trjconv -s {self.tpr_file} -f {self.temp_folder}/temp.xtc -n {self.temp_folder}/tarj_show.ndx -center -fit rot+trans -b 0 -e 0 -o {self.output_folder}/tarj_show.pdb'
else:
# 处理只含有蛋白质组的情况
new_ndx_dict = {key: value for key, value in ndx_dict.items() if key in ["Protein"]}
self.save_ndx_file(f"{self.temp_folder}/tarj_show.ndx", new_ndx_dict)
# 构建处理轨迹的命令
command_1 = f'echo "Protein" | gmx trjconv -dt {extract_interval} -s {self.tpr_file} -f {self.xtc_file} -n {self.temp_folder}/tarj_show.ndx -pbc mol -o {self.temp_folder}/temp.xtc'
command_2 = f'echo "Protein\nProtein\nProtein" | gmx trjconv -s {self.tpr_file} -f {self.temp_folder}/temp.xtc -n {self.temp_folder}/tarj_show.ndx -center -fit rot+trans -o {self.output_folder}/traj_show.xtc'
command_3 = f'echo "Protein\nProtein\nProtein" | gmx trjconv -s {self.tpr_file} -f {self.temp_folder}/temp.xtc -n {self.temp_folder}/tarj_show.ndx -center -fit rot+trans -b 0 -e 0 -o {self.output_folder}/tarj_show.pdb'
subprocess.run(command_1, shell=True, check=True)
subprocess.run(command_2, shell=True, check=True)
subprocess.run(command_3, shell=True, check=True)
def run_simulation(self):
start_time = time.time()
result = None
try:
env_vars = {
"NAME": self.pdb_file.stem, # 首先将 NAME 设置为文件的 stem
"NSTEPS": str(self.nsteps),
"DT": str(self.dt),
"GMXRC_PATH": str(self.gmxrc_path)
}
env_vars["HOME"] = os.environ["HOME"]
logger.info(f"pdb_file: {self.pdb_file.name}")
logger.info(f"Executing script at: {self.bash_script}")
result = subprocess.run(["bash", str(self.bash_script)], env=env_vars, cwd=self.runner_folder,
capture_output=True, text=True, check=True)
except subprocess.CalledProcessError as e:
logger.error(f"Error in simulation for {self.pdb_file.name}: {e}")
if e.stdout:
logger.error(f"Standard Output:\n{e.stdout}")
if e.stderr:
logger.error(f"Standard Error:\n{e.stderr}")
end_time = time.time()
duration = end_time - start_time
if result:
logger.info(f"Simulation for {self.pdb_file.name} completed successfully in {duration:.2f} seconds.")
if result.stdout:
logger.info(f"Shell Script Output:\n{result.stdout}")
if result.stderr:
logger.error(f"Shell Script Error Output:\n{result.stderr}")
else:
logger.error(f"Simulation for {self.pdb_file.name} failed in {duration:.2f} seconds.")
def main(simulation_steps, time_step, pdb_folder_path, bash_script_path, gmxrc_path):
pdb_folder = Path(pdb_folder_path).resolve()
for pdb_file in pdb_folder.glob("*.pdb"):
runner = SimulationRunner(pdb_file, simulation_steps, time_step, pdb_folder, bash_script_path, gmxrc_path)
runner.copy_pdb()
logger.info(f"Running simulation for {pdb_file.name} in {runner.runner_folder}...")
runner.run_simulation()
logger.info(f"Finished simulation for {pdb_file.name}.")
runner.process_trajectory(extract_interval=100) # 例如每100ps抽取一次轨迹
logger.info(f"Finished processing trajectory for {pdb_file.name}. per 100ps")
if __name__ == "__main__":
NSTEPS = 50000000 # Example: 50000000 steps
DT = 0.002 # Example: 2 fs time step
PDB_FOLDER_PATH = Path("./pdb_files") # Assuming the PDB files are in a folder named 'pdb_files' in the current directory
# 传入自定义的bash脚本路径
CUSTOM_BASH_SCRIPT_PATH = Path('md_gromacs.sh')
# 传入 GMXRC 文件的路径
GMXRC_PATH = Path('/root/software/gmx2023.2/bin/GMXRC')
main(NSTEPS, DT, PDB_FOLDER_PATH, CUSTOM_BASH_SCRIPT_PATH, GMXRC_PATH)