#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@file        :runner.py
@Description :Batch driver that runs a GROMACS MD bash script for every PDB
              file in a folder and post-processes the resulting trajectory.
@Date        :2023/12/04 14:34:36
@Author      :lyzeng
@Email       :pylyzeng@gmail.com
@version     :1.0
'''

import time
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import subprocess
import multiprocessing
import shutil
import os

LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


@dataclass
class SimulationRunner:
    """Run one MD simulation for a single PDB file and post-process it.

    Constructing an instance creates ``<base_folder>/runner_<pdb stem>`` and a
    ``temp`` sub-folder inside it, and opens a per-PDB log file in
    ``base_folder``.

    Args:
        pdb_file: Input structure; its stem names the runner folder, the log
            file and the ``NAME`` variable handed to the bash script.
        nsteps: Value exported to the script as ``NSTEPS``.
        dt: Value exported to the script as ``DT``.
        base_folder: Folder that receives the runner folder and the log file.
        bash_script: Script to execute; defaults to ``md_gromacs.sh`` next to
            this module.
        gmxrc_path: Exported to the script as ``GMXRC_PATH`` (empty if unset).
        gpu_id: Exported to the script as ``CUDA_VISIBLE_DEVICES``.
        output_folder: Destination of the processed trajectory files;
            defaults to the runner folder.
    """

    pdb_file: Path
    nsteps: int
    dt: float
    base_folder: Path
    bash_script: Optional[Path] = None
    gmxrc_path: Optional[Path] = None
    gpu_id: Optional[int] = None
    output_folder: Optional[Path] = None
    temp_folder: Path = field(init=False)
    runner_folder: Path = field(init=False)
    tpr_file: Path = field(init=False)
    xtc_file: Path = field(init=False)
    logger: logging.Logger = field(init=False)

    def __post_init__(self):
        # Accept plain strings as well as Path objects.
        self.pdb_file = Path(self.pdb_file)
        self.base_folder = Path(self.base_folder)

        # Initialise the working folders and the expected output file paths.
        self.runner_folder = self.base_folder / f"runner_{self.pdb_file.stem}"
        self.tpr_file = self.runner_folder / "md.tpr"
        self.xtc_file = self.runner_folder / "md.xtc"
        self.runner_folder.mkdir(parents=True, exist_ok=True)
        self.temp_folder = self.runner_folder / "temp"
        self.temp_folder.mkdir(exist_ok=True)

        if self.output_folder is None:
            self.output_folder = self.runner_folder
        else:
            self.output_folder = Path(self.output_folder)
            self.output_folder.mkdir(parents=True, exist_ok=True)

        if self.bash_script:
            self.bash_script = Path(self.bash_script).absolute()
        else:
            self.bash_script = Path(__file__).resolve().parent / "md_gromacs.sh"

        self.logger = self.setup_logging(self.pdb_file.stem, self.base_folder)

    @staticmethod
    def setup_logging(pdb_id, log_folder):
        """Return a logger named ``pdb_id`` writing to its own fresh log file.

        A dedicated ``FileHandler`` is attached to the named logger instead of
        calling ``logging.basicConfig``: ``basicConfig`` does nothing once the
        root logger already has handlers, so it cannot create one file per
        PDB. Records still propagate to the root logger.
        """
        log_file = Path(log_folder) / f"{pdb_id}_simulation_log.log"
        logger = logging.getLogger(pdb_id)
        logger.setLevel(logging.INFO)
        # Drop handlers left over from an earlier runner for the same PDB so
        # messages are not written twice and file handles are released.
        for old_handler in list(logger.handlers):
            logger.removeHandler(old_handler)
            old_handler.close()
        # mode="w" starts from an empty file on every run.
        handler = logging.FileHandler(log_file, mode="w", encoding="utf-8")
        handler.setFormatter(logging.Formatter(LOG_FORMAT))
        logger.addHandler(handler)
        return logger

    def copy_pdb(self):
        """Copy the input PDB file into the runner folder."""
        shutil.copy(self.pdb_file, self.runner_folder / self.pdb_file.name)

    def set_gpu(self, gpu_id):
        """Set the GPU to use."""
        self.gpu_id = gpu_id

    @staticmethod
    def read_ndx_file(filename):
        """Parse an index file into ``{group name: [atom numbers]}``.

        A line of the form ``[ name ]`` starts a group; every following line
        is split on whitespace and its integers are appended to that group.
        Lines that appear before the first group header are ignored.
        """
        ndx_dict = {}
        current_section = None
        with open(filename, 'r') as file:
            for raw_line in file:
                line = raw_line.strip()
                if line.startswith('[') and line.endswith(']'):
                    current_section = line[1:-1].strip()
                    ndx_dict[current_section] = []
                elif current_section is not None:
                    ndx_dict[current_section].extend(map(int, line.split()))
        return ndx_dict

    @staticmethod
    def save_ndx_file(filename, ndx_dict, atoms_per_line=15):
        """Write ``{group name: [atom numbers]}`` in the format read above.

        Each group is written as a ``[ name ]`` header followed by its atom
        numbers, ``atoms_per_line`` per row, and a blank separator line.
        """
        with open(filename, 'w') as file:
            for group_name, atoms in ndx_dict.items():
                file.write(f"[ {group_name} ]\n")
                for start in range(0, len(atoms), atoms_per_line):
                    row = atoms[start:start + atoms_per_line]
                    file.write(" ".join(str(atom) for atom in row) + "\n")
                file.write("\n")

    @staticmethod
    def _run_trjconv(args, selections):
        """Run ``gmx trjconv`` with ``args``, answering its group prompts.

        ``selections`` are fed on stdin, one group name per line. An argument
        list is used rather than a shell string so paths containing spaces
        or shell metacharacters are passed through unchanged.
        """
        command = ["gmx", "trjconv", *(str(arg) for arg in args)]
        subprocess.run(command, input="\n".join(selections) + "\n",
                       text=True, check=True)

    def process_trajectory(self, extract_interval):
        """Extract, centre and fit the trajectory for visualisation.

        Reads ``index.ndx`` from the runner folder, writes a reduced index
        file to the temp folder, then runs ``gmx trjconv`` three times:
        sub-sampling with ``-dt extract_interval`` and ``-pbc mol``, writing
        the centred/fitted ``traj_show.xtc``, and writing the frame at time 0
        as ``tarj_show.pdb`` (both into ``output_folder``).

        Raises:
            subprocess.CalledProcessError: if any ``gmx trjconv`` call fails.
        """
        ndx_dict = self.read_ndx_file(self.runner_folder / "index.ndx")
        # NOTE: the "tarj_show" spelling is kept as in the original file
        # names because other tools may expect it.
        show_ndx = self.temp_folder / "tarj_show.ndx"
        temp_xtc = self.temp_folder / "temp.xtc"

        if any(key.startswith("LG") for key in ndx_dict):
            # Index contains LG* groups: keep them together with the protein
            # groups and output the "Protein_LIG" group.
            kept_groups = {
                key: value for key, value in ndx_dict.items()
                if key.startswith("LG") or key in ("Protein", "Protein_LIG")
            }
            output_group = "Protein_LIG"
        else:
            # Protein-only system.
            kept_groups = {
                key: value for key, value in ndx_dict.items()
                if key == "Protein"
            }
            output_group = "Protein"
        self.save_ndx_file(show_ndx, kept_groups)

        # Step 1: sub-sample the raw trajectory and make molecules whole.
        self._run_trjconv(
            ["-dt", extract_interval, "-s", self.tpr_file, "-f", self.xtc_file,
             "-n", show_ndx, "-pbc", "mol", "-o", temp_xtc],
            [output_group],
        )

        # Steps 2 and 3 share everything except the output options. The three
        # prompts are answered with "Protein", "Protein" and the output group.
        fit_args = ["-s", self.tpr_file, "-f", temp_xtc, "-n", show_ndx,
                    "-center", "-fit", "rot+trans"]
        selections = ["Protein", "Protein", output_group]
        self._run_trjconv(
            fit_args + ["-o", self.output_folder / "traj_show.xtc"],
            selections,
        )
        self._run_trjconv(
            fit_args + ["-b", 0, "-e", 0,
                        "-o", self.output_folder / "tarj_show.pdb"],
            selections,
        )

    def run_simulation(self):
        """Execute the bash script in the runner folder and log its output.

        Returns:
            bool: ``True`` if the script exited with status 0, else ``False``.
            A failure is logged, not raised.
        """
        start_time = time.time()
        # Leave one core unused, but never go below one.
        num_cores = max(1, multiprocessing.cpu_count() - 1)
        os.environ['NUM_CORES'] = str(num_cores)
        # The script receives ONLY these variables (env= replaces the whole
        # environment), so NUM_CORES has to be listed here as well.
        # NOTE(review): GMXRC_PATH and CUDA_VISIBLE_DEVICES are passed as
        # empty strings when unset, as before; an empty CUDA_VISIBLE_DEVICES
        # normally hides every GPU - confirm this is what the script expects.
        env_vars = {
            "NAME": self.pdb_file.stem,
            "NSTEPS": str(self.nsteps),
            "DT": str(self.dt),
            "NUM_CORES": str(num_cores),
            "GMXRC_PATH": str(self.gmxrc_path) if self.gmxrc_path else "",
            "PATH": os.environ.get("PATH", ""),
            "LD_LIBRARY_PATH": os.environ.get("LD_LIBRARY_PATH", ""),
            "HOME": os.environ.get("HOME", ""),
            "CUDA_VISIBLE_DEVICES": str(self.gpu_id) if self.gpu_id is not None else "",
        }
        self.logger.info("pdb_file: %s", self.pdb_file.name)
        self.logger.info("Executing script at: %s", self.bash_script)

        try:
            result = subprocess.run(
                ["bash", str(self.bash_script)],
                env=env_vars,
                cwd=self.runner_folder,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            duration = time.time() - start_time
            self.logger.error("Error in simulation for %s: %s", self.pdb_file.name, e)
            if e.stdout:
                self.logger.error("Standard Output:\n%s", e.stdout)
            if e.stderr:
                self.logger.error("Standard Error:\n%s", e.stderr)
            self.logger.error("Simulation for %s failed in %.2f seconds.",
                              self.pdb_file.name, duration)
            return False

        duration = time.time() - start_time
        self.logger.info("Simulation for %s completed successfully in %.2f seconds.",
                         self.pdb_file.name, duration)
        if result.stdout:
            self.logger.info("Shell Script Output:\n%s", result.stdout)
        if result.stderr:
            self.logger.error("Shell Script Error Output:\n%s", result.stderr)
        return True


def detect_gpus():
    """Return the number of GPUs listed by ``nvidia-smi -L``.

    Only lines starting with ``GPU `` are counted, so empty output yields 0
    and indented sub-device lines are not counted as extra GPUs. Returns 0
    when ``nvidia-smi`` is missing or exits with an error.
    """
    try:
        output = subprocess.check_output(["nvidia-smi", "-L"], text=True)
    except (subprocess.CalledProcessError, OSError):
        return 0
    return sum(1 for line in output.splitlines() if line.startswith("GPU "))


def setup_global_logging(log_folder):
    """Configure the root logger to write ``simulation_log.log`` in ``log_folder``.

    An existing log file is deleted first. Returns the root logger.
    """
    log_file = Path(log_folder) / "simulation_log.log"
    if log_file.exists():
        log_file.unlink()
    logging.basicConfig(level=logging.INFO, filename=log_file, filemode='a',
                        format=LOG_FORMAT)
    return logging.getLogger()


def run_simulation_task(pdb_file, simulation_steps, time_step, pdb_folder,
                        bash_script_path, gmxrc_path, gpu_id):
    """Run the full pipeline for one PDB file: copy, simulate, post-process.

    Trajectory processing is skipped when the simulation script failed,
    because the files it needs would be missing.
    """
    runner = SimulationRunner(pdb_file, simulation_steps, time_step, pdb_folder,
                              bash_script_path, gmxrc_path, gpu_id)
    runner.copy_pdb()
    if not runner.run_simulation():
        runner.logger.error("Skipping trajectory processing for %s.",
                            runner.pdb_file.name)
        return
    runner.process_trajectory(extract_interval=100)


def main(simulation_steps, time_step, pdb_folder_path, bash_script_path,
         gmxrc_path, max_workers=1):
    """Run a simulation for every ``*.pdb`` file in ``pdb_folder_path``.

    Jobs are assigned to GPUs round-robin (``index % number of GPUs``) and
    executed by a pool of ``max_workers`` processes (default 1, i.e. one
    simulation at a time). Exceptions raised inside a worker are logged
    instead of being silently dropped.
    """
    pdb_folder = Path(pdb_folder_path)
    setup_global_logging(pdb_folder)  # configure the global logger
    pdb_files = sorted(pdb_folder.glob("*.pdb"))
    if not pdb_files:
        logging.warning("No PDB files found in %s.", pdb_folder)
        return

    num_gpus = detect_gpus()
    if num_gpus == 0:
        logging.error("No GPUs detected, exiting.")
        return

    with multiprocessing.Pool(processes=max_workers) as pool:
        pending = []
        for i, pdb_file in enumerate(pdb_files):
            gpu_id = i % num_gpus  # round-robin GPU assignment
            async_result = pool.apply_async(
                run_simulation_task,
                (pdb_file, simulation_steps, time_step, pdb_folder,
                 bash_script_path, gmxrc_path, gpu_id),
            )
            pending.append((pdb_file, async_result))
        pool.close()
        pool.join()

    # Surface worker failures; apply_async keeps them hidden until get().
    for pdb_file, async_result in pending:
        try:
            async_result.get()
        except Exception:  # top-level boundary: log and go on to the next job
            logging.exception("Task for %s failed.", pdb_file.name)


if __name__ == "__main__":
    NSTEPS = 50000  # Example: 50000000 steps
    DT = 0.002  # Example: 2 fs time step
    # Folder containing the input PDB files, relative to the current directory.
    PDB_FOLDER_PATH = Path("./pdb_gjm")
    # Path of the custom bash script.
    CUSTOM_BASH_SCRIPT_PATH = Path('md_gromacs.sh')
    # Path of the GMXRC file.
    GMXRC_PATH = Path('/usr/local/gromacs-2021.4-plumed-2.8.0/bin/GMXRC')
    main(NSTEPS, DT, PDB_FOLDER_PATH, CUSTOM_BASH_SCRIPT_PATH, GMXRC_PATH)