From 51f11e35d349778ff34c803d66aa04f34307258a Mon Sep 17 00:00:00 2001
From: mm644706215
Date: Thu, 31 Jul 2025 13:18:44 +0800
Subject: [PATCH] add

---
 scripts/move_sdf.py | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 scripts/sdf2to3d.py | 118 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 246 insertions(+)
 create mode 100644 scripts/move_sdf.py
 create mode 100644 scripts/sdf2to3d.py

diff --git a/scripts/move_sdf.py b/scripts/move_sdf.py
new file mode 100644
index 0000000..0853335
--- /dev/null
+++ b/scripts/move_sdf.py
@@ -0,0 +1,128 @@
+"""Copy the SDF files listed in a CSV and validate each copy with RDKit.
+
+For every row of ``coconut_data_info.csv`` (columns ``identifier`` and
+``canonical_smiles``) the file ``<identifier>.sdf`` is copied from SOURCE_DIR
+to DEST_DIR, and the copy is parsed with RDKit to detect unreadable files.
+A summary is printed and the molecules without an SDF file are written to
+``no_sdf_smiles.txt``.
+"""
+
+import pandas as pd
+from pathlib import Path
+import shutil
+from rdkit import Chem
+from concurrent.futures import ProcessPoolExecutor, as_completed
+
+# Path settings
+SOURCE_DIR = Path(r"D:\inhibitor\data\COCUNUT\COCONUT_sdf")
+DEST_DIR = Path(r"C:\Users\pylyz\Documents\project\unidock-mcp\scripts\data\molecules")
+
+
+def process_row(row):
+    """Copy one molecule's SDF file and check that RDKit can read the copy.
+
+    Args:
+        row: Mapping with the keys ``identifier`` and ``canonical_smiles``
+            (a plain dict and a pandas Series both work).
+
+    Returns:
+        dict with the keys ``identifier``, ``smiles``, ``src_exists``,
+        ``copied``, ``rdkit_ok`` and ``error`` (empty string on success).
+    """
+    identifier = row['identifier']
+    smiles = row['canonical_smiles']
+    sdf_filename = f"{identifier}.sdf"
+    src_sdf = SOURCE_DIR / sdf_filename
+    dst_sdf = DEST_DIR / sdf_filename
+
+    # Probe the file system once so the reported flag and the branch taken
+    # below can never disagree.
+    src_exists = src_sdf.exists()
+    result = {
+        "identifier": identifier,
+        "smiles": smiles,
+        "src_exists": src_exists,
+        "copied": False,
+        "rdkit_ok": False,
+        "error": ""
+    }
+
+    if not src_exists:
+        result['error'] = "SDF文件不存在"
+        return result
+
+    try:
+        shutil.copy2(src_sdf, dst_sdf)
+        result['copied'] = True
+    except Exception as e:
+        result['error'] = f"拷贝失败: {e}"
+        return result
+
+    try:
+        # Check whether RDKit can read the copied SDF file
+        suppl = Chem.SDMolSupplier(str(dst_sdf), sanitize=False)
+        if any(mol is not None for mol in suppl):
+            result['rdkit_ok'] = True
+        else:
+            result['error'] = "RDKit无法读取SDF"
+    except Exception as e:
+        result['error'] = f"RDKit异常: {e}"
+    return result
+
+
+def main():
+    """Copy every SDF listed in the CSV in parallel and print a report."""
+    csv_path = "coconut_data_info.csv"
+    df = pd.read_csv(csv_path, dtype=str)
+
+    # Create the destination here rather than at import time: worker
+    # processes started with "spawn" re-import this module.
+    DEST_DIR.mkdir(parents=True, exist_ok=True)
+
+    # Ship plain dicts to the workers; they pickle far more cheaply than
+    # one pandas Series per row.
+    rows = df[['identifier', 'canonical_smiles']].to_dict('records')
+
+    results = []
+    with ProcessPoolExecutor() as executor:
+        futures = [executor.submit(process_row, row) for row in rows]
+        for fut in as_completed(futures):
+            results.append(fut.result())
+
+    copied_and_valid = [r for r in results if r['src_exists'] and r['copied'] and r['rdkit_ok']]
+    copied_but_invalid = [r for r in results if r['src_exists'] and r['copied'] and not r['rdkit_ok']]
+    no_sdf = [r for r in results if not r['src_exists']]
+    failed_copy = [r for r in results if r['src_exists'] and not r['copied']]
+
+    print("=" * 60)
+    print(f"总分子数: {len(df)}")
+    print(f"有SDF并成功拷贝且可被RDKit读取: {len(copied_and_valid)}")
+    print(f"有SDF拷贝后不能被RDKit读取(坏文件): {len(copied_but_invalid)}")
+    print(f"没有SDF文件: {len(no_sdf)}")
+    print(f"SDF存在但拷贝失败: {len(failed_copy)}")
+    print("=" * 60)
+
+    # Print the detailed lists
+    if copied_but_invalid:
+        print("拷贝后坏SDF列表(identifier):")
+        for r in copied_but_invalid:
+            print(f" {r['identifier']} ({r['error']})")
+        print("-" * 30)
+    if no_sdf:
+        print("没有SDF文件的canonical_smiles:")
+        for r in no_sdf:
+            print(f" {r['smiles']} ({r['identifier']})")
+        # Also save the list to a text file
+        with open("no_sdf_smiles.txt", "w", encoding="utf-8") as f:
+            for r in no_sdf:
+                f.write(f"{r['smiles']}\t{r['identifier']}\n")
+        print("所有无SDF分子的smiles已保存到 no_sdf_smiles.txt")
+    if failed_copy:
+        print("拷贝失败的分子:")
+        for r in failed_copy:
+            print(f" {r['identifier']} ({r['error']})")
+        print("-" * 30)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/sdf2to3d.py b/scripts/sdf2to3d.py
new file mode 100644
index 0000000..478648e
--- /dev/null
+++ b/scripts/sdf2to3d.py
@@ -0,0 +1,118 @@
+"""
+-------------------------------------------------------------
+Batch 2D SDF -> 3D SDF parallel conversion script (joblib version)
+-------------------------------------------------------------
+Features:
+- Read 2D SDF files (one molecule per file) from a directory and generate
+  3D SDF files in parallel, keeping the original properties
+- Record and summarise every molecule that failed
+- Source and 3D output directories must differ, so inputs are never overwritten
+
+Install dependencies:
+    conda install -y -c conda-forge rdkit joblib
+
+Usage example:
+    python sdf2to3d.py --src_dir ./2d_sdf_dir --out_dir ./3d_sdf_dir --n_jobs 8
+
+-------------------------------------------------------------
+"""
+
+from pathlib import Path
+from rdkit import Chem
+from rdkit.Chem import AllChem
+from joblib import Parallel, delayed
+import argparse
+
+
+def convert_2d_to_3d_sdf(sdf_path, out_dir, max_attempts=10):
+    """Generate a 3D conformer for the first molecule of a 2D SDF file.
+
+    Args:
+        sdf_path: Input SDF file; its stem is used as the identifier.
+        out_dir: Directory that receives ``<identifier>.sdf``.
+        max_attempts: Maximum number of embedding attempts.
+
+    Returns:
+        Tuple ``(identifier, ok, message)``; ``ok`` is True only when a 3D
+        SDF file was written.
+    """
+    sdf_path = Path(sdf_path)
+    identifier = sdf_path.stem
+    out_sdf = Path(out_dir) / f"{identifier}.sdf"
+    try:
+        suppl = Chem.SDMolSupplier(str(sdf_path), sanitize=True)
+        mols = [mol for mol in suppl if mol is not None]
+        if not mols:
+            return identifier, False, "SDF读取失败"
+        mol = Chem.AddHs(mols[0])
+        params = AllChem.ETKDGv3()
+        last_error = ""
+        for attempt in range(max_attempts):
+            try:
+                status = AllChem.EmbedMolecule(mol, params)
+                if status == 0:
+                    AllChem.UFFOptimizeMolecule(mol)
+                    writer = Chem.SDWriter(str(out_sdf))
+                    try:
+                        writer.write(mol)
+                    finally:
+                        # Release the file handle even if the write fails.
+                        writer.close()
+                    return identifier, True, f"成功(第{attempt+1}次)"
+                else:
+                    last_error = f"Embed失败: status={status}"
+            except Exception as e:
+                last_error = f"3D生成异常: {e}"
+        return identifier, False, last_error if last_error else f"3D构象生成失败(已重试{max_attempts}次)"
+    except Exception as e:
+        return identifier, False, f"异常: {e}"
+
+
+def main():
+    """Parse the CLI arguments, convert all files in parallel and report."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--src_dir', type=str, required=True, help='2D SDF文件夹路径')
+    parser.add_argument('--out_dir', type=str, required=True, help='3D SDF输出文件夹路径')
+    parser.add_argument('--n_jobs', type=int, default=4, help='并行进程数')
+    parser.add_argument('--max_attempts', type=int, default=10, help='最大Embed尝试次数')
+    args = parser.parse_args()
+
+    src_dir = Path(args.src_dir)
+    out_dir = Path(args.out_dir)
+    # Output files reuse the input file names, so writing into the source
+    # directory would overwrite the 2D inputs.
+    if src_dir.resolve() == out_dir.resolve():
+        parser.error("--out_dir 不能与 --src_dir 相同,否则会覆盖源文件")
+    out_dir.mkdir(parents=True, exist_ok=True)
+
+    # Sort so the processing and report order is reproducible.
+    sdf_files = sorted(src_dir.glob("*.sdf"))
+    print(f"共检测到2D SDF文件 {len(sdf_files)} 个,开始并行3D生成...")
+
+    results = Parallel(n_jobs=args.n_jobs)(
+        delayed(convert_2d_to_3d_sdf)(sdf_file, out_dir, args.max_attempts) for sdf_file in sdf_files
+    )
+
+    # Split the results into successes and failures
+    success = [r for r in results if r[1]]
+    failed = [r for r in results if not r[1]]
+
+    print("="*60)
+    print(f"总2D SDF: {len(sdf_files)}")
+    print(f"成功生成3D SDF: {len(success)}")
+    print(f"失败: {len(failed)}")
+    if failed:
+        print("失败分子列表:")
+        for ident, _, msg in failed:
+            print(f" {ident}: {msg}")
+        # Save the failures to a file
+        failfile = out_dir / "failed_2dto3d.txt"
+        with open(failfile, "w", encoding="utf-8") as f:
+            for ident, _, msg in failed:
+                f.write(f"{ident}\t{msg}\n")
+        print(f"失败分子已保存到: {failfile.resolve()}")
+    print("="*60)
+
+
+if __name__ == '__main__':
+    main()