first add

This commit is contained in:
2025-11-14 20:34:58 +08:00
commit 0d99f7d12c
46 changed files with 698209 additions and 0 deletions

View File

@@ -0,0 +1,423 @@
#!/usr/bin/env python3
"""
批量处理12-20元环大环内酯分子
处理文件: data/ring12_20/temp.csv
输出文件夹: output/ring{N}_fragments/ (N=12,13,14...20)
"""
import sys
from pathlib import Path
# 添加项目根目录到 Python 路径
# 获取脚本所在目录的父目录(项目根目录)
script_dir = Path(__file__).resolve().parent
project_root = script_dir.parent
sys.path.insert(0, str(project_root))
from rdkit import Chem
import pandas as pd
import json
from tqdm import tqdm
from datetime import datetime
from collections import defaultdict
from src.ring_numbering import get_ring_atoms
from src.fragment_cleaver import cleave_side_chains
def find_lactone_carbons(mol, ring_size):
"""
找到指定环大小的环上的内酯羰基碳原子
返回所有环上内酯碳的索引列表
"""
# 获取指定环大小的所有环原子
ring_pattern = Chem.MolFromSmarts(f"[r{ring_size}]")
if ring_pattern is None:
return []
ring_matches = mol.GetSubstructMatches(ring_pattern)
ring_atoms = list(set([match[0] for match in ring_matches]))
if len(ring_atoms) == 0:
return []
# 使用更严格的SMARTS模式环上的氧连接到环上的羰基碳
# [r{ring_size};#8] 环上的氧,连接到 [r{ring_size};#6] 环上的碳,该碳有双键氧 (=O)
lactone_pattern = Chem.MolFromSmarts(f"[r{ring_size};#8][r{ring_size};#6](=[#8])")
if lactone_pattern is None:
return []
matches = mol.GetSubstructMatches(lactone_pattern)
# 提取环上的内酯碳索引 (match[1]是羰基碳)
lactone_carbons = []
for match in matches:
if len(match) >= 2:
carbonyl_carbon = match[1] # 羰基碳
# 确保这个碳确实在指定大小的环上
if carbonyl_carbon in ring_atoms:
lactone_carbons.append(carbonyl_carbon)
return list(set(lactone_carbons)) # 去重
def order_ring_atoms_clockwise(mol, ring_atoms, start_atom):
"""顺时针排列环原子"""
if start_atom not in ring_atoms:
raise ValueError("Start atom is not in the ring")
ordered = [start_atom]
visited = {start_atom}
current = start_atom
while len(ordered) < len(ring_atoms):
atom = mol.GetAtomWithIdx(current)
# 找到下一个环原子
next_atom = None
for neighbor in atom.GetNeighbors():
neighbor_idx = neighbor.GetIdx()
if neighbor_idx in ring_atoms and neighbor_idx not in visited:
next_atom = neighbor_idx
break
if next_atom is None:
# 尝试从起点反向遍历
if len(ordered) < len(ring_atoms):
atom = mol.GetAtomWithIdx(start_atom)
for neighbor in atom.GetNeighbors():
neighbor_idx = neighbor.GetIdx()
if neighbor_idx in ring_atoms and neighbor_idx not in visited:
ordered.insert(0, neighbor_idx)
visited.add(neighbor_idx)
current = neighbor_idx
break
else:
break
else:
break
else:
ordered.append(next_atom)
visited.add(next_atom)
current = next_atom
return ordered
def assign_ring_numbering_general(mol, ring_size):
"""
为任意大小的环分配编号
返回: (numbering_dict, lactone_carbon) 或 (None, None) 或 ('multiple_lactones', lactone_list)
"""
# 找到内酯碳
lactone_carbons = find_lactone_carbons(mol, ring_size)
if len(lactone_carbons) == 0:
return None, None
if len(lactone_carbons) > 1:
return 'multiple_lactones', lactone_carbons
lactone_c = lactone_carbons[0]
# 获取环原子
ring_atoms = get_ring_atoms(mol)
# 如果环原子数不匹配
if len(ring_atoms) != ring_size:
return None, None
# 排列环原子
try:
ordered_atoms = order_ring_atoms_clockwise(mol, ring_atoms, lactone_c)
except Exception:
return None, None
# 创建编号字典
numbering = {}
for position, atom_idx in enumerate(ordered_atoms, start=1):
numbering[atom_idx] = position
return numbering, lactone_c
def validate_numbering_general(numbering, ring_size):
"""验证编号是否正确"""
if numbering is None or numbering == 'multiple_lactones':
return False
if len(numbering) != ring_size:
return False
positions = set(numbering.values())
if positions != set(range(1, ring_size + 1)):
return False
return True
def get_ring_size_from_smiles(smiles):
"""从SMILES获取分子中的环大小"""
mol = Chem.MolFromSmiles(smiles)
if mol is None:
return []
ring_sizes = set()
for ring_size in range(12, 21): # 12-20
pattern = Chem.MolFromSmarts(f"[r{ring_size}]")
if pattern is not None:
matches = mol.GetSubstructMatches(pattern)
if matches:
unique_atoms = list(set([match[0] for match in matches]))
if len(unique_atoms) == ring_size:
ring_sizes.add(ring_size)
return sorted(list(ring_sizes))
def process_single_molecule(idx, row, ring_size, output_base_dir, log_file, error_log_file, multiple_lactone_log):
"""处理单个分子"""
try:
# 获取分子信息
molecule_id = row.get('IDs', f'mol_{idx}')
molecule_name = row.get('molecule_pref_name', 'Unknown')
smiles = row['smiles']
parent_id = f"ring{ring_size}_mol_{idx}"
# 解析SMILES
mol = Chem.MolFromSmiles(smiles)
if mol is None:
log_message(f"分子 {idx} ({molecule_id}) SMILES解析失败", error_log_file, 'error')
return None, 'parse_failed'
# 分配编号
numbering, lactone_c = assign_ring_numbering_general(mol, ring_size)
# 检查环上是否有多个内酯键
if numbering == 'multiple_lactones':
log_message(
f"分子 {idx} ({molecule_id}, {molecule_name}) 环上有{len(lactone_c)}个内酯键,已剔除。环上内酯碳索引: {lactone_c}",
multiple_lactone_log,
'info'
)
return None, 'multiple_lactones'
if numbering is None:
log_message(f"分子 {idx} ({molecule_id}) 编号失败", error_log_file, 'error')
return None, 'numbering_failed'
# 验证编号
if not validate_numbering_general(numbering, ring_size):
log_message(f"分子 {idx} ({molecule_id}) 编号验证失败", error_log_file, 'error')
return None, 'validation_failed'
# 侧链断裂
try:
mol_fragments = cleave_side_chains(mol, smiles, parent_id, numbering)
except Exception as e:
log_message(f"分子 {idx} ({molecule_id}) 裂解失败: {str(e)}", error_log_file, 'error')
return None, 'cleavage_failed'
# 创建分子专属文件夹
mol_output_dir = output_base_dir / parent_id
mol_output_dir.mkdir(parents=True, exist_ok=True)
# 保存整个MoleculeFragments对象
mol_fragments_path = mol_output_dir / f"{parent_id}_all_fragments.json"
mol_fragments.to_json_file(str(mol_fragments_path))
# 保存每个碎片
for frag in mol_fragments.fragments:
frag_path = mol_output_dir / f"{frag.fragment_id}.json"
frag.to_json_file(str(frag_path))
# 保存分子信息到metadata
metadata = {
'parent_id': parent_id,
'molecule_id': molecule_id,
'molecule_name': molecule_name,
'smiles': smiles,
'ring_size': ring_size,
'num_fragments': len(mol_fragments.fragments),
'processing_date': datetime.now().isoformat()
}
metadata_path = mol_output_dir / 'metadata.json'
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
return len(mol_fragments.fragments), 'success'
except Exception as e:
log_message(f"分子 {idx} 未预期错误: {str(e)}", error_log_file, 'error')
return None, 'unexpected_error'
def log_message(message, log_file, log_type='info'):
"""记录日志信息"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] {message}\n"
with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_entry)
if log_type == 'error':
tqdm.write(f"{message}")
elif log_type == 'info' and 'multiple' not in log_file.name:
tqdm.write(f"{message}")
def main():
print("="*80)
print("12-20元环大环内酯批量处理程序")
print("="*80)
print()
# 读取数据
input_file = Path('data/ring12_20/temp.csv')
if not input_file.exists():
print(f"❌ 错误: 找不到输入文件 {input_file}")
return
df = pd.read_csv(input_file)
print(f"✓ 读取数据集: {len(df)} 个分子")
print(f" 文件: {input_file}")
print()
# 第一步:按环大小分类
print("步骤 1: 按环大小分类分子...")
print()
ring_size_groups = defaultdict(list)
for idx in tqdm(range(len(df)), desc="分类进度"):
smiles = df.iloc[idx]['smiles']
ring_sizes = get_ring_size_from_smiles(smiles)
for ring_size in ring_sizes:
if 12 <= ring_size <= 20:
ring_size_groups[ring_size].append(idx)
print()
print("分类结果:")
for ring_size in sorted(ring_size_groups.keys()):
print(f" {ring_size}元环: {len(ring_size_groups[ring_size])} 个分子")
print()
# 第二步:对每种环大小分别处理
print("步骤 2: 处理各环大小分子...")
print()
all_stats = {}
for ring_size in sorted(ring_size_groups.keys()):
indices = ring_size_groups[ring_size]
print(f"\n处理 {ring_size}元环 ({len(indices)} 个分子)...")
print("-" * 60)
# 创建输出文件夹
output_base_dir = Path(f'output/ring{ring_size}_fragments')
output_base_dir.mkdir(parents=True, exist_ok=True)
# 创建日志文件
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = output_base_dir / f'processing_log_{timestamp}.txt'
error_log_file = output_base_dir / f'error_log_{timestamp}.txt'
multiple_lactone_log = output_base_dir / f'multiple_lactone_log_{timestamp}.txt'
# 统计信息
stats = {
'ring_size': ring_size,
'total': len(indices),
'success': 0,
'failed_parse': 0,
'failed_numbering': 0,
'failed_validation': 0,
'failed_cleavage': 0,
'failed_unexpected': 0,
'multiple_lactones': 0,
'total_fragments': 0
}
log_message(f"开始处理 {len(indices)}{ring_size}元环分子", log_file)
# 处理每个分子
for idx in tqdm(indices, desc=f"Ring-{ring_size}"):
num_fragments, status = process_single_molecule(
idx, df.iloc[idx], ring_size,
output_base_dir, log_file, error_log_file, multiple_lactone_log
)
if status == 'success':
stats['success'] += 1
stats['total_fragments'] += num_fragments
elif status == 'parse_failed':
stats['failed_parse'] += 1
elif status == 'numbering_failed':
stats['failed_numbering'] += 1
elif status == 'validation_failed':
stats['failed_validation'] += 1
elif status == 'cleavage_failed':
stats['failed_cleavage'] += 1
elif status == 'multiple_lactones':
stats['multiple_lactones'] += 1
else:
stats['failed_unexpected'] += 1
# 保存统计结果
stats_file = output_base_dir / 'processing_statistics.json'
with open(stats_file, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=2)
all_stats[ring_size] = stats
# 打印该环大小的统计
print()
print(f"{ring_size}元环处理结果:")
print(f" 成功: {stats['success']}/{stats['total']} ({stats['success']/stats['total']*100:.1f}%)")
print(f" 碎片数: {stats['total_fragments']}")
if stats['success'] > 0:
print(f" 平均碎片: {stats['total_fragments']/stats['success']:.2f} 个/分子")
print(f" 多个内酯键: {stats['multiple_lactones']}")
log_message(f"处理完成: 成功 {stats['success']}/{stats['total']}", log_file)
# 第三步:总结
print()
print("="*80)
print("所有环大小处理完成!")
print("="*80)
print()
print("总体统计:")
total_molecules = 0
total_success = 0
total_fragments = 0
total_multiple_lactones = 0
for ring_size, stats in sorted(all_stats.items()):
total_molecules += stats['total']
total_success += stats['success']
total_fragments += stats['total_fragments']
total_multiple_lactones += stats['multiple_lactones']
print(f" 总分子数: {total_molecules}")
print(f" 总成功数: {total_success} ({total_success/total_molecules*100:.1f}%)")
print(f" 总碎片数: {total_fragments}")
if total_success > 0:
print(f" 平均碎片: {total_fragments/total_success:.2f} 个/分子")
print(f" 多内酯键分子: {total_multiple_lactones}")
print()
print("各环大小输出文件夹:")
for ring_size in sorted(all_stats.keys()):
output_dir = Path(f'output/ring{ring_size}_fragments')
print(f" {ring_size}元环: {output_dir}")
print()
if __name__ == '__main__':
main()