This commit is contained in:
mm644706215
2025-10-16 17:26:35 +08:00
parent b1d437a06d
commit ea218a3a39
49 changed files with 694742 additions and 2 deletions

0
utils/atom_show.ipynb Normal file → Executable file
View File

0
utils/bond_show.ipynb Normal file → Executable file
View File

0
utils/generate.ipynb Normal file → Executable file
View File

278
utils/simemacrocycle_repair.py Executable file
View File

@@ -0,0 +1,278 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@file :simemacrocycle_repair.py
@Description: :SIME工具生成的16元环大环内酯化合物的自动化价态修复工具
@Date :2025/03/29 18:29:52
@Author :lyzeng
@Email :pylyzeng@gmail.com
@version :1.0
# 安装依赖
pip install rdkit pandas swifter joblib tqdm matplotlib
# 运行脚本
python simemacrocycle_repair.py
'''
##############################
# 模块说明
##############################
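"""
Main features:
1. Batch-repair valence errors in SIME-generated 16-membered macrolides that contain double bonds.
2. Automatically detect and handle the following problems:
   - Carbon atoms whose explicit valence exceeds the limit (e.g. pentavalent carbon)
   - Unreasonable explicit-hydrogen assignments
   - Double-bond stereochemistry conflicts
3. Provide repair statistics and a visual analysis.
Input / output:
- Input: a text file of SMILES, one molecule per line.
- Output:
  - A CSV file of repaired results (original / corrected SMILES and status)
  - A repair-statistics chart (PNG)
  - A summary report (TXT) (note: the current script does not write this file)
Requirements:
- Python >= 3.7
- RDKit >= 2022.03
- pandas >= 1.3
- swifter >= 1.3
"""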
import pandas as pd
from pathlib import Path
import swifter
from joblib import Parallel, delayed
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from collections import Counter
import re
from rdkit import Chem
import warnings
warnings.filterwarnings('ignore', category=UserWarning)
##############################
# 核心修复函数
##############################
def safe_sanitize(mol):
    """
    Sanitize a molecule and report the atom responsible for a valence error.

    Sanitization is applied to the object that is passed in, so pass a copy
    when the original must stay untouched.

    Parameters:
        mol (rdkit.Chem.Mol): Molecule to check.
    Returns:
        int/None: Index of the offending atom when RDKit raises an
        AtomValenceException, or None when sanitization succeeds.
    Raises:
        Exception: Any sanitization failure that is not a valence error, and
        any valence error whose message does not contain an atom index, is
        propagated. Previously these were swallowed and reported as None,
        which made an unsanitizable molecule look valid to the caller.
    """
    try:
        Chem.SanitizeMol(mol)
    except Chem.AtomValenceException as exc:
        # The atom index is only exposed here through the message text,
        # e.g. "... atom # 12 ...".
        match = re.search(r'atom # (\d+)', str(exc))
        if match is None:
            # Without an index nothing can be repaired; do not pretend success.
            raise
        return int(match.group(1))
    return None
def fix_valence_error(smi):
    """
    Try to repair a valence error in a single SMILES string.

    The SMILES is parsed without sanitization and a copy is sanitized to find
    the offending atom. Three repairs are then applied cumulatively to that
    atom, re-checking after each one:
      1. drop its explicit hydrogens,
      2. reset its formal charge and radical electrons,
      3. remove its last bond (only when it has more than one neighbour).
    If none of them yields a sanitizable molecule, the Kekulé form of the
    unmodified input is returned instead.

    Parameters:
        smi (str): Input SMILES string.
    Returns:
        tuple: (SMILES, status, message)
        Possible status values:
        - 'valid': the input sanitized without a valence error
        - 'corrected': a repair succeeded
        - 'kekule': no repair succeeded; Kekulé form of the input returned
        - 'invalid': RDKit could not parse the input (input returned as-is)
        - 'failed': an exception was raised (input returned as-is, message
          holds the exception text)
    """
    try:
        mol = Chem.MolFromSmiles(smi, sanitize=False)
        if mol is None:
            return smi, "invalid", "无法解析SMILES"

        # Keep the sanitized copy so the SMILES is written from the molecule
        # that was actually validated, not from the raw parse.
        sanitized = Chem.Mol(mol)
        error_atom_idx = safe_sanitize(sanitized)
        if error_atom_idx is None:
            return Chem.MolToSmiles(sanitized), "valid", "原始有效"

        rw_mol = Chem.RWMol(mol)
        atom = rw_mol.GetAtomWithIdx(error_atom_idx)

        def drop_explicit_hs():
            atom.SetNumExplicitHs(0)

        def reset_charge_and_radicals():
            atom.SetFormalCharge(0)
            atom.SetNumRadicalElectrons(0)

        def remove_last_bond():
            # Last resort; skipped when it would isolate the atom.
            if atom.GetDegree() > 1:
                last_bond = list(atom.GetBonds())[-1]
                rw_mol.RemoveBond(last_bond.GetBeginAtomIdx(),
                                  last_bond.GetEndAtomIdx())

        # Repair strategies, applied on top of each other in this order.
        for action in (drop_explicit_hs, reset_charge_and_radicals, remove_last_bond):
            action()
            candidate = rw_mol.GetMol()
            if safe_sanitize(candidate) is None:
                return Chem.MolToSmiles(candidate), "corrected", f"修复原子 {error_atom_idx}"

        # Every repair failed. Kekulize a fresh copy of the parsed input so the
        # fallback does not carry the failed edits (in particular the removed
        # bond) into the output.
        kekule_mol = Chem.RWMol(mol)
        Chem.Kekulize(kekule_mol)
        return Chem.MolToSmiles(kekule_mol, kekuleSmiles=True), "kekule", "返回Kekule形式"
    except Exception as e:
        # Per-molecule boundary: one bad record must not abort a batch run.
        return smi, "failed", str(e)
##############################
# 并行处理模块
##############################
def batch_process(smi_chunk):
    """
    Repair every SMILES in one batch (unit of work for the parallel runner).

    Parameters:
        smi_chunk (list): SMILES strings to process.
    Returns:
        list: One (fixed SMILES, status, message) tuple per input, in order.
    """
    repaired = []
    for smiles in smi_chunk:
        repaired.append(fix_valence_error(smiles))
    return repaired
def process_in_chunks(smi_list, chunk_size=50000, n_jobs=4, batch_size=1000):
    """
    Repair a large SMILES list chunk by chunk, in parallel.

    The list is cut into chunks of ``chunk_size`` (one progress-bar step
    each); every chunk is cut again into batches of ``batch_size`` that are
    dispatched to joblib workers running ``batch_process``.

    Parameters:
        smi_list (list): Original SMILES strings.
        chunk_size (int): Number of molecules per progress-bar step.
        n_jobs (int): Passed to joblib.Parallel as the number of jobs.
        batch_size (int): Number of molecules handed to a worker at once
            (previously hard-coded to 1000).
    Returns:
        tuple: (fixed SMILES list, status list, message list), always a tuple
        of three lists, including for empty input.
    """
    if len(smi_list) == 0:
        # Nothing to do: skip the progress bar and the worker pool.
        return [], [], []

    results = []
    for chunk_start in tqdm(range(0, len(smi_list), chunk_size),
                            desc=f"Processing {len(smi_list):,} molecules"):
        chunk = smi_list[chunk_start:chunk_start + chunk_size]
        batches = [chunk[batch_start:batch_start + batch_size]
                   for batch_start in range(0, len(chunk), batch_size)]
        chunk_results = Parallel(n_jobs=n_jobs)(
            delayed(batch_process)(batch) for batch in batches
        )
        # Flatten the per-batch lists back into one list of result tuples.
        for batch_result in chunk_results:
            results.extend(batch_result)

    # Transpose [(smiles, status, message), ...] into three parallel lists.
    fixed_smiles, statuses, messages = (list(column) for column in zip(*results))
    return fixed_smiles, statuses, messages
##############################
# 统计分析模块
##############################
def analyze_results(df, figure_path='repair_statistics.png'):
    """
    Summarise repair results and save a two-panel statistics figure.

    Parameters:
        df (pd.DataFrame): Repair results; the 'status' and 'message' columns
            are read.
        figure_path (str): Where the PNG figure is written (defaults to the
            previously hard-coded 'repair_statistics.png').
    Returns:
        dict: total_molecules, valid_count, corrected_count, kekule_count,
        failed_count, invalid_count, success_rate (valid + corrected over
        total, 0.0 for an empty table) and common_errors (the five most
        frequent messages among failed rows).

    For an empty DataFrame the statistics are returned and no figure is
    written, because there is nothing to plot.
    """
    status = df['status']

    def count_status(name):
        return int((status == name).sum())

    total = len(df)
    valid_count = count_status('valid')
    corrected_count = count_status('corrected')
    stats = {
        'total_molecules': total,
        'valid_count': valid_count,
        'corrected_count': corrected_count,
        'kekule_count': count_status('kekule'),
        'failed_count': count_status('failed'),
        # Unparseable inputs get their own status from the repair step.
        'invalid_count': count_status('invalid'),
        # Guard against division by zero on an empty result table.
        'success_rate': (valid_count + corrected_count) / total if total else 0.0,
    }

    # Error breakdown (only when something failed).
    if stats['failed_count'] > 0:
        failed_messages = df.loc[status == 'failed', 'message']
        stats['common_errors'] = dict(failed_messages.value_counts().head(5))
    else:
        stats['common_errors'] = {}

    if total == 0:
        return stats

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

    # Left panel: pie chart of the status distribution.
    status.value_counts().plot.pie(ax=ax1, autopct='%1.1f%%', startangle=90)
    ax1.set_title('修复状态分布')

    # Right panel: most frequent failure messages, or a placeholder text.
    if stats['common_errors']:
        pd.Series(stats['common_errors']).plot.barh(ax=ax2)
        ax2.set_title('Top 5错误原因')
    else:
        ax2.axis('off')
        ax2.text(0.5, 0.5, '无修复失败记录',
                 ha='center', va='center', fontsize=12)

    fig.tight_layout()
    fig.savefig(figure_path, dpi=150)
    plt.close(fig)
    return stats
##############################
# 主执行流程
##############################
def main(input_path, output_path="fixed_molecules.csv", n_jobs=4):
    """
    Run the full pipeline: load SMILES, repair them, analyse and save results.

    Parameters:
        input_path (str): Text file with one SMILES per line; blank lines are
            skipped.
        output_path (str): Destination CSV file.
        n_jobs (int): Number of parallel jobs used for the repair step.

    When the input contains no molecules, a CSV holding only the header row
    is written and the repair and statistics steps are skipped (previously
    this case crashed with a division by zero in the statistics step).
    """
    print(f"\n{' SIME大环内酯修复工具 ':=^50}\n")

    # Load data.
    smi_list = [s.strip() for s in Path(input_path).read_text().splitlines() if s.strip()]
    print(f"✅ 已加载 {len(smi_list):,} 个分子")

    if not smi_list:
        empty = pd.DataFrame(
            columns=['original_smiles', 'fixed_smiles', 'status', 'message'])
        empty.to_csv(output_path, index=False)
        print(f"输入文件中没有分子,已写入空结果文件: {output_path}")
        return

    # Repair molecules.
    fixed_smiles, statuses, messages = process_in_chunks(smi_list, n_jobs=n_jobs)

    # Analyse results.
    df = pd.DataFrame({
        'original_smiles': smi_list,
        'fixed_smiles': fixed_smiles,
        'status': statuses,
        'message': messages
    })
    stats = analyze_results(df)

    # Save results.
    df.to_csv(output_path, index=False)
    print(f"\n{' 修复结果统计 ':=^50}")
    print(f"总处理数: {stats['total_molecules']:,}")
    print(f"成功率: {stats['success_rate']:.2%}")
    print(f"\n输出文件已保存至: {output_path} 和 repair_statistics.png")
if __name__ == "__main__":
    # Command-line entry point: one positional input file plus optional
    # output path and job count, forwarded to main().
    import argparse

    cli = argparse.ArgumentParser(description="SIME大环内酯修复工具")
    cli.add_argument('input', help="输入SMILES文件路径")
    cli.add_argument('-o', '--output', default="fixed_molecules.csv", help="输出CSV路径")
    cli.add_argument('-j', '--jobs', type=int, default=4, help="并行进程数")
    options = cli.parse_args()

    main(
        input_path=options.input,
        output_path=options.output,
        n_jobs=options.jobs,
    )
'''
# 查看帮助
python simemacrocycle_repair.py -h
# 运行示例
python simemacrocycle_repair.py input.smi -o results.csv -j 8
python simemacrocycle_repair.py ../data/Macro16_SIME_Synthesis/2025-02-26-05-38-39_mcrl_1.smiles -o ../data/Macro16_SIME_Synthesis/fixed_macrolides_2025.csv -j 8
'''

159
utils/smiles_svg_show.py Normal file
View File

@@ -0,0 +1,159 @@
import argparse
import json
import re
from datetime import datetime
from pathlib import Path
from rdkit import Chem
from rdkit.Chem.Draw import rdMolDraw2D
import boto3
# Object-storage (S3-compatible) settings. The values below are placeholders
# and can be edited at any time.
BUCKET_NAME = "{Your_Bucket_Name}"
ACCESS_KEY = "{Your_Access_Key}"
SECRET_KEY = "{Your_Secret_Key}"
ENDPOINT_URL = "{Your_Endpoint_Url}"
# Key prefix prepended to the SVG file name to form the uploaded object name.
S3_SVG_PREFIX = "svg_outputs/"
# Render a molecule as SVG, optionally highlighting atoms
def mol_to_svg(mol, highlight_atoms=None, size=(400, 400)):
    """
    Draw ``mol`` with RDKit's SVG drawer and return the SVG text.

    Parameters:
        mol: Molecule to draw.
        highlight_atoms: Optional atom indices to highlight; each one is
            given the colour (1, 0, 0).
        size: (width, height) of the drawing canvas.
    Returns:
        The drawing text produced by the drawer.
    """
    highlight_colour = (1, 0, 0)
    colour_by_atom = dict.fromkeys(highlight_atoms or [], highlight_colour)

    canvas = rdMolDraw2D.MolDraw2DSVG(size[0], size[1])
    canvas.SetFontSize(6)
    # Turn on the drawer's atom-index option.
    canvas.drawOptions().addAtomIndices = True

    canvas.DrawMolecule(
        mol,
        highlightAtoms=highlight_atoms or [],
        highlightAtomColors=colour_by_atom,
    )
    canvas.FinishDrawing()
    return canvas.GetDrawingText()
# Upload an SVG to S3-compatible object storage and return its public URL
def upload_svg_to_s3(svg_content, object_name,
                     public_base_url="https://pub-389f446a01134875b8c7ced0572758de.r2.dev"):
    """
    Store ``svg_content`` under ``object_name`` and return a public URL for it.

    Credentials, bucket and endpoint come from the module-level settings
    ACCESS_KEY, SECRET_KEY, BUCKET_NAME and ENDPOINT_URL.

    Parameters:
        svg_content: SVG document used as the object body.
        object_name: Object key inside the bucket.
        public_base_url: Base of the public URL. The default is the R2.dev
            host that used to be hard-coded here; pass your own when the
            bucket is served from a different public host.
    Returns:
        str: ``public_base_url`` joined with ``object_name``.
    """
    session = boto3.session.Session(
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )
    s3 = session.resource('s3', endpoint_url=ENDPOINT_URL)
    s3.Object(BUCKET_NAME, object_name).put(
        Body=svg_content, ContentType='image/svg+xml')
    # Tolerate a trailing slash on the base so the URL never contains "//".
    return f"{public_base_url.rstrip('/')}/{object_name}"
# Detect the atom responsible for a valence error
def find_valence_error_atom(mol):
    """
    Sanitize ``mol`` and return the index of the atom RDKit complains about.

    Returns None when sanitization succeeds, or when an AtomValenceException
    is raised but its message carries no "atom # N" index. Other exceptions
    are not caught here.
    """
    try:
        Chem.SanitizeMol(mol)
    except Chem.AtomValenceException as exc:
        hit = re.search(r'atom # (\d+)', str(exc))
        return None if hit is None else int(hit.group(1))
    else:
        return None
# JSON save / load helpers
def save_json(data, filename):
    """Write ``data`` to ``filename`` as UTF-8 JSON (2-space indent, non-ASCII kept as-is)."""
    # Serialise first so a serialisation error never truncates an existing file.
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    with Path(filename).open('w', encoding='utf-8') as handle:
        handle.write(serialized)
def load_json(filename):
    """Read the UTF-8 JSON file ``filename`` and return the decoded object."""
    with Path(filename).open(encoding='utf-8') as handle:
        return json.load(handle)
# Collect detailed state information for one atom
def get_atom_status(mol, atom_idx):
    """
    Describe the atom at ``atom_idx``: its counts and every bond it takes part in.

    Parameters:
        mol: Molecule that owns the atom.
        atom_idx: Index of the atom to describe.
    Returns:
        dict with the keys explicit_connections, formal_charge,
        radical_electrons, implicit_hydrogens, explicit_hydrogens and
        connections_detail (one {"connected_to", "bond_type"} entry per bond).
    """
    atom = mol.GetAtomWithIdx(atom_idx)
    # Non-strict cache update; presumably needed so the hydrogen counts can be
    # read on a molecule that was never fully sanitized.
    mol.UpdatePropertyCache(strict=False)

    def describe(bond):
        other_idx = bond.GetOtherAtomIdx(atom_idx)
        other_symbol = mol.GetAtomWithIdx(other_idx).GetSymbol()
        return {
            "connected_to": f"#{other_idx} ({other_symbol})",
            "bond_type": str(bond.GetBondType()),
        }

    bond_details = [describe(bond) for bond in atom.GetBonds()]

    status = {}
    status["explicit_connections"] = atom.GetDegree()
    status["formal_charge"] = atom.GetFormalCharge()
    status["radical_electrons"] = atom.GetNumRadicalElectrons()
    status["implicit_hydrogens"] = atom.GetNumImplicitHs()
    status["explicit_hydrogens"] = atom.GetNumExplicitHs()
    status["connections_detail"] = bond_details
    return status
# Main program
def main():
    """
    Command-line entry point: draw a SMILES as SVG and write a JSON report.

    The molecule is parsed without sanitization so structures with valence
    errors can still be drawn. The report holds the valence-check result, the
    SVG location (a local path with --no_s3, otherwise the uploaded URL), the
    SVG file name and, when --atom_idx is given, that atom's detailed status.

    Invalid input (unparseable SMILES or SMARTS, atom index out of range) ends
    the program through ``parser.error`` with a usage message instead of a
    traceback from inside RDKit.
    """
    parser = argparse.ArgumentParser(description="Process SMILES and optionally highlight atoms using atom index or SMARTS pattern.")
    parser.add_argument('--smiles', type=str, required=True, help='SMILES string of molecule')
    parser.add_argument('--atom_idx', type=int, help='Atom index to highlight')
    parser.add_argument('--smarts', type=str, help='SMARTS pattern to highlight matched atoms')
    parser.add_argument('--output', type=str, default="output.json", help='Output JSON filename')
    parser.add_argument('--no_s3', action='store_true', help='Save SVG locally instead of S3')
    args = parser.parse_args()

    mol = Chem.MolFromSmiles(args.smiles, sanitize=False)
    if mol is None:
        parser.error(f"Could not parse SMILES: {args.smiles!r}")
    if args.atom_idx is not None and not 0 <= args.atom_idx < mol.GetNumAtoms():
        parser.error(
            f"--atom_idx {args.atom_idx} is out of range for a molecule "
            f"with {mol.GetNumAtoms()} atoms")

    error_atom_idx = find_valence_error_atom(mol)
    atom_state_info = "OK" if error_atom_idx is None else f"Valence error at atom #{error_atom_idx}"

    # Atoms to highlight: the explicit index plus every SMARTS match.
    highlight_atoms = set()
    if args.atom_idx is not None:
        highlight_atoms.add(args.atom_idx)
    if args.smarts:
        patt = Chem.MolFromSmarts(args.smarts)
        if patt is None:
            parser.error(f"Could not parse SMARTS: {args.smarts!r}")
        for match in mol.GetSubstructMatches(patt):
            highlight_atoms.update(match)

    svg_str = mol_to_svg(mol, highlight_atoms=list(highlight_atoms))
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    svg_filename = f"molecule_{timestamp}.svg"

    output_path = Path(args.output)
    if not output_path.is_absolute():
        output_path = Path.cwd() / output_path
    # Both the JSON report and a local SVG are written into this directory.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    if args.no_s3:
        svg_path = output_path.parent / svg_filename
        svg_path.write_text(svg_str, encoding='utf-8')
        svg_location = str(svg_path)
    else:
        object_name = f"{S3_SVG_PREFIX}{svg_filename}"
        svg_location = upload_svg_to_s3(svg_str, object_name)

    output_data = {
        "atom_state": atom_state_info,
        "svg_url": svg_location,
        "svg_filename": svg_filename
    }
    if args.atom_idx is not None:
        output_data["atom_status_detail"] = get_atom_status(mol, args.atom_idx)

    save_json(output_data, output_path)
    print(f"Results saved to {output_path}")
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
"""
# 自动修复键值错误
python smiles_svg_show.py --smiles "O=C1C[C@@H](O)C[C@H](O[C@H]9C[C@@](C)(OC)[C@@H](O)[C@H](C)O9)[C@@H](C)C[C@@H](C)C(=O)/C=C/[C@@H](CC)=C/[C@H](O[C@@H]9O[C@H](C)C[C@@H]([C@H]9O)N(C)C)[N@@](C)O1" --atom_idx 30
python smiles_svg_show.py --smiles "CCC1=C\[C@H](O[C@H]2C[C@@](C)(OC)[C@@H](O)[C@H](C)O2)[C@@H](CC=O)OC(=O)C[C@@H](O)C[C@H](O[C@H]2C[C@@](C)(OC)[C@@H](O)[C@H](C)O2)[C@@H](C)C[C@@H](C)C(=O)\C=C\1" --atom_idx 30
# SMARTS matching requires a chemically valid SMILES
python smiles_svg_show.py --smiles "CCC1=C\[C@H](O[C@H]2C[C@@](C)(OC)[C@@H](O)[C@H](C)O2)[C@@H](CC=O)OC(=O)C[C@@H](O)C[C@H](O[C@H]2C[C@@](C)(OC)[C@@H](O)[C@H](C)O2)[C@@H](C)C[C@@H](C)C(=O)\C=C\1" --smarts "[r16]([#8][#6](=[#8]))"
"""

0
utils/split_multi.ipynb Normal file → Executable file
View File