#!/usr/bin/env python3
"""
Batch-process macrocyclic lactone molecules with 12- to 20-membered rings.

Input file:     data/ring12_20/temp.csv
Output folders: output/ring{N}_fragments/  (N = 12, 13, 14, ..., 20)
"""

import sys
from pathlib import Path

# Put the project root (the parent of this script's directory) on sys.path
# so that the project-local `src` package below can be imported when the
# script is executed directly.
script_dir = Path(__file__).resolve().parent
project_root = script_dir.parent
sys.path.insert(0, str(project_root))

from rdkit import Chem
import pandas as pd
import json
from tqdm import tqdm
from datetime import datetime
from collections import defaultdict

from src.ring_numbering import get_ring_atoms
from src.fragment_cleaver import cleave_side_chains

# Inclusive range of ring sizes handled by this script.
MIN_RING_SIZE = 12
MAX_RING_SIZE = 20

# Maps the status string returned by process_single_molecule() to the
# counter it increments in the per-ring statistics dict.  Any status not
# listed here is counted under 'failed_unexpected'.
_STATUS_TO_STAT_KEY = {
    'success': 'success',
    'parse_failed': 'failed_parse',
    'numbering_failed': 'failed_numbering',
    'validation_failed': 'failed_validation',
    'cleavage_failed': 'failed_cleavage',
    'multiple_lactones': 'multiple_lactones',
}


def find_lactone_carbons(mol, ring_size):
    """
    Find the lactone carbonyl carbons that lie on a ring of the given size.

    Args:
        mol: RDKit molecule to search.
        ring_size: Ring size used in the SMARTS ``r<n>`` primitive.

    Returns:
        Sorted list of unique atom indices of the matched carbonyl carbons;
        an empty list if nothing matches or the SMARTS cannot be built.
    """
    # Ring oxygen bonded to a ring carbon that carries a double-bonded oxygen.
    # Both ring atoms are constrained with r{ring_size}, so every matched
    # carbon is already known to be on a ring of that size and no separate
    # ring-membership lookup is required.
    lactone_pattern = Chem.MolFromSmarts(
        f"[r{ring_size};#8][r{ring_size};#6](=[#8])"
    )
    if lactone_pattern is None:
        return []

    # match[1] is the carbonyl carbon; the set removes duplicate matches.
    return sorted({match[1] for match in mol.GetSubstructMatches(lactone_pattern)})


def order_ring_atoms_clockwise(mol, ring_atoms, start_atom):
    """
    Order the ring atoms by walking along bonds, beginning at ``start_atom``.

    The walk repeatedly steps to the first not-yet-visited neighbour that is
    a ring atom.  The direction therefore follows RDKit's neighbour order.
    NOTE(review): despite the name, nothing here inspects geometry, so
    "clockwise" is a convention rather than something the code enforces.

    Args:
        mol: RDKit molecule that owns the atoms.
        ring_atoms: Collection of atom indices forming the ring.
        start_atom: Atom index that becomes the first element of the result.

    Returns:
        List of atom indices in walk order.  It may be shorter than
        ``ring_atoms`` if the walk dead-ends before all atoms are reached.

    Raises:
        ValueError: If ``start_atom`` is not one of ``ring_atoms``.
    """
    if start_atom not in ring_atoms:
        raise ValueError("Start atom is not in the ring")

    ring_atom_set = set(ring_atoms)  # O(1) membership tests during the walk
    target_length = len(ring_atoms)

    def first_unvisited_ring_neighbor(atom_idx):
        """Return the first unvisited ring neighbour of atom_idx, or None."""
        for neighbor in mol.GetAtomWithIdx(atom_idx).GetNeighbors():
            neighbor_idx = neighbor.GetIdx()
            if neighbor_idx in ring_atom_set and neighbor_idx not in visited:
                return neighbor_idx
        return None

    ordered = [start_atom]
    visited = {start_atom}
    current = start_atom

    while len(ordered) < target_length:
        next_atom = first_unvisited_ring_neighbor(current)
        if next_atom is not None:
            ordered.append(next_atom)
            visited.add(next_atom)
            current = next_atom
            continue

        # Dead end: try to continue from the other side of the start atom.
        # NOTE(review): the atom found here is placed in front of the start
        # atom, but subsequent steps append to the end again.  This ordering
        # is kept as-is; confirm whether it is intended.
        backward_atom = first_unvisited_ring_neighbor(start_atom)
        if backward_atom is None:
            break
        ordered.insert(0, backward_atom)
        visited.add(backward_atom)
        current = backward_atom

    return ordered


def assign_ring_numbering_general(mol, ring_size):
    """
    Assign position numbers (1..ring_size) to the atoms of the macrocycle.

    The single lactone carbonyl carbon on the ring receives position 1.

    Args:
        mol: RDKit molecule.
        ring_size: Expected number of atoms in the ring.

    Returns:
        One of:
        * ``(numbering, lactone_carbon)`` where ``numbering`` maps atom
          index -> position,
        * ``(None, None)`` if there is no lactone carbon, the ring atom count
          does not equal ``ring_size``, or ordering raised,
        * ``('multiple_lactones', lactone_carbons)`` if more than one lactone
          carbon was found on the ring.
    """
    lactone_carbons = find_lactone_carbons(mol, ring_size)

    if not lactone_carbons:
        return None, None

    if len(lactone_carbons) > 1:
        return 'multiple_lactones', lactone_carbons

    lactone_c = lactone_carbons[0]

    # NOTE(review): get_ring_atoms() is called without ring_size; it is
    # presumably expected to return the macrocycle's atom indices - confirm
    # against src.ring_numbering.
    ring_atoms = get_ring_atoms(mol)

    if len(ring_atoms) != ring_size:
        return None, None

    try:
        ordered_atoms = order_ring_atoms_clockwise(mol, ring_atoms, lactone_c)
    except Exception:
        # Best effort: any ordering failure is reported as "no numbering".
        return None, None

    numbering = {
        atom_idx: position
        for position, atom_idx in enumerate(ordered_atoms, start=1)
    }
    return numbering, lactone_c


def validate_numbering_general(numbering, ring_size):
    """
    Check that a numbering covers exactly the positions 1..ring_size.

    Args:
        numbering: Mapping atom index -> position, ``None``, or the sentinel
            string ``'multiple_lactones'``.
        ring_size: Expected number of numbered atoms.

    Returns:
        True if ``numbering`` has ``ring_size`` entries whose values are
        exactly ``{1, ..., ring_size}``; False otherwise.
    """
    if numbering is None or numbering == 'multiple_lactones':
        return False

    return (
        len(numbering) == ring_size
        and set(numbering.values()) == set(range(1, ring_size + 1))
    )


def get_ring_size_from_smiles(smiles):
    """
    Determine which ring sizes in the 12-20 range occur in a molecule.

    A size N is reported only when the number of distinct atoms matching the
    SMARTS ``[rN]`` equals N.

    Args:
        smiles: SMILES string.  Non-string values (for example the NaN that
            pandas produces for an empty CSV cell) yield an empty result
            instead of being passed to RDKit.

    Returns:
        Sorted list of ring sizes; empty if the SMILES is not a string,
        cannot be parsed, or no size qualifies.
    """
    if not isinstance(smiles, str):
        return []

    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return []

    ring_sizes = set()
    for ring_size in range(MIN_RING_SIZE, MAX_RING_SIZE + 1):
        pattern = Chem.MolFromSmarts(f"[r{ring_size}]")
        if pattern is None:
            continue
        unique_atoms = {match[0] for match in mol.GetSubstructMatches(pattern)}
        if len(unique_atoms) == ring_size:
            ring_sizes.add(ring_size)

    return sorted(ring_sizes)


def process_single_molecule(idx, row, ring_size, output_base_dir, log_file,
                            error_log_file, multiple_lactone_log):
    """
    Number, cleave and save the fragments of a single molecule.

    Args:
        idx: Row position of the molecule; used in ids and log messages.
        row: Record with a 'smiles' entry and optional 'IDs' and
            'molecule_pref_name' entries (accessed via ``row.get``).
        ring_size: Ring size the molecule was classified under.
        output_base_dir: ``Path`` of the folder for this ring size.
        log_file: General log path (accepted for interface compatibility;
            not written to by this function).
        error_log_file: Path of the log that receives failure messages.
        multiple_lactone_log: Path of the log that receives molecules
            rejected for having several lactone bonds on the ring.

    Returns:
        ``(num_fragments, 'success')`` on success, otherwise
        ``(None, status)`` with status one of 'parse_failed',
        'multiple_lactones', 'numbering_failed', 'validation_failed',
        'cleavage_failed' or 'unexpected_error'.
    """
    try:
        # Molecule information
        molecule_id = row.get('IDs', f'mol_{idx}')
        molecule_name = row.get('molecule_pref_name', 'Unknown')
        smiles = row['smiles']
        parent_id = f"ring{ring_size}_mol_{idx}"

        # Parse the SMILES
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            log_message(f"分子 {idx} ({molecule_id}) SMILES解析失败", error_log_file, 'error')
            return None, 'parse_failed'

        # Assign ring numbering
        numbering, lactone_c = assign_ring_numbering_general(mol, ring_size)

        # More than one lactone bond on the ring: reject the molecule.
        # In this case lactone_c holds the list of lactone carbon indices.
        if numbering == 'multiple_lactones':
            log_message(
                f"分子 {idx} ({molecule_id}, {molecule_name}) 环上有{len(lactone_c)}个内酯键,已剔除。环上内酯碳索引: {lactone_c}",
                multiple_lactone_log,
                'info'
            )
            return None, 'multiple_lactones'

        if numbering is None:
            log_message(f"分子 {idx} ({molecule_id}) 编号失败", error_log_file, 'error')
            return None, 'numbering_failed'

        # Validate the numbering
        if not validate_numbering_general(numbering, ring_size):
            log_message(f"分子 {idx} ({molecule_id}) 编号验证失败", error_log_file, 'error')
            return None, 'validation_failed'

        # Cleave the side chains
        try:
            mol_fragments = cleave_side_chains(mol, smiles, parent_id, numbering)
        except Exception as e:
            log_message(f"分子 {idx} ({molecule_id}) 裂解失败: {str(e)}", error_log_file, 'error')
            return None, 'cleavage_failed'

        # Per-molecule output folder
        mol_output_dir = output_base_dir / parent_id
        mol_output_dir.mkdir(parents=True, exist_ok=True)

        # Save the complete fragments object
        mol_fragments_path = mol_output_dir / f"{parent_id}_all_fragments.json"
        mol_fragments.to_json_file(str(mol_fragments_path))

        # Save every fragment individually
        for frag in mol_fragments.fragments:
            frag_path = mol_output_dir / f"{frag.fragment_id}.json"
            frag.to_json_file(str(frag_path))

        # Save the molecule information as metadata
        metadata = {
            'parent_id': parent_id,
            'molecule_id': molecule_id,
            'molecule_name': molecule_name,
            'smiles': smiles,
            'ring_size': ring_size,
            'num_fragments': len(mol_fragments.fragments),
            'processing_date': datetime.now().isoformat()
        }

        metadata_path = mol_output_dir / 'metadata.json'
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        return len(mol_fragments.fragments), 'success'

    except Exception as e:
        # Batch boundary: one bad molecule must not abort the whole run.
        log_message(f"分子 {idx} 未预期错误: {str(e)}", error_log_file, 'error')
        return None, 'unexpected_error'


def log_message(message, log_file, log_type='info'):
    """
    Append a timestamped message to a log file and echo it via tqdm.

    Args:
        message: Text to record.
        log_file: Path (``str`` or ``Path``) of the log file to append to.
        log_type: 'error' echoes the message with an error marker; 'info'
            echoes it with a check mark unless the log file name contains
            'multiple'.  Other values are written to the file only.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[{timestamp}] {message}\n"

    with open(log_file, 'a', encoding='utf-8') as f:
        f.write(log_entry)

    # Path() makes the name check work for plain-string paths as well.
    if log_type == 'error':
        tqdm.write(f"❌ {message}")
    elif log_type == 'info' and 'multiple' not in Path(log_file).name:
        tqdm.write(f"✓ {message}")


def main():
    """
    Run the batch: classify molecules by ring size, process each group,
    write per-ring statistics and print an overall summary.
    """
    print("="*80)
    print("12-20元环大环内酯批量处理程序")
    print("="*80)
    print()

    # Read the data
    input_file = Path('data/ring12_20/temp.csv')

    if not input_file.exists():
        print(f"❌ 错误: 找不到输入文件 {input_file}")
        return

    df = pd.read_csv(input_file)
    if 'smiles' not in df.columns:
        # Fail with a clear message instead of a KeyError further down.
        print(f"❌ 错误: 输入文件缺少 'smiles' 列 {input_file}")
        return

    print(f"✓ 读取数据集: {len(df)} 个分子")
    print(f" 文件: {input_file}")
    print()

    # Step 1: group molecules (by row position) according to ring size
    print("步骤 1: 按环大小分类分子...")
    print()

    ring_size_groups = defaultdict(list)

    for idx, smiles in enumerate(tqdm(df['smiles'], desc="分类进度")):
        for ring_size in get_ring_size_from_smiles(smiles):
            if MIN_RING_SIZE <= ring_size <= MAX_RING_SIZE:
                ring_size_groups[ring_size].append(idx)

    print()
    print("分类结果:")
    for ring_size in sorted(ring_size_groups.keys()):
        print(f" {ring_size}元环: {len(ring_size_groups[ring_size])} 个分子")
    print()

    # Step 2: process every ring size separately
    print("步骤 2: 处理各环大小分子...")
    print()

    all_stats = {}

    for ring_size in sorted(ring_size_groups.keys()):
        indices = ring_size_groups[ring_size]

        print(f"\n处理 {ring_size}元环 ({len(indices)} 个分子)...")
        print("-" * 60)

        # Output folder
        output_base_dir = Path(f'output/ring{ring_size}_fragments')
        output_base_dir.mkdir(parents=True, exist_ok=True)

        # Log files
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = output_base_dir / f'processing_log_{timestamp}.txt'
        error_log_file = output_base_dir / f'error_log_{timestamp}.txt'
        multiple_lactone_log = output_base_dir / f'multiple_lactone_log_{timestamp}.txt'

        # Statistics
        stats = {
            'ring_size': ring_size,
            'total': len(indices),
            'success': 0,
            'failed_parse': 0,
            'failed_numbering': 0,
            'failed_validation': 0,
            'failed_cleavage': 0,
            'failed_unexpected': 0,
            'multiple_lactones': 0,
            'total_fragments': 0
        }

        log_message(f"开始处理 {len(indices)} 个{ring_size}元环分子", log_file)

        # Process every molecule of this ring size
        for idx in tqdm(indices, desc=f"Ring-{ring_size}"):
            num_fragments, status = process_single_molecule(
                idx, df.iloc[idx], ring_size, output_base_dir,
                log_file, error_log_file, multiple_lactone_log
            )

            stats[_STATUS_TO_STAT_KEY.get(status, 'failed_unexpected')] += 1
            if status == 'success':
                stats['total_fragments'] += num_fragments

        # Save the statistics
        stats_file = output_base_dir / 'processing_statistics.json'
        with open(stats_file, 'w', encoding='utf-8') as f:
            json.dump(stats, f, indent=2)

        all_stats[ring_size] = stats

        # Print the statistics for this ring size.  A group is only created
        # when a molecule is appended to it, so stats['total'] is >= 1 here.
        success_rate = stats['success'] / stats['total'] * 100
        print()
        print(f"{ring_size}元环处理结果:")
        print(f" 成功: {stats['success']}/{stats['total']} ({success_rate:.1f}%)")
        print(f" 碎片数: {stats['total_fragments']}")
        if stats['success'] > 0:
            print(f" 平均碎片: {stats['total_fragments']/stats['success']:.2f} 个/分子")
        print(f" 多个内酯键: {stats['multiple_lactones']}")

        log_message(f"处理完成: 成功 {stats['success']}/{stats['total']}", log_file)

    # Step 3: summary
    print()
    print("="*80)
    print("所有环大小处理完成!")
    print("="*80)
    print()

    print("总体统计:")
    total_molecules = sum(stats['total'] for stats in all_stats.values())
    total_success = sum(stats['success'] for stats in all_stats.values())
    total_fragments = sum(stats['total_fragments'] for stats in all_stats.values())
    total_multiple_lactones = sum(
        stats['multiple_lactones'] for stats in all_stats.values()
    )

    # No molecule may have been classified at all; avoid dividing by zero.
    overall_rate = total_success / total_molecules * 100 if total_molecules else 0.0

    print(f" 总分子数: {total_molecules}")
    print(f" 总成功数: {total_success} ({overall_rate:.1f}%)")
    print(f" 总碎片数: {total_fragments}")
    if total_success > 0:
        print(f" 平均碎片: {total_fragments/total_success:.2f} 个/分子")
    print(f" 多内酯键分子: {total_multiple_lactones}")
    print()

    print("各环大小输出文件夹:")
    for ring_size in sorted(all_stats.keys()):
        output_dir = Path(f'output/ring{ring_size}_fragments')
        print(f" {ring_size}元环: {output_dir}")
    print()


if __name__ == '__main__':
    main()