Compare commits
2 Commits
ca7257ffa6
...
51f11e35d3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51f11e35d3 | ||
|
|
3c25bc619a |
@@ -1,37 +1,3 @@
|
||||
"""
|
||||
--------------------------------------------------------------
|
||||
并行批量SMILES转3D SDF文件脚本
|
||||
--------------------------------------------------------------
|
||||
本脚本用于从CSV文件中批量读取SMILES分子式,利用RDKit并行生成3D分子构象,
|
||||
每个分子单独保存为SDF文件,并输出详细日志和失败统计。
|
||||
|
||||
主要功能:
|
||||
- 支持通过参数动态指定SMILES和标识符字段名
|
||||
- 多进程加速(可设定n_jobs)
|
||||
- 详细彩色日志输出(rich),失败原因自动分类
|
||||
- 成功/失败统计表格
|
||||
- 所有失败分子及原因自动记录至failed_smiles.txt
|
||||
|
||||
依赖:
|
||||
conda install -y -c conda-forge rdkit pandas rich joblib
|
||||
or pip install pandas rich
|
||||
|
||||
推荐用法示例:
|
||||
python gen_sdf_parallel.py --csv input.csv --outdir ./sdf_files \
|
||||
--smiles_col canonical_smiles --id_col identifier --n_jobs 8 --max_attempts 100
|
||||
|
||||
参数说明:
|
||||
--csv 输入的CSV文件路径(必填)
|
||||
--outdir 输出SDF文件夹,默认 ./sdf_files
|
||||
--smiles_col SMILES列名,默认 canonical_smiles
|
||||
--id_col 标识符列名,默认 identifier
|
||||
--n_jobs 并行进程数,默认4
|
||||
--max_attempts 3D构象最大生成尝试次数,默认10
|
||||
|
||||
脚本作者:lingyuzeng
|
||||
最后更新:2025-07
|
||||
--------------------------------------------------------------
|
||||
"""
|
||||
from pathlib import Path
|
||||
import pandas as pd
|
||||
from rdkit import Chem
|
||||
@@ -41,9 +7,18 @@ from concurrent.futures import ProcessPoolExecutor, as_completed
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.panel import Panel
|
||||
# rdkit, tqdm, rich,pandas
|
||||
|
||||
console = Console()
|
||||
|
||||
def is_valid_sdf(sdf_path):
    """Return True if *sdf_path* contains at least one molecule RDKit can read.

    Parsing is attempted with sanitization disabled so that structurally
    unusual but still readable records count as valid. Any exception
    (missing file, corrupt data, ...) is treated as "not valid".
    """
    try:
        supplier = Chem.SDMolSupplier(str(sdf_path), sanitize=False)
        # The file is valid as soon as a single record parses successfully.
        return any(mol is not None for mol in supplier)
    except Exception:
        return False
|
||||
|
||||
def smiles_to_3d_sdf(identifier, smiles, props, sdf_path, max_attempts=10):
|
||||
try:
|
||||
mol = Chem.MolFromSmiles(smiles)
|
||||
@@ -79,29 +54,44 @@ def batch_csv_to_3d_sdf_parallel(csv_path, output_dir, smiles_col, id_col, n_job
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
df = pd.read_csv(csv_path, sep=',', dtype=str)
|
||||
tasks = []
|
||||
skipped = []
|
||||
for idx, row in df.iterrows():
|
||||
smiles = row[smiles_col]
|
||||
identifier = row[id_col]
|
||||
props = row.to_dict()
|
||||
sdf_file = output_dir / f"{identifier}.sdf"
|
||||
if sdf_file.exists():
|
||||
if is_valid_sdf(sdf_file):
|
||||
# SDF存在且可读,跳过
|
||||
skipped.append(identifier)
|
||||
continue
|
||||
else:
|
||||
# SDF存在但不可读,认为损坏,先删除
|
||||
try:
|
||||
sdf_file.unlink()
|
||||
console.print(f"[red]⚡发现损坏SDF文件 {sdf_file.name},已删除,准备重新生成[/red]")
|
||||
except Exception as e:
|
||||
console.print(f"[bold magenta]❗无法删除损坏SDF: {sdf_file.name}, {e}[/]")
|
||||
tasks.append((identifier, smiles, props, sdf_file, max_attempts))
|
||||
console.rule(f"[bold green]共 {len(tasks)} 个分子,使用 {n_jobs} 并行进程生成[/]")
|
||||
|
||||
console.rule(f"[bold green]共 {len(df)} 个分子,{len(skipped)} 个已存在且有效,{len(tasks)} 个待处理(使用 {n_jobs} 并行进程)[/]")
|
||||
|
||||
results = []
|
||||
with ProcessPoolExecutor(max_workers=n_jobs) as executor:
|
||||
future_to_identifier = {executor.submit(smiles_to_3d_sdf_tuple, task): task[0] for task in tasks}
|
||||
for i, future in enumerate(as_completed(future_to_identifier), 1):
|
||||
identifier, success, msg = future.result()
|
||||
results.append((identifier, success, msg))
|
||||
if success:
|
||||
console.print(f"[bold green]✅ [{identifier}] 处理成功。[/][dim]{msg}[/]")
|
||||
else:
|
||||
if "SMILES解析失败" in msg:
|
||||
console.print(f"[bold red]❌ [{identifier}] SMILES解析失败: {msg}[/]")
|
||||
elif "3D" in msg:
|
||||
console.print(f"[yellow]⚠️ [{identifier}] 3D构象生成失败: {msg}[/]")
|
||||
if tasks:
|
||||
with ProcessPoolExecutor(max_workers=n_jobs) as executor:
|
||||
future_to_identifier = {executor.submit(smiles_to_3d_sdf_tuple, task): task[0] for task in tasks}
|
||||
for i, future in enumerate(as_completed(future_to_identifier), 1):
|
||||
identifier, success, msg = future.result()
|
||||
results.append((identifier, success, msg))
|
||||
if success:
|
||||
console.print(f"[bold green]✅ [{identifier}] 处理成功。[/][dim]{msg}[/]")
|
||||
else:
|
||||
console.print(f"[magenta]❗ [{identifier}] 其它错误: {msg}[/]")
|
||||
if "SMILES解析失败" in msg:
|
||||
console.print(f"[bold red]❌ [{identifier}] SMILES解析失败: {msg}[/]")
|
||||
elif "3D" in msg:
|
||||
console.print(f"[yellow]⚠️ [{identifier}] 3D构象生成失败: {msg}[/]")
|
||||
else:
|
||||
console.print(f"[magenta]❗ [{identifier}] 其它错误: {msg}[/]")
|
||||
|
||||
# 分类失败原因
|
||||
failed = [r for r in results if not r[1]]
|
||||
@@ -115,6 +105,7 @@ def batch_csv_to_3d_sdf_parallel(csv_path, output_dir, smiles_col, id_col, n_job
|
||||
table.add_column("状态", justify="center", style="cyan")
|
||||
table.add_column("数量", justify="center")
|
||||
table.add_row("成功", str(len(succeed)))
|
||||
table.add_row("已跳过(已存在有效SDF)", str(len(skipped)))
|
||||
table.add_row("SMILES解析失败", str(len(failed_smiles)))
|
||||
table.add_row("3D构象失败", str(len(failed_3d)))
|
||||
table.add_row("其它失败", str(len(failed_other)))
|
||||
@@ -159,7 +150,7 @@ def batch_csv_to_3d_sdf_parallel(csv_path, output_dir, smiles_col, id_col, n_job
|
||||
)
|
||||
)
|
||||
else:
|
||||
console.print(Panel("[bold green]全部分子处理成功![/bold green]", style="green"))
|
||||
console.print(Panel("[bold green]全部分子处理成功或已跳过![/bold green]", style="green"))
|
||||
|
||||
if __name__ == '__main__':
|
||||
import argparse
|
||||
@@ -168,7 +159,7 @@ if __name__ == '__main__':
|
||||
parser.add_argument('--outdir', type=str, default='./sdf_files', help='SDF输出文件夹')
|
||||
parser.add_argument('--smiles_col', type=str, default='canonical_smiles', help='SMILES列名')
|
||||
parser.add_argument('--id_col', type=str, default='identifier', help='标识符列名')
|
||||
parser.add_argument('--n_jobs', type=int, default=-1, help='并行进程数')
|
||||
parser.add_argument('--n_jobs', type=int, default=4, help='并行进程数')
|
||||
parser.add_argument('--max_attempts', type=int, default=10, help='最大尝试次数')
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -176,5 +167,5 @@ if __name__ == '__main__':
|
||||
args.csv, args.outdir, args.smiles_col, args.id_col,
|
||||
n_jobs=args.n_jobs, max_attempts=args.max_attempts
|
||||
)
|
||||
|
||||
# python gen_sdf_parallel.py --csv ./data/coconut_data_info.csv --outdir ./data/sdf_files --n_jobs 8 --max_attempts 100 --smiles_col canonical_smiles --id_col identifier
|
||||
# use example:
|
||||
# python gen_sdf_parallel.py --csv coconut_data_info.csv --outdir ./sdf_files --n_jobs 8 --max_attempts 10 --smiles_col canonical_smiles --id_col identifier
|
||||
95
scripts/move_sdf.py
Normal file
95
scripts/move_sdf.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from rdkit import Chem
|
||||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||||
|
||||
# 路径设置
|
||||
SOURCE_DIR = Path(r"D:\inhibitor\data\COCUNUT\COCONUT_sdf")
|
||||
DEST_DIR = Path(r"C:\Users\pylyz\Documents\project\unidock-mcp\scripts\data\molecules")
|
||||
|
||||
DEST_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def process_row(row):
    """Copy one molecule's SDF from SOURCE_DIR to DEST_DIR and validate it.

    *row* is a CSV record providing 'identifier' and 'canonical_smiles'.
    Returns a status dict with src_exists / copied / rdkit_ok flags and an
    'error' string describing the first failure (empty string on success).
    """
    identifier = row['identifier']
    filename = f"{identifier}.sdf"
    source_path = SOURCE_DIR / filename
    target_path = DEST_DIR / filename

    status = {
        "identifier": identifier,
        "smiles": row['canonical_smiles'],
        "src_exists": source_path.exists(),
        "copied": False,
        "rdkit_ok": False,
        "error": ""
    }

    # Guard clause: nothing to do when the source file is missing.
    if not status["src_exists"]:
        status['error'] = "SDF文件不存在"
        return status

    try:
        # copy2 preserves file metadata (timestamps) along with contents.
        shutil.copy2(source_path, target_path)
        status['copied'] = True
    except Exception as e:
        status['error'] = f"拷贝失败: {e}"
        return status

    try:
        # Verify the *copied* file is readable by RDKit (sanitize disabled
        # so unusual but parseable records still pass).
        reader = Chem.SDMolSupplier(str(target_path), sanitize=False)
        if any(mol is not None for mol in reader):
            status['rdkit_ok'] = True
        else:
            status['error'] = "RDKit无法读取SDF"
    except Exception as e:
        status['error'] = f"RDKit异常: {e}"
    return status
|
||||
|
||||
def main(csv_path="coconut_data_info.csv"):
    """Copy and validate the SDF file of every molecule listed in *csv_path*.

    The CSV must contain 'identifier' and 'canonical_smiles' columns (read
    as strings). Rows are processed in parallel via process_row, then a
    summary plus per-category failure listings are printed; molecules with
    no SDF file are additionally written to no_sdf_smiles.txt.

    Args:
        csv_path: path to the input CSV. Defaults to the previously
            hard-coded "coconut_data_info.csv" so existing callers of
            main() are unaffected.
    """
    df = pd.read_csv(csv_path, dtype=str)

    # Fan the per-row work out across worker processes; completion order
    # is arbitrary, which is fine because only aggregate counts matter.
    results = []
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(process_row, row) for _, row in df.iterrows()]
        for fut in as_completed(futures):
            results.append(fut.result())

    # Bucket outcomes into the four mutually exclusive reporting categories.
    copied_and_valid = [r for r in results if r['src_exists'] and r['copied'] and r['rdkit_ok']]
    copied_but_invalid = [r for r in results if r['src_exists'] and r['copied'] and not r['rdkit_ok']]
    no_sdf = [r for r in results if not r['src_exists']]
    failed_copy = [r for r in results if r['src_exists'] and not r['copied']]

    print("=" * 60)
    print(f"总分子数: {len(df)}")
    print(f"有SDF并成功拷贝且可被RDKit读取: {len(copied_and_valid)}")
    print(f"有SDF拷贝后不能被RDKit读取(坏文件): {len(copied_but_invalid)}")
    print(f"没有SDF文件: {len(no_sdf)}")
    print(f"SDF存在但拷贝失败: {len(failed_copy)}")
    print("=" * 60)

    # Detailed listings per failure category.
    if copied_but_invalid:
        print("拷贝后坏SDF列表(identifier):")
        for r in copied_but_invalid:
            print(f" {r['identifier']} ({r['error']})")
        print("-" * 30)
    if no_sdf:
        print("没有SDF文件的canonical_smiles:")
        for r in no_sdf:
            print(f" {r['smiles']} ({r['identifier']})")
        # Persist the missing-SDF molecules so they can be regenerated later.
        with open("no_sdf_smiles.txt", "w", encoding="utf-8") as f:
            for r in no_sdf:
                f.write(f"{r['smiles']}\t{r['identifier']}\n")
        print("所有无SDF分子的smiles已保存到 no_sdf_smiles.txt")
    if failed_copy:
        print("拷贝失败的分子:")
        for r in failed_copy:
            print(f" {r['identifier']} ({r['error']})")
        print("-" * 30)
|
||||
94
scripts/sdf2to3d.py
Normal file
94
scripts/sdf2to3d.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""
|
||||
-------------------------------------------------------------
|
||||
批量2D SDF文件 → 3D SDF文件 并行转换脚本 (joblib版)
|
||||
-------------------------------------------------------------
|
||||
功能:
|
||||
- 从指定目录读取2D SDF文件(每个分子一个SDF文件),并行批量生成3D SDF文件,保留原有属性
|
||||
- 支持失败分子详细记录和统计
|
||||
- 源文件与目标3D SDF目录分离,防止覆盖
|
||||
|
||||
依赖安装:
|
||||
conda install -y -c conda-forge rdkit joblib
|
||||
|
||||
用法示例:
|
||||
python batch_2d_to_3d.py --src_dir ./2d_sdf_dir --out_dir ./3d_sdf_dir --n_jobs 8
|
||||
|
||||
-------------------------------------------------------------
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from rdkit import Chem
|
||||
from rdkit.Chem import AllChem
|
||||
from joblib import Parallel, delayed
|
||||
import argparse
|
||||
|
||||
def convert_2d_to_3d_sdf(sdf_path, out_dir, max_attempts=10):
    """Convert a single 2D SDF file into a 3D SDF file in *out_dir*.

    Reads the first parseable molecule from *sdf_path*, adds explicit
    hydrogens, embeds a 3D conformer with ETKDGv3 (retrying up to
    *max_attempts* times), optimizes it with UFF and writes the result to
    out_dir/<stem>.sdf.

    Returns a (identifier, success_flag, message) tuple; the message
    records either the successful attempt number or the last error seen.
    """
    identifier = sdf_path.stem
    out_sdf = out_dir / f"{identifier}.sdf"
    try:
        parsed = [m for m in Chem.SDMolSupplier(str(sdf_path), sanitize=True) if m is not None]
        if not parsed:
            return identifier, False, "SDF读取失败"
        # Only the first record is converted; explicit hydrogens are
        # required for a sensible 3D embedding.
        mol = Chem.AddHs(parsed[0])
        embed_params = AllChem.ETKDGv3()
        last_error = ""
        attempt = 0
        while attempt < max_attempts:
            try:
                code = AllChem.EmbedMolecule(mol, embed_params)
                if code == 0:
                    AllChem.UFFOptimizeMolecule(mol)
                    writer = Chem.SDWriter(str(out_sdf))
                    writer.write(mol)
                    writer.close()
                    return identifier, True, f"成功(第{attempt+1}次)"
                last_error = f"Embed失败: status={code}"
            except Exception as e:
                last_error = f"3D生成异常: {e}"
            attempt += 1
        return identifier, False, last_error or f"3D构象生成失败(已重试{max_attempts}次)"
    except Exception as e:
        return identifier, False, f"异常: {e}"
|
||||
|
||||
def main():
    """Command-line entry point.

    Parses arguments, converts every *.sdf in --src_dir to a 3D SDF in
    --out_dir in parallel, prints a summary, and saves any failures to
    failed_2dto3d.txt inside the output directory.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument('--src_dir', type=str, required=True, help='2D SDF文件夹路径')
    cli.add_argument('--out_dir', type=str, required=True, help='3D SDF输出文件夹路径')
    cli.add_argument('--n_jobs', type=int, default=4, help='并行进程数')
    cli.add_argument('--max_attempts', type=int, default=10, help='最大Embed尝试次数')
    opts = cli.parse_args()

    source_dir = Path(opts.src_dir)
    target_dir = Path(opts.out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    sdf_files = list(source_dir.glob("*.sdf"))
    print(f"共检测到2D SDF文件 {len(sdf_files)} 个,开始并行3D生成...")

    # Fan the per-file conversions out across joblib worker processes.
    results = Parallel(n_jobs=opts.n_jobs)(
        delayed(convert_2d_to_3d_sdf)(path, target_dir, opts.max_attempts)
        for path in sdf_files
    )

    # Each entry is an (identifier, ok_flag, message) tuple.
    success = [entry for entry in results if entry[1]]
    failed = [entry for entry in results if not entry[1]]

    banner = "=" * 60
    print(banner)
    print(f"总2D SDF: {len(sdf_files)}")
    print(f"成功生成3D SDF: {len(success)}")
    print(f"失败: {len(failed)}")
    if failed:
        print("失败分子列表:")
        for ident, _, msg in failed:
            print(f"  {ident}: {msg}")
        # Persist failures so they can be inspected or retried later.
        failfile = target_dir / "failed_2dto3d.txt"
        with open(failfile, "w", encoding="utf-8") as f:
            f.writelines(f"{ident}\t{msg}\n" for ident, _, msg in failed)
        print(f"失败分子已保存到: {failfile.resolve()}")
    print(banner)
|
||||
Reference in New Issue
Block a user