#!/usr/bin/env python # -*- coding: utf-8 -*- """ 聚类粒度扫描脚本 运行示例: python scripts/cluster_granularity_scan.py \ --csv result/filtered_results/qed_values_trpe_combined_filtered.csv \ --smiles-col smiles \ --radius 3 \ --n-bits 1024 Method Params #Clusters AvgSize AvgIntraSim Butina {'cutoff': 0.4} 8960 2.10 0.706 Butina {'cutoff': 0.5} 6720 2.80 0.625 Butina {'cutoff': 0.6} 4648 4.04 0.548 Butina {'cutoff': 0.7} 2783 6.75 0.463 Butina {'cutoff': 0.8} 958 19.61 0.333 Hierarchical {'threshold': 0.3} 12235 1.54 0.814 Hierarchical {'threshold': 0.4} 9603 1.96 0.739 Hierarchical {'threshold': 0.5} 7300 2.57 0.664 DBSCAN {'eps': 0.2} 2050 3.18 0.106 DBSCAN {'eps': 0.3} 2275 4.61 0.113 DBSCAN {'eps': 0.4} 2014 6.65 0.113 KMeans {'k': 10} 10 1878.70 0.204 KMeans {'k': 20} 20 939.35 0.200 KMeans {'k': 50} 50 375.74 0.233 | 列名 | 含义 | | --------------- | --------------------------------- | | **#Clusters** | 聚类后得到的簇数量(独立 cluster 数) | | **AvgSize** | 每个簇平均包含的分子个数 = 样本总数 / 簇数 | | **AvgIntraSim** | 每个簇内部分子两两之间的平均相似度(越接近 1 代表簇内部更相似) | 现在的数据: Butina 在 cutoff=0.4 时 AvgIntraSim=0.706(簇内结构还算比较接近,但簇数非常多)。 Hierarchical 阈值 0.3 时 AvgIntraSim=0.814(更紧密,但簇数更多)。 DBSCAN 和 KMeans 的簇内相似度都低,说明它们在 Tanimoto 上可能不太适合你这个任务。 聚类:用 Butina cutoff ≈ 0.6–0.7 或 Hierarchical 阈值 ≈ 0.5–0.6(保持簇内差异可控,簇数不要太多)。 选代表:每个簇取 1 个中心分子(簇内与其他成员平均相似度最高的那个)。 如果仍想增强多样性,可以在代表集中再跑一次 MaxMin picking。 """ import sys, os from pathlib import Path import argparse import pandas as pd import numpy as np from rdkit import Chem, DataStructs from rdkit.Chem import AllChem from sklearn.cluster import AgglomerativeClustering, KMeans, DBSCAN # 把项目根目录加入 sys.path sys.path.append(Path(os.path.abspath(__file__)).parent.parent.as_posix()) from utils.chem_cluster import TanimotoClusterer, FPConfig def tanimoto_matrix(smiles, radius=3, n_bits=1024): fps = [AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles(s), radius, nBits=n_bits) for s in smiles] n = len(fps) sim_mat = np.zeros((n, n)) for i in range(n): sims = DataStructs.BulkTanimotoSimilarity(fps[i], fps) sim_mat[i, :] = sims return sim_mat def avg_intra_cluster_similarity(labels, sim_mat): """计算每个簇的平均内部相似度""" cluster_sims = [] for lbl in set(labels): idx = np.where(labels == lbl)[0] if len(idx) > 1: sub_sim = sim_mat[np.ix_(idx, idx)] tril_idx = np.tril_indices_from(sub_sim, k=-1) cluster_sims.append(np.mean(sub_sim[tril_idx])) return np.mean(cluster_sims) if cluster_sims else 0 def scan(args): df = pd.read_csv(args.csv) smiles = df[args.smiles_col].astype(str).tolist() # 预先计算相似度矩阵 print("计算 Tanimoto 相似度矩阵...") sim_mat = tanimoto_matrix(smiles, radius=args.radius, n_bits=args.n_bits) dist_mat = 1 - sim_mat # 聚类使用距离矩阵 results = [] # 1. Butina 聚类 from rdkit.ML.Cluster import Butina for cutoff in np.linspace(0.4, 0.8, 5): cluster_res = list(Butina.ClusterData(dist_mat, len(smiles), cutoff, isDistData=True)) labels = np.zeros(len(smiles), dtype=int) for cid, members in enumerate(cluster_res): for m in members: labels[m] = cid avg_sim = avg_intra_cluster_similarity(labels, sim_mat) results.append(("Butina", {"cutoff": round(cutoff, 2)}, len(set(labels)), np.mean(np.bincount(labels)), avg_sim)) # 2. 层次聚类 for thresh in [0.3, 0.4, 0.5]: model = AgglomerativeClustering(n_clusters=None, metric='precomputed', linkage='average', distance_threshold=thresh) labels = model.fit_predict(dist_mat) avg_sim = avg_intra_cluster_similarity(labels, sim_mat) results.append(("Hierarchical", {"threshold": thresh}, len(set(labels)), np.mean(np.bincount(labels)), avg_sim)) # 3. DBSCAN for eps in [0.2, 0.3, 0.4]: model = DBSCAN(eps=eps, min_samples=2, metric="precomputed") labels = model.fit_predict(dist_mat) n_clusters = len(set(labels)) - (1 if -1 in labels else 0) avg_sim = avg_intra_cluster_similarity(labels[labels != -1], sim_mat) results.append(("DBSCAN", {"eps": eps}, n_clusters, np.mean(np.bincount(labels[labels != -1])) if n_clusters > 0 else 0, avg_sim)) # 4. KMeans (先降维再聚类) from sklearn.decomposition import PCA coords = PCA(n_components=10).fit_transform(sim_mat) for k in [10, 20, 50]: model = KMeans(n_clusters=k, random_state=42) labels = model.fit_predict(coords) avg_sim = avg_intra_cluster_similarity(labels, sim_mat) results.append(("KMeans", {"k": k}, len(set(labels)), np.mean(np.bincount(labels)), avg_sim)) # 输出结果表 print(f"{'Method':<15} {'Params':<25} {'#Clusters':<10} {'AvgSize':<10} {'AvgIntraSim':<10}") for r in results: print(f"{r[0]:<15} {str(r[1]):<25} {r[2]:<10} {r[3]:<10.2f} {r[4]:<10.3f}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="聚类粒度扫描") parser.add_argument("--csv", type=str, required=True, help="输入 CSV 文件路径") parser.add_argument("--smiles-col", type=str, required=True, help="SMILES 列名") parser.add_argument("--radius", type=int, default=3, help="Morgan 指纹半径") parser.add_argument("--n-bits", type=int, default=1024, help="指纹位数") args = parser.parse_args() scan(args)