Files
search_macro/notebooks/04_rdkit_parallel_analysis.ipynb
2025-11-14 18:46:03 +08:00

431 lines
15 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# RDKit Parallel Processing Analysis\n",
"\n",
"分析RDKit匹配场景的并行性和SDF读取性能瓶颈"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"import multiprocessing\n",
"import psutil\n",
"from pathlib import Path\n",
"from rdkit import Chem\n",
"from rdkit.Chem import SDMolSupplier\n",
"from joblib import Parallel, delayed\n",
"import pandas as pd\n",
"import numpy as np\n",
"from tqdm import tqdm\n",
"import matplotlib.pyplot as plt\n",
"\n",
"print(\"=== RDKit并行处理分析 ===\")\n",
"print(f\"可用CPU核心数: {multiprocessing.cpu_count()}\")\n",
"print(f\"使用80%核心: {int(multiprocessing.cpu_count() * 0.8)}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. SDF读取性能测试"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def test_sdf_loading_performance(sdf_path, molecule_counts=[100, 500, 1000, 5000, 10000]):\n",
" \"\"\"测试不同分子数量下的SDF加载性能\"\"\"\n",
" \n",
" loading_times = {}\n",
" memory_usage = {}\n",
" \n",
" print(f\"测试文件: {sdf_path.name}\")\n",
" print(\"分子数量\\t加载时间(s)\\t内存使用(MB)\\t速度(mol/s)\")\n",
" print(\"-\" * 60)\n",
" \n",
" for count in molecule_counts:\n",
" # 记录开始时间和内存\n",
" start_time = time.time()\n",
" process = psutil.Process()\n",
" start_memory = process.memory_info().rss / (1024**2)\n",
" \n",
" # 加载分子\n",
" suppl = SDMolSupplier(str(sdf_path), sanitize=True)\n",
" molecules = []\n",
" \n",
" for i, mol in enumerate(suppl):\n",
" if mol is not None:\n",
" molecules.append(mol)\n",
" if len(molecules) >= count:\n",
" break\n",
" \n",
" # 记录结束时间和内存\n",
" end_time = time.time()\n",
" end_memory = process.memory_info().rss / (1024**2)\n",
" \n",
" loading_time = end_time - start_time\n",
" memory_used = end_memory - start_memory\n",
" speed = len(molecules) / loading_time if loading_time > 0 else 0\n",
" \n",
" loading_times[count] = loading_time\n",
" memory_usage[count] = memory_used\n",
" \n",
" print(f\"{count}\\t\\t{loading_time:.3f}\\t\\t{memory_used:.1f}\\t\\t{speed:.1f}\")\n",
" \n",
" return loading_times, memory_usage\n",
"\n",
"# 找一个测试文件\n",
"extracted_sdf_dir = Path('../extracted_sdf_files')\n",
"sdf_files = list(extracted_sdf_dir.rglob('*.sdf'))\n",
"\n",
"if sdf_files:\n",
" test_file = sdf_files[0]\n",
" loading_times, memory_usage = test_sdf_loading_performance(test_file)\n",
"else:\n",
" print(\"未找到SDF文件\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. RDKit匹配性能测试"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def test_matching_performance(molecules, patterns_dict, molecule_counts=[100, 500, 1000, 5000]):\n",
" \"\"\"测试不同分子数量下的匹配性能\"\"\"\n",
" \n",
" def match_single_molecule(mol, patterns):\n",
" matches = {}\n",
" for pattern_name, pattern in patterns.items():\n",
" try:\n",
" matches[pattern_name] = mol.HasSubstructMatch(pattern)\n",
" except:\n",
" matches[pattern_name] = False\n",
" return matches\n",
" \n",
" matching_times = {}\n",
" \n",
" print(\"分子数量\\t匹配时间(s)\\t速度(mol/s)\\t平均时间/分子(ms)\")\n",
" print(\"-\" * 60)\n",
" \n",
" for count in molecule_counts:\n",
" if count > len(molecules):\n",
" continue\n",
" \n",
" test_mols = molecules[:count]\n",
" \n",
" start_time = time.time()\n",
" \n",
" for mol in test_mols:\n",
" match_single_molecule(mol, patterns_dict)\n",
" \n",
" end_time = time.time()\n",
" matching_time = end_time - start_time\n",
" speed = count / matching_time if matching_time > 0 else 0\n",
" avg_time_per_mol = (matching_time / count) * 1000 # 转换为毫秒\n",
" \n",
" matching_times[count] = matching_time\n",
" \n",
" print(f\"{count}\\t\\t{matching_time:.3f}\\t\\t{speed:.1f}\\t\\t{avg_time_per_mol:.2f}\")\n",
" \n",
" return matching_times\n",
"\n",
"# 测试匹配性能\n",
"if 'test_file' in locals():\n",
" # 加载一些分子用于测试\n",
" suppl = SDMolSupplier(str(test_file), sanitize=True)\n",
" test_molecules = [mol for mol in suppl if mol is not None][:10000]\n",
" \n",
" # 定义测试SMARTS模式\n",
" test_patterns = {\n",
" 'benzene': Chem.MolFromSmarts('c1ccccc1'),\n",
" 'alcohol': Chem.MolFromSmarts('[OX2H]'),\n",
" 'carboxylic_acid': Chem.MolFromSmarts('C(=O)O'),\n",
" 'amine': Chem.MolFromSmarts('[NX3;H2,H1;!$(NC=O)]')\n",
" }\n",
" \n",
" print(f\"\\n=== 匹配性能测试 (使用{len(test_patterns)}个SMARTS模式) ===\")\n",
" matching_times = test_matching_performance(test_molecules, test_patterns)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. 并行vs串行性能对比"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def compare_parallel_vs_serial(molecules, patterns_dict, n_jobs_list=[1, 2, 4, 8, 16]):\n",
" \"\"\"比较不同并行度下的性能\"\"\"\n",
" \n",
" def match_single_molecule(mol, patterns):\n",
" matches = {}\n",
" for pattern_name, pattern in patterns.items():\n",
" try:\n",
" matches[pattern_name] = mol.HasSubstructMatch(pattern)\n",
" except:\n",
" matches[pattern_name] = False\n",
" return matches\n",
" \n",
" test_mols = molecules[:1000] # 使用1000个分子测试\n",
" results = {}\n",
" \n",
" print(f\"测试分子数量: {len(test_mols)}\")\n",
" print(f\"SMARTS模式数量: {len(patterns_dict)}\")\n",
" print(\"\\n并行度\\t时间(s)\\t速度(mol/s)\\t加速比\\t效率(%)\")\n",
" print(\"-\" * 65)\n",
" \n",
" serial_time = None\n",
" \n",
" for n_jobs in n_jobs_list:\n",
" start_time = time.time()\n",
" \n",
" if n_jobs == 1:\n",
" # 串行处理\n",
" for mol in test_mols:\n",
" match_single_molecule(mol, patterns_dict)\n",
" else:\n",
" # 并行处理\n",
" Parallel(n_jobs=n_jobs, backend='loky')(\n",
" delayed(match_single_molecule)(mol, patterns_dict) \n",
" for mol in test_mols\n",
" )\n",
" \n",
" end_time = time.time()\n",
" processing_time = end_time - start_time\n",
" speed = len(test_mols) / processing_time\n",
" \n",
" if serial_time is None:\n",
" serial_time = processing_time\n",
" speedup = 1.0\n",
" efficiency = 100.0\n",
" else:\n",
" speedup = serial_time / processing_time\n",
" efficiency = (speedup / n_jobs) * 100\n",
" \n",
" results[n_jobs] = {\n",
" 'time': processing_time,\n",
" 'speed': speed,\n",
" 'speedup': speedup,\n",
" 'efficiency': efficiency\n",
" }\n",
" \n",
" print(f\"{n_jobs}\\t\\t{processing_time:.3f}\\t\\t{speed:.1f}\\t\\t{speedup:.2f}x\\t{efficiency:.1f}\")\n",
" \n",
" return results\n",
"\n",
"# 运行并行vs串行对比\n",
"if 'test_molecules' in locals() and 'test_patterns' in locals():\n",
" print(\"\\n=== 并行vs串行性能对比 ===\")\n",
" max_jobs = min(16, int(multiprocessing.cpu_count() * 0.8))\n",
" n_jobs_list = [1, 2, 4, 8, max_jobs] if max_jobs > 8 else [1, 2, 4, max_jobs]\n",
" \n",
" parallel_results = compare_parallel_vs_serial(test_molecules, test_patterns, n_jobs_list)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. I/O瓶颈分析"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def analyze_io_bottleneck():\n",
" \"\"\"分析I/O瓶颈\"\"\"\n",
" \n",
" print(\"=== I/O瓶颈分析 ===\")\n",
" \n",
" # 测试文件读取速度\n",
" if 'test_file' in locals():\n",
" file_size = test_file.stat().st_size / (1024**2) # MB\n",
" \n",
" print(f\"\\n文件信息:\")\n",
" print(f\"文件大小: {file_size:.2f} MB\")\n",
" print(f\"文件路径: {test_file}\")\n",
" \n",
" # 测试纯文件读取速度\n",
" start_time = time.time()\n",
" with open(test_file, 'rb') as f:\n",
" raw_data = f.read()\n",
" raw_read_time = time.time() - start_time\n",
" raw_read_speed = file_size / raw_read_time\n",
" \n",
" print(f\"\\n纯文件读取:\")\n",
" print(f\"读取时间: {raw_read_time:.3f} s\")\n",
" print(f\"读取速度: {raw_read_speed:.2f} MB/s\")\n",
" \n",
" # 测试RDKit解析速度\n",
" start_time = time.time()\n",
" suppl = SDMolSupplier(str(test_file), sanitize=True)\n",
" molecules = [mol for mol in suppl if mol is not None]\n",
" parse_time = time.time() - start_time\n",
" parse_speed = len(molecules) / parse_time\n",
" \n",
" print(f\"\\nRDKit解析:\")\n",
" print(f\"解析时间: {parse_time:.3f} s\")\n",
" print(f\"分子数量: {len(molecules)}\")\n",
" print(f\"解析速度: {parse_speed:.1f} mol/s\")\n",
" \n",
" # 计算I/O占比\n",
" io_percentage = (raw_read_time / parse_time) * 100\n",
" print(f\"\\nI/O时间占比: {io_percentage:.1f}%\")\n",
" \n",
" if io_percentage > 50:\n",
" print(\"⚠️ I/O是主要瓶颈建议:\")\n",
" print(\" - 使用更快的存储(SSD)\")\n",
" print(\" - 预处理SDF文件为其他格式\")\n",
" print(\" - 使用内存映射\")\n",
" else:\n",
" print(\"✅ 计算是主要瓶颈,并行化有效\")\n",
"\n",
"analyze_io_bottleneck()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. 优化建议"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def provide_optimization_recommendations():\n",
" \"\"\"提供优化建议\"\"\"\n",
" \n",
" print(\"=== RDKit并行处理优化建议 ===\")\n",
" \n",
" recommendations = [\n",
" {\n",
" \"问题\": \"SDF文件读取慢\",\n",
" \"原因\": \"RDKit需要解析分子结构sanitization耗时\",\n",
" \"解决方案\": [\n",
" \"1. 预处理SDF为pickle/feather格式\",\n",
" \"2. 禁用不必要的sanitization\",\n",
" \"3. 使用更快的存储介质\",\n",
" \"4. 分批读取避免内存溢出\"\n",
" ]\n",
" },\n",
" {\n",
" \"问题\": \"并行效率低\",\n",
" \"原因\": \"RDKit的GIL限制和进程间通信开销\",\n",
" \"解决方案\": [\n",
" \"1. 使用多进程而非多线程\",\n",
" \"2. 增大每个任务的粒度\",\n",
" \"3. 预编译SMARTS模式\",\n",
" \"4. 使用loky backend减少开销\"\n",
" ]\n",
" },\n",
" {\n",
" \"问题\": \"内存使用过高\",\n",
" \"原因\": \"分子对象在内存中占用空间大\",\n",
" \"解决方案\": [\n",
" \"1. 分批处理\",\n",
" \"2. 及时释放不需要的分子\",\n",
" \"3. 使用生成器而非列表\",\n",
" \"4. 考虑使用SMILES字符串替代分子对象\"\n",
" ]\n",
" }\n",
" ]\n",
" \n",
" for i, rec in enumerate(recommendations, 1):\n",
" print(f\"\\n{i}. {rec['问题']}\")\n",
" print(f\" 原因: {rec['原因']}\")\n",
" print(f\" 解决方案:\")\n",
" for solution in rec['解决方案']:\n",
" print(f\" {solution}\")\n",
" \n",
" print(f\"\\n=== 最佳实践总结 ===\")\n",
" print(\"1. 🚀 对于大量小分子: 并行处理效果好\")\n",
" print(\"2. 📁 对于大文件: 预处理和分批读取更重要\")\n",
" print(\"3. 💾 内存受限: 使用流式处理和生成器\")\n",
" print(\"4. ⚡ CPU密集: 适当增加并行度,但避免过度并行\")\n",
" print(\"5. 🔄 I/O密集: 优化存储和文件格式\")\n",
"\n",
"provide_optimization_recommendations()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 结论\n",
"\n",
"### RDKit + Joblib 并行性分析:\n",
"\n",
"1. **✅ 可以并行**: RDKit的分子匹配是CPU密集型任务适合并行化\n",
"2. **⚠️ 有效但有局限**: 由于GIL和进程开销并行效率通常在60-80%\n",
"3. **📊 最佳并行度**: 通常为CPU核心数的50-80%\n",
"\n",
"### SDF读取性能:\n",
"\n",
"1. **🐌 相对较慢**: SDF解析需要分子sanitization比纯文本读取慢10-100倍\n",
"2. **💾 内存密集**: 每个分子对象在内存中占用较大空间\n",
"3. **🔄 I/O瓶颈**: 对于大文件I/O可能成为主要瓶颈\n",
"\n",
"### 推荐策略:\n",
"\n",
"- **小文件+多模式**: 并行处理效果好\n",
"- **大文件+少模式**: 优化I/O更重要\n",
"- **混合策略**: 预处理 + 并行匹配 + 分批处理"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "search_macro",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}