Files
macro_split/scripts/batch_process_multi_rings.py
2025-11-14 20:34:58 +08:00

424 lines
14 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Batch-process macrolide molecules with 12- to 20-membered rings.

Input file: data/ring12_20/temp.csv
Output folders: output/ring{N}_fragments/ (N = 12, 13, 14 ... 20)
"""
import sys
from pathlib import Path
# Put the project root on the Python import path so the ``src`` package
# imported further down can be resolved.
# The project root is taken to be the parent of this script's directory.
script_dir = Path(__file__).resolve().parent
project_root = script_dir.parent
sys.path.insert(0, str(project_root))
from rdkit import Chem
import pandas as pd
import json
from tqdm import tqdm
from datetime import datetime
from collections import defaultdict
from src.ring_numbering import get_ring_atoms
from src.fragment_cleaver import cleave_side_chains
def find_lactone_carbons(mol, ring_size):
    """
    Locate lactone carbonyl carbons that sit on a ring of the given size.

    Args:
        mol: RDKit molecule to search.
        ring_size: Ring size substituted into the ``r<n>`` SMARTS primitive.

    Returns:
        De-duplicated list of atom indices of ring carbons that carry a
        double-bonded oxygen and are bonded to a ring oxygen. An empty list
        is returned when nothing matches or a SMARTS pattern fails to compile.
    """
    ring_query = Chem.MolFromSmarts(f"[r{ring_size}]")
    if ring_query is None:
        return []

    # Every atom that matches the ring-size primitive.
    members = {hit[0] for hit in mol.GetSubstructMatches(ring_query)}
    if not members:
        return []

    # Ring oxygen bonded to a ring carbon that also carries a (=O).
    lactone_query = Chem.MolFromSmarts(f"[r{ring_size};#8][r{ring_size};#6](=[#8])")
    if lactone_query is None:
        return []

    # hit[1] is the carbonyl carbon (second atom of the pattern); keep it only
    # if it is one of the atoms found on a ring of the requested size.
    carbons = [
        hit[1]
        for hit in mol.GetSubstructMatches(lactone_query)
        if len(hit) >= 2 and hit[1] in members
    ]
    return list(set(carbons))
def order_ring_atoms_clockwise(mol, ring_atoms, start_atom):
    """
    Order ring atoms by walking bond-to-bond from ``start_atom``.

    The walk direction is decided purely by the neighbor iteration order of
    the molecule object; no geometric notion of "clockwise" is computed.

    The walk first extends forward (appending) from ``start_atom``. If it
    stalls before every ring atom has been placed, it then extends backward
    from ``start_atom``, prepending each atom it finds so that consecutive
    entries of the result stay bonded to each other.

    Args:
        mol: Molecule exposing ``GetAtomWithIdx(i).GetNeighbors()`` whose
            items expose ``GetIdx()``.
        ring_atoms: Collection of atom indices that belong to the ring.
        start_atom: Atom index to start from; must be in ``ring_atoms``.

    Returns:
        List of atom indices. For a simple cycle ``start_atom`` is first and
        all ring atoms are present. The list can be shorter than
        ``ring_atoms`` when some atoms cannot be reached by the walk.

    Raises:
        ValueError: If ``start_atom`` is not one of ``ring_atoms``.
    """
    if start_atom not in ring_atoms:
        raise ValueError("Start atom is not in the ring")

    members = set(ring_atoms)  # O(1) membership tests during the walk
    target = len(ring_atoms)
    ordered = [start_atom]
    visited = {start_atom}

    def next_unvisited(atom_idx):
        """Return the first unvisited ring neighbor of atom_idx, or None."""
        for neighbor in mol.GetAtomWithIdx(atom_idx).GetNeighbors():
            neighbor_idx = neighbor.GetIdx()
            if neighbor_idx in members and neighbor_idx not in visited:
                return neighbor_idx
        return None

    # Phase 1: walk forward from the start atom, appending to the tail.
    current = start_atom
    while len(ordered) < target:
        found = next_unvisited(current)
        if found is None:
            break
        ordered.append(found)
        visited.add(found)
        current = found

    # Phase 2: if atoms are still missing, walk the other way round from the
    # start atom and prepend, so list neighbors remain bonded neighbors.
    current = start_atom
    while len(ordered) < target:
        found = next_unvisited(current)
        if found is None:
            break
        ordered.insert(0, found)
        visited.add(found)
        current = found

    return ordered
def assign_ring_numbering_general(mol, ring_size):
    """
    Assign 1-based ring positions, starting at the lactone carbonyl carbon.

    Args:
        mol: RDKit molecule.
        ring_size: Expected number of atoms in the ring.

    Returns:
        A 2-tuple in one of three shapes:
        * ``(numbering, lactone_carbon)`` - ``numbering`` maps atom index to
          position (1, 2, ...) in the order produced by
          ``order_ring_atoms_clockwise``.
        * ``('multiple_lactones', lactone_carbons)`` - more than one lactone
          carbon was found on the ring; the second item is the list of them.
        * ``(None, None)`` - no lactone carbon, ring atom count does not equal
          ``ring_size``, or ordering raised an exception.
    """
    candidates = find_lactone_carbons(mol, ring_size)
    if len(candidates) > 1:
        return 'multiple_lactones', candidates
    if not candidates:
        return None, None
    anchor = candidates[0]

    # NOTE(review): get_ring_atoms is called without ring_size; which ring it
    # selects is decided inside that helper - confirm it suits every size.
    ring_atoms = get_ring_atoms(mol)
    if len(ring_atoms) != ring_size:
        return None, None

    try:
        sequence = order_ring_atoms_clockwise(mol, ring_atoms, anchor)
    except Exception:
        # Any ordering failure is reported as "no numbering".
        return None, None

    numbering = {atom_idx: position for position, atom_idx in enumerate(sequence, start=1)}
    return numbering, anchor
def validate_numbering_general(numbering, ring_size):
    """
    Check that a numbering covers positions 1..ring_size exactly once each.

    Args:
        numbering: Mapping of atom index to position, ``None``, or the
            sentinel string ``'multiple_lactones'``.
        ring_size: Expected number of numbered atoms.

    Returns:
        ``True`` when ``numbering`` has ``ring_size`` entries whose values are
        exactly the set {1, ..., ring_size}; ``False`` otherwise.
    """
    # Sentinel results from the numbering step are never valid.
    if numbering is None or numbering == 'multiple_lactones':
        return False

    expected_positions = set(range(1, ring_size + 1))
    return len(numbering) == ring_size and set(numbering.values()) == expected_positions
def get_ring_size_from_smiles(smiles):
    """
    Report which ring sizes between 12 and 20 are present in a SMILES string.

    A size ``n`` is reported only when the number of distinct atoms matching
    the ``[r<n>]`` SMARTS primitive equals ``n``.

    Args:
        smiles: SMILES string of the molecule.

    Returns:
        Ascending list of ring sizes; empty when the SMILES cannot be parsed
        or no size in the 12-20 range qualifies.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return []

    found = []
    for size in range(12, 21):  # 12-20 inclusive
        query = Chem.MolFromSmarts(f"[r{size}]")
        if query is None:
            continue
        hits = mol.GetSubstructMatches(query)
        if hits and len({hit[0] for hit in hits}) == size:
            found.append(size)
    return sorted(found)
def process_single_molecule(idx, row, ring_size, output_base_dir, log_file, error_log_file, multiple_lactone_log):
    """
    Number, cleave and persist the fragments of one molecule.

    Args:
        idx: Row index of the molecule; used in ids and log messages.
        row: Record offering ``.get`` and ``[]`` access with the keys
            ``'IDs'``, ``'molecule_pref_name'`` and ``'smiles'``.
        ring_size: Ring size being processed.
        output_base_dir: Directory under which a per-molecule folder is made.
        log_file: General log path (not used inside this function).
        error_log_file: Path that receives failure messages.
        multiple_lactone_log: Path that receives multi-lactone rejections.

    Returns:
        ``(fragment_count, 'success')`` on success, otherwise
        ``(None, status)`` where status is one of ``'parse_failed'``,
        ``'multiple_lactones'``, ``'numbering_failed'``,
        ``'validation_failed'``, ``'cleavage_failed'`` or
        ``'unexpected_error'``.
    """
    try:
        # Molecule identity.
        mol_id = row.get('IDs', f'mol_{idx}')
        mol_name = row.get('molecule_pref_name', 'Unknown')
        smiles = row['smiles']
        parent_id = f"ring{ring_size}_mol_{idx}"

        def fail(text, status):
            """Write text to the error log and build the failure result."""
            log_message(text, error_log_file, 'error')
            return None, status

        # Parse the SMILES.
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            return fail(f"分子 {idx} ({mol_id}) SMILES解析失败", 'parse_failed')

        # Ring numbering; the second item is a list when several lactones exist.
        numbering, lactone_info = assign_ring_numbering_general(mol, ring_size)

        if numbering == 'multiple_lactones':
            log_message(
                f"分子 {idx} ({mol_id}, {mol_name}) 环上有{len(lactone_info)}个内酯键,已剔除。环上内酯碳索引: {lactone_info}",
                multiple_lactone_log,
                'info'
            )
            return None, 'multiple_lactones'

        if numbering is None:
            return fail(f"分子 {idx} ({mol_id}) 编号失败", 'numbering_failed')

        if not validate_numbering_general(numbering, ring_size):
            return fail(f"分子 {idx} ({mol_id}) 编号验证失败", 'validation_failed')

        # Side-chain cleavage.
        try:
            cleaved = cleave_side_chains(mol, smiles, parent_id, numbering)
        except Exception as e:
            return fail(f"分子 {idx} ({mol_id}) 裂解失败: {e!s}", 'cleavage_failed')

        # Per-molecule output folder.
        target_dir = output_base_dir / parent_id
        target_dir.mkdir(parents=True, exist_ok=True)

        # The whole fragment collection first, then one file per fragment.
        cleaved.to_json_file(str(target_dir / f"{parent_id}_all_fragments.json"))
        for fragment in cleaved.fragments:
            fragment.to_json_file(str(target_dir / f"{fragment.fragment_id}.json"))

        # Molecule-level metadata.
        fragment_count = len(cleaved.fragments)
        metadata = {
            'parent_id': parent_id,
            'molecule_id': mol_id,
            'molecule_name': mol_name,
            'smiles': smiles,
            'ring_size': ring_size,
            'num_fragments': fragment_count,
            'processing_date': datetime.now().isoformat()
        }
        with open(target_dir / 'metadata.json', 'w', encoding='utf-8') as handle:
            json.dump(metadata, handle, indent=2, ensure_ascii=False)

        return fragment_count, 'success'
    except Exception as e:
        # Anything not handled above is logged and reported as unexpected.
        log_message(f"分子 {idx} 未预期错误: {e!s}", error_log_file, 'error')
        return None, 'unexpected_error'
def log_message(message, log_file, log_type='info'):
    """
    Append a timestamped line to a log file and optionally echo it.

    Args:
        message: Text to record.
        log_file: Destination path, given either as ``str`` or ``Path``.
        log_type: ``'error'`` messages are always echoed through
            ``tqdm.write``; ``'info'`` messages are echoed unless the log
            file name contains ``'multiple'``; any other value is only
            written to the file.
    """
    # Normalize so that plain string paths work for the file-name check below.
    log_path = Path(log_file)

    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with open(log_path, 'a', encoding='utf-8') as f:
        f.write(f"[{timestamp}] {message}\n")

    # Multi-lactone rejections are kept out of the console output.
    echo = log_type == 'error' or (
        log_type == 'info' and 'multiple' not in log_path.name
    )
    if echo:
        # tqdm.write prints without breaking an active progress bar.
        tqdm.write(f"{message}")
def main():
    """
    Run the batch job: classify molecules by ring size, process, summarize.

    Reads ``data/ring12_20/temp.csv`` (relative to the current working
    directory), groups row positions by the ring sizes reported by
    ``get_ring_size_from_smiles``, processes every group into
    ``output/ring{N}_fragments`` and prints per-size and overall statistics.
    Returns ``None``; returns early when the input file is missing.
    """
    print("="*80)
    print("12-20元环大环内酯批量处理程序")
    print("="*80)
    print()

    # Load the input table.
    input_file = Path('data/ring12_20/temp.csv')
    if not input_file.exists():
        print(f"❌ 错误: 找不到输入文件 {input_file}")
        return
    df = pd.read_csv(input_file)
    print(f"✓ 读取数据集: {len(df)} 个分子")
    print(f" 文件: {input_file}")
    print()

    # Step 1: group row positions by ring size.
    print("步骤 1: 按环大小分类分子...")
    print()
    ring_size_groups = defaultdict(list)
    for idx, smiles in enumerate(tqdm(df['smiles'], desc="分类进度")):
        # Blank CSV cells are read as NaN (a float), which is not valid
        # SMILES input; skip them instead of aborting the whole batch.
        if not isinstance(smiles, str):
            continue
        for ring_size in get_ring_size_from_smiles(smiles):
            if 12 <= ring_size <= 20:
                ring_size_groups[ring_size].append(idx)
    print()
    print("分类结果:")
    for ring_size in sorted(ring_size_groups.keys()):
        print(f" {ring_size}元环: {len(ring_size_groups[ring_size])} 个分子")
    print()

    # Step 2: process each ring size separately.
    print("步骤 2: 处理各环大小分子...")
    print()

    # Status returned by process_single_molecule -> counter to increment.
    # Unknown statuses are counted as 'failed_unexpected'.
    status_counters = {
        'success': 'success',
        'parse_failed': 'failed_parse',
        'numbering_failed': 'failed_numbering',
        'validation_failed': 'failed_validation',
        'cleavage_failed': 'failed_cleavage',
        'multiple_lactones': 'multiple_lactones',
    }

    all_stats = {}
    for ring_size in sorted(ring_size_groups.keys()):
        indices = ring_size_groups[ring_size]
        print(f"\n处理 {ring_size}元环 ({len(indices)} 个分子)...")
        print("-" * 60)

        # Output folder for this ring size.
        output_base_dir = Path(f'output/ring{ring_size}_fragments')
        output_base_dir.mkdir(parents=True, exist_ok=True)

        # Log files share one timestamp per ring size.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = output_base_dir / f'processing_log_{timestamp}.txt'
        error_log_file = output_base_dir / f'error_log_{timestamp}.txt'
        multiple_lactone_log = output_base_dir / f'multiple_lactone_log_{timestamp}.txt'

        # Counters for this ring size.
        stats = {
            'ring_size': ring_size,
            'total': len(indices),
            'success': 0,
            'failed_parse': 0,
            'failed_numbering': 0,
            'failed_validation': 0,
            'failed_cleavage': 0,
            'failed_unexpected': 0,
            'multiple_lactones': 0,
            'total_fragments': 0
        }
        # Count and ring size are separated so the two numbers cannot be
        # read as a single one.
        log_message(f"开始处理 {len(indices)} 个{ring_size}元环分子", log_file)

        # Process every molecule of this ring size.
        for idx in tqdm(indices, desc=f"Ring-{ring_size}"):
            num_fragments, status = process_single_molecule(
                idx, df.iloc[idx], ring_size,
                output_base_dir, log_file, error_log_file, multiple_lactone_log
            )
            stats[status_counters.get(status, 'failed_unexpected')] += 1
            if status == 'success':
                stats['total_fragments'] += num_fragments

        # Persist the counters.
        stats_file = output_base_dir / 'processing_statistics.json'
        with open(stats_file, 'w', encoding='utf-8') as f:
            json.dump(stats, f, indent=2)
        all_stats[ring_size] = stats

        # Per-size report; stats['total'] is never 0 because a group only
        # exists once at least one molecule was added to it.
        print()
        print(f"{ring_size}元环处理结果:")
        print(f" 成功: {stats['success']}/{stats['total']} ({stats['success']/stats['total']*100:.1f}%)")
        print(f" 碎片数: {stats['total_fragments']}")
        if stats['success'] > 0:
            print(f" 平均碎片: {stats['total_fragments']/stats['success']:.2f} 个/分子")
        print(f" 多个内酯键: {stats['multiple_lactones']}")
        log_message(f"处理完成: 成功 {stats['success']}/{stats['total']}", log_file)

    # Step 3: overall summary.
    print()
    print("="*80)
    print("所有环大小处理完成!")
    print("="*80)
    print()
    print("总体统计:")
    total_molecules = sum(s['total'] for s in all_stats.values())
    total_success = sum(s['success'] for s in all_stats.values())
    total_fragments = sum(s['total_fragments'] for s in all_stats.values())
    total_multiple_lactones = sum(s['multiple_lactones'] for s in all_stats.values())
    # No molecule may have been classified at all; avoid dividing by zero.
    success_rate = total_success / total_molecules * 100 if total_molecules else 0.0
    print(f" 总分子数: {total_molecules}")
    print(f" 总成功数: {total_success} ({success_rate:.1f}%)")
    print(f" 总碎片数: {total_fragments}")
    if total_success > 0:
        print(f" 平均碎片: {total_fragments/total_success:.2f} 个/分子")
    print(f" 多内酯键分子: {total_multiple_lactones}")
    print()
    print("各环大小输出文件夹:")
    for ring_size in sorted(all_stats.keys()):
        output_dir = Path(f'output/ring{ring_size}_fragments')
        print(f" {ring_size}元环: {output_dir}")
    print()
if __name__ == '__main__':
main()