#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Cluster-granularity scan script.

Sweeps the granularity parameter of several clustering algorithms
(Butina, agglomerative/hierarchical, DBSCAN, KMeans-on-PCA) over a set
of molecules and reports, for each setting: the number of clusters, the
average cluster size, and the average intra-cluster Tanimoto similarity.

Example:
    python scripts/cluster_granularity_scan.py \
        --csv result/filtered_results/qed_values_trpe_combined_filtered.csv \
        --smiles-col smiles \
        --radius 3 \
        --n-bits 1024
"""
import argparse
import os
import sys
from pathlib import Path

import numpy as np
import pandas as pd
from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem
from rdkit.ML.Cluster import Butina
from sklearn.cluster import AgglomerativeClustering, DBSCAN, KMeans
from sklearn.decomposition import PCA

# Make the project root importable when running as a script.
sys.path.append(Path(os.path.abspath(__file__)).parent.parent.as_posix())

from utils.chem_cluster import TanimotoClusterer, FPConfig


def tanimoto_matrix(smiles, radius=3, n_bits=1024):
    """Compute the full pairwise Tanimoto similarity matrix.

    Args:
        smiles: sequence of SMILES strings.
        radius: Morgan fingerprint radius.
        n_bits: fingerprint length in bits.

    Returns:
        (n, n) numpy array of Tanimoto similarities (1.0 on the diagonal).

    Raises:
        ValueError: if any SMILES string cannot be parsed by RDKit.
    """
    mols = []
    for s in smiles:
        mol = Chem.MolFromSmiles(s)
        # MolFromSmiles returns None on parse failure; fail loudly here
        # instead of crashing later inside the fingerprint call.
        if mol is None:
            raise ValueError(f"Invalid SMILES: {s!r}")
        mols.append(mol)
    fps = [AllChem.GetMorganFingerprintAsBitVect(m, radius, nBits=n_bits)
           for m in mols]
    n = len(fps)
    sim_mat = np.zeros((n, n))
    for i in range(n):
        # BulkTanimotoSimilarity computes one full row at C speed.
        sim_mat[i, :] = DataStructs.BulkTanimotoSimilarity(fps[i], fps)
    return sim_mat


def avg_intra_cluster_similarity(labels, sim_mat):
    """Average of the per-cluster mean internal similarity.

    Args:
        labels: 1-D array of cluster labels aligned with ``sim_mat`` rows.
            The DBSCAN noise label (-1) is ignored.
        sim_mat: (n, n) similarity matrix.

    Returns:
        Mean over clusters of the average pairwise (off-diagonal)
        similarity inside each cluster; 0 if no cluster has >= 2 members.
    """
    labels = np.asarray(labels)
    cluster_sims = []
    for lbl in set(labels.tolist()):
        if lbl == -1:  # DBSCAN noise points form no cluster
            continue
        idx = np.where(labels == lbl)[0]
        if len(idx) > 1:
            sub_sim = sim_mat[np.ix_(idx, idx)]
            # Strict lower triangle: each unordered pair counted once,
            # diagonal (self-similarity) excluded.
            tril_idx = np.tril_indices_from(sub_sim, k=-1)
            cluster_sims.append(np.mean(sub_sim[tril_idx]))
    return np.mean(cluster_sims) if cluster_sims else 0


def scan(args):
    """Run the granularity scan over all algorithms and print a table."""
    df = pd.read_csv(args.csv)
    smiles = df[args.smiles_col].astype(str).tolist()

    # Precompute the similarity matrix once; all methods reuse it.
    print("计算 Tanimoto 相似度矩阵...")
    sim_mat = tanimoto_matrix(smiles, radius=args.radius, n_bits=args.n_bits)
    dist_mat = 1 - sim_mat  # clustering operates on distances

    results = []  # (method, params, n_clusters, avg_size, avg_intra_sim)
    n = len(smiles)

    # 1. Butina clustering.
    # NOTE: with isDistData=True, Butina.ClusterData expects a 1-D condensed
    # lower-triangle distance sequence in (1,0),(2,0),(2,1),... order — not a
    # square matrix.  np.tril_indices(n, -1) yields exactly that order.
    condensed = dist_mat[np.tril_indices(n, k=-1)].tolist()
    for cutoff in np.linspace(0.4, 0.8, 5):
        cluster_res = list(Butina.ClusterData(condensed, n, cutoff,
                                              isDistData=True))
        labels = np.zeros(n, dtype=int)
        for cid, members in enumerate(cluster_res):
            for m in members:
                labels[m] = cid
        avg_sim = avg_intra_cluster_similarity(labels, sim_mat)
        results.append(("Butina", {"cutoff": round(cutoff, 2)},
                        len(set(labels)), np.mean(np.bincount(labels)),
                        avg_sim))

    # 2. Hierarchical (agglomerative, average linkage on precomputed dist).
    for thresh in [0.3, 0.4, 0.5]:
        model = AgglomerativeClustering(n_clusters=None, metric='precomputed',
                                        linkage='average',
                                        distance_threshold=thresh)
        labels = model.fit_predict(dist_mat)
        avg_sim = avg_intra_cluster_similarity(labels, sim_mat)
        results.append(("Hierarchical", {"threshold": thresh},
                        len(set(labels)), np.mean(np.bincount(labels)),
                        avg_sim))

    # 3. DBSCAN on the precomputed distance matrix (-1 marks noise).
    for eps in [0.2, 0.3, 0.4]:
        model = DBSCAN(eps=eps, min_samples=2, metric="precomputed")
        labels = model.fit_predict(dist_mat)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        # Pass the FULL label vector: filtering it first would desync the
        # indices from sim_mat rows.  The helper skips the -1 noise label.
        avg_sim = avg_intra_cluster_similarity(labels, sim_mat)
        results.append(("DBSCAN", {"eps": eps}, n_clusters,
                        np.mean(np.bincount(labels[labels != -1]))
                        if n_clusters > 0 else 0,
                        avg_sim))

    # 4. KMeans on a PCA embedding of the similarity matrix.
    coords = PCA(n_components=10).fit_transform(sim_mat)
    for k in [10, 20, 50]:
        # n_init pinned for reproducibility across sklearn versions
        # (the default changed to "auto" in sklearn 1.4).
        model = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = model.fit_predict(coords)
        avg_sim = avg_intra_cluster_similarity(labels, sim_mat)
        results.append(("KMeans", {"k": k}, len(set(labels)),
                        np.mean(np.bincount(labels)), avg_sim))

    # Result table.
    print(f"{'Method':<15} {'Params':<25} {'#Clusters':<10} "
          f"{'AvgSize':<10} {'AvgIntraSim':<10}")
    for r in results:
        print(f"{r[0]:<15} {str(r[1]):<25} {r[2]:<10} "
              f"{r[3]:<10.2f} {r[4]:<10.3f}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="聚类粒度扫描")
    parser.add_argument("--csv", type=str, required=True,
                        help="输入 CSV 文件路径")
    parser.add_argument("--smiles-col", type=str, required=True,
                        help="SMILES 列名")
    parser.add_argument("--radius", type=int, default=3,
                        help="Morgan 指纹半径")
    parser.add_argument("--n-bits", type=int, default=1024,
                        help="指纹位数")
    args = parser.parse_args()
    scan(args)