From 2041bd4ec37826d027baaecee615463655767f9e Mon Sep 17 00:00:00 2001 From: mm644706215 Date: Tue, 11 Mar 2025 18:09:30 +0800 Subject: [PATCH] generate_3d_structures --- pycomsia/src/balloon_rdkit_pipeline.py | 339 +++++++++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100644 pycomsia/src/balloon_rdkit_pipeline.py diff --git a/pycomsia/src/balloon_rdkit_pipeline.py b/pycomsia/src/balloon_rdkit_pipeline.py new file mode 100644 index 0000000..d7429f1 --- /dev/null +++ b/pycomsia/src/balloon_rdkit_pipeline.py @@ -0,0 +1,339 @@ +#!/usr/bin/env python +""" +完整训练代码示例 + +功能: +1. 从 CSV 文件中读取 SMILES 和目标变量,计算2D描述符(利用 RDKit 与 Mordred),并对描述符进行 SelectKBest 特征选择。 +2. 根据 CSV 文件中的 SMILES 生成分子的3D构象,然后利用 MolecularGridCalculator 与 MolecularFieldCalculator 计算3D‐QSAR场特征, + 将5个场(steric、electrostatic、hydrophobic、hbond_donor、hbond_acceptor)展平后合并为一个特征向量。 +3. 合并2D与3D特征,并使用随机森林和 XGBoost 进行回归训练,同时支持 Optuna 超参数调优。 +4. 通过命令行传递 CSV 文件路径和目标变量名称,使用 click 库实现,并提供详细帮助信息。 + +用法示例: + python main.py --data-smi data_smi.csv --target MIC_LOG_ATCC25923 +""" + +import os +import numpy as np +import pandas as pd +import click +from rdkit import Chem +from rdkit.Chem import Descriptors, AllChem +from mordred import Calculator, descriptors +from sklearn.feature_selection import SelectKBest, f_regression +from sklearn.model_selection import train_test_split +from sklearn.ensemble import RandomForestRegressor +import xgboost as xgb +from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error +import optuna + +# 导入3D相关模块(请确保 MolecularGridCalculator.py 和 MolecularFieldCalculator.py 在同一目录下) +from MolecularGridCalculator import MolecularGridCalculator +from MolecularFieldCalculator import MolecularFieldCalculator + +# ------------------------ 辅助函数 ------------------------ +def safe_nunique(series): + """安全计算唯一值数量,防止非标量数据出错""" + try: + return series.nunique(dropna=False) + except Exception: + return series.apply(lambda x: tuple(x) if isinstance(x, (np.ndarray, list)) and not pd.isnull(x) else x).nunique(dropna=False) + +# ------------------------ 2D描述符计算 ------------------------ +def get_rdkit_descriptors(smiles, missingVal=None): + """利用 RDKit 计算分子所有描述符""" + mol = Chem.MolFromSmiles(smiles) + res = {} + if mol is None: + for nm, _ in Descriptors._descList: + res[nm] = missingVal + return res + for nm, fn in Descriptors._descList: + try: + res[nm] = fn(mol) + except Exception: + res[nm] = missingVal + return res + +def get_mordred_descriptors(smiles): + """利用 Mordred 计算分子描述符""" + mol = Chem.MolFromSmiles(smiles) + if mol is None: + return {} + calc = Calculator(descriptors) + try: + result = calc(mol) + return result.asdict() + except Exception: + return {} + +def compute_2d_features(csv_file): + data = pd.read_csv(csv_file) + # RDKit描述符 + rdkit_desc_list = data["SMILES"].apply(get_rdkit_descriptors) + X_df_rdkit = pd.DataFrame(rdkit_desc_list.tolist()) + # 转换为数值类型并用均值填充(针对数值列) + X_df_rdkit = X_df_rdkit.apply(pd.to_numeric, errors='coerce') + X_df_rdkit = X_df_rdkit.fillna(X_df_rdkit.mean(numeric_only=True)) + + # Mordred描述符 + mordred_desc_list = [] + for smi in data["SMILES"]: + mordred_desc_list.append(get_mordred_descriptors(smi)) + X_df_mordred = pd.DataFrame(mordred_desc_list) + X_df_mordred.dropna(axis=1, how='all', inplace=True) + invalid_features = [col for col in X_df_mordred.columns if safe_nunique(X_df_mordred[col]) <= 1] + if invalid_features: + print("Mordred 删除恒定特征:", invalid_features) + X_df_mordred.drop(columns=invalid_features, inplace=True) + X_df_mordred = X_df_mordred.apply(pd.to_numeric, errors='coerce') + X_df_mordred = X_df_mordred.fillna(X_df_mordred.mean(numeric_only=True)) + + # 合并两部分描述符 + X_df = pd.concat([X_df_rdkit, X_df_mordred], axis=1) + X_df = X_df.loc[:, ~X_df.columns.duplicated()] + constant_features = [col for col in X_df.columns if safe_nunique(X_df[col]) <= 1] + if constant_features: + print("Combined 删除恒定特征:", constant_features) + X_df.drop(columns=constant_features, inplace=True) + + # 检查哪些列仍包含非数值数据(即转换时出现了 NaN,但原始数据非空) + non_numeric_features = {} + for col in X_df.columns: + # 尝试将列转换为数值 + col_numeric = pd.to_numeric(X_df[col], errors='coerce') + # 如果原始列中有非空值,但转换后对应位置为NaN,则说明存在非数值数据 + mask = X_df[col].notna() & col_numeric.isna() + if mask.any(): + unique_vals = set(X_df.loc[mask, col].unique()) + non_numeric_features[col] = unique_vals + if non_numeric_features: + print("以下特征包含非数值数据(经过 set 去重):") + for col, uniq in non_numeric_features.items(): + print(f"{col}: {uniq}") + else: + print("所有特征均为数值类型。") + + # 返回处理好的 DataFrame + return X_df + +def select_2d_features(X_df, y, k=10): + """ + 使用 SelectKBest 选择 2D 描述符的前 k 个特征, + 返回转换后的特征矩阵和选中特征名称。 + """ + selector = SelectKBest(score_func=f_regression, k=k) + selector.fit(X_df, y) + selected_features = X_df.columns[selector.get_support()].tolist() + print("SelectKBest 选中的2D特征:", selected_features) + X_selected = selector.transform(X_df) + return X_selected, selected_features + +# ------------------------ 3D-QSAR特征计算(从 CSV 中 SMILES 生成3D构象) ------------------------ +def generate_3d_mols_from_csv(csv_file, mmffVariant='MMFF94'): + """ + 从 CSV 文件中读取 SMILES,生成分子的3D构象(添加氢原子、嵌入构象并进行MMFF能量最小化)。 + 针对大环分子,启用了宏环扭转角优化并增大了嵌入尝试次数。 + + mmffVariant:MMFF94或MMFF94S,默认为 MMFF94 + + 返回值:分子列表,每个元素为 (mol, True) + """ + import pandas as pd + from rdkit import Chem + from rdkit.Chem import AllChem + + data = pd.read_csv(csv_file) + mols = [] + # 距离几何+ETKDG生成3D构象 + for smi in data["SMILES"]: + mol = Chem.MolFromSmiles(smi) + if mol is None: + print(f"Warning: 无法从 SMILES {smi} 生成分子。") + continue + m3d = Chem.AddHs(mol) + AllChem.EmbedMolecule(m3d, randomSeed=10, useMacrocycleTorsions=True) + # MMFF生成3D构象 优化 + if m3d.GetNumConformers() > 0: + AllChem.MMFFOptimizeMolecule(m3d) + mols.append(m3d) + else: + print(f"Warning: 分子 {smi} 未生成构象。") + + aligned_results = [(mol, True) for mol in mols if mol.GetNumConformers() > 0] + return aligned_results + + +def compute_3d_features_with_params_from_csv(csv_file, grid_spacing, padding, alpha): + """ + 根据 CSV 文件中的 SMILES 生成分子的3D构象后,利用 MolecularGridCalculator 与 MolecularFieldCalculator 计算3D-QSAR场特征, + 参数 grid_spacing、padding 和 alpha 可调。 + 返回一个 shape=(n_samples, feature_dim) 的特征矩阵。 + """ + # 生成3D分子 + aligned_results = generate_3d_mols_from_csv(csv_file) + + grid_calc = MolecularGridCalculator() + field_calc = MolecularFieldCalculator() + + grid_spacing_tuple, grid_dimensions, grid_origin = grid_calc.generate_grid(aligned_results, resolution=grid_spacing, padding=padding) + # 修改高斯衰减参数 alpha + field_calc.ALPHA = alpha + fields_dict = field_calc.calc_field(aligned_results, grid_spacing_tuple, grid_dimensions, grid_origin) + + selected_field_names = ["steric_field", "electrostatic_field", "hydrophobic_field", "hbond_donor_field", "hbond_acceptor_field"] + X_3d_list = [] + n_mols = len(aligned_results) + for i in range(n_mols): + feat_vec = [] + for field in selected_field_names: + field_vec = fields_dict['train_fields'][field][i] + feat_vec.extend(field_vec) + X_3d_list.append(feat_vec) + X_3d = np.array(X_3d_list) + return X_3d + +def split_fields_from_X3d(X_3d_all, field_dims): + """ + 根据各场的维度将3D特征矩阵 X_3d_all 拆分为列表, + field_dims 为各场展平后特征的维度列表。 + """ + fields = [] + start = 0 + for dim in field_dims: + fields.append(X_3d_all[:, start:start+dim]) + start += dim + return fields + +# ------------------------ 模型训练与评估 ------------------------ +def evaluate_model(X, y, random_state=42): + """ + 将数据划分为80:20,训练随机森林和 XGBoost 回归模型, + 返回各模型的 R²、RMSE、MAE 指标字典。 + """ + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=random_state) + + models = { + "RandomForest": RandomForestRegressor(random_state=random_state), + "XGBoost": xgb.XGBRegressor(random_state=random_state, verbosity=0) + } + + results = {} + for name, model in models.items(): + model.fit(X_train, y_train) + y_pred = model.predict(X_test) + r2 = r2_score(y_test, y_pred) + rmse = np.sqrt(mean_squared_error(y_test, y_pred)) + mae = mean_absolute_error(y_test, y_pred) + results[name] = {"R2": r2, "RMSE": rmse, "MAE": mae} + return results + +# ------------------------ 超参数调优(Optuna) ------------------------ +def objective(trial): + # 超参数:选择使用哪些3D场 + use_steric = trial.suggest_categorical("use_steric", [True, False]) + use_electrostatic = trial.suggest_categorical("use_electrostatic", [True, False]) + use_hydrophobic = trial.suggest_categorical("use_hydrophobic", [True, False]) + use_hbond_donor = trial.suggest_categorical("use_hbond_donor", [True, False]) + use_hbond_acceptor = trial.suggest_categorical("use_hbond_acceptor", [True, False]) + + # 对每个场设置权重(若未选中则为0) + weight_steric = trial.suggest_float("weight_steric", 0.0, 2.0) if use_steric else 0.0 + weight_electrostatic = trial.suggest_float("weight_electrostatic", 0.0, 2.0) if use_electrostatic else 0.0 + weight_hydrophobic = trial.suggest_float("weight_hydrophobic", 0.0, 2.0) if use_hydrophobic else 0.0 + weight_hbond_donor = trial.suggest_float("weight_hbond_donor", 0.0, 2.0) if use_hbond_donor else 0.0 + weight_hbond_acceptor = trial.suggest_float("weight_hbond_acceptor", 0.0, 2.0) if use_hbond_acceptor else 0.0 + + # 网格与衰减参数 + grid_spacing = trial.suggest_float("grid_spacing", 0.5, 2.0) + alpha = trial.suggest_float("alpha", 0.1, 1.0) + + # 随机森林超参数 + n_estimators = trial.suggest_int("n_estimators", 50, 300) + max_depth = trial.suggest_int("max_depth", 3, 15) + + # 计算3D特征(从 CSV 中生成3D构象) + csv_file = click.get_current_context().params.get("data_smi") + X_3d_all = compute_3d_features_with_params_from_csv(csv_file, grid_spacing=grid_spacing, padding=3, alpha=alpha) + # 假设每个场展平后的维度为 total_dim/5 + dim_per_field = X_3d_all.shape[1] // 5 + field_dims = [dim_per_field] * 5 + X_3d_fields = split_fields_from_X3d(X_3d_all, field_dims) + + # 根据选择情况与权重组合3D特征 + selected_fields = [] + for flag, weight, field in zip( + [use_steric, use_electrostatic, use_hydrophobic, use_hbond_donor, use_hbond_acceptor], + [weight_steric, weight_electrostatic, weight_hydrophobic, weight_hbond_donor, weight_hbond_acceptor], + X_3d_fields): + if flag: + selected_fields.append(field * weight) + if selected_fields: + X_3d_selected = np.hstack(selected_fields) + else: + X_3d_selected = np.zeros((X_3d_all.shape[0], 1)) + + # 读取2D特征(SelectKBest后的结果) + data = pd.read_csv(csv_file) + target = click.get_current_context().params.get("target") + y = data[target].values + X_df = compute_2d_features(csv_file) + X_2d_selected, _ = select_2d_features(X_df, y, k=10) + + # 合并2D与3D特征 + X_combined = np.hstack((X_2d_selected, X_3d_selected)) + + # 划分训练/验证集并评估模型 + X_train, X_val, y_train, y_val = train_test_split(X_combined, y, test_size=0.2, random_state=42) + model = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth, random_state=42) + model.fit(X_train, y_train) + y_pred = model.predict(X_val) + rmse = np.sqrt(mean_squared_error(y_val, y_pred)) + return rmse + +# ------------------------ 主程序入口(Click命令行) ------------------------ +@click.command() +@click.option('--data-smi', required=True, type=click.Path(exists=True), + help="包含 SMILES 和目标变量的 CSV 文件路径。") +@click.option('--target', default="MIC_LOG_ATCC25923", + help="目标变量列名,默认:MIC_LOG_ATCC25923。") +def cli(data_smi, target): + """ + 通过命令行启动QSAR模型训练和超参数调优流程。 + + 示例: + python main.py --data-smi data_smi.csv --target MIC_LOG_ATCC25923 + """ + ctx = click.get_current_context() + ctx.params["data_smi"] = data_smi + ctx.params["target"] = target + + # 计算2D描述符和SelectKBest特征 + data = pd.read_csv(data_smi) + y = data[target].values + X_df = compute_2d_features(data_smi) + X_2d_selected, selected_feats = select_2d_features(X_df, y, k=10) + click.echo("2D特征选择完毕:{}".format(selected_feats)) + + # 计算3D特征(从 CSV 中 SMILES生成3D构象) + X_3d = compute_3d_features_with_params_from_csv(data_smi, grid_spacing=1.0, padding=3, alpha=0.3) + click.echo("3D特征计算完毕。") + + # 合并2D与3D特征 + X_combined = np.hstack((X_2d_selected, X_3d)) + + click.echo("开始初步模型训练评估(不调超参)...") + results_2d = evaluate_model(X_2d_selected, y) + results_combined = evaluate_model(X_combined, y) + click.echo("仅2D描述符模型结果:{}".format(results_2d)) + click.echo("2D + 3D描述符模型结果:{}".format(results_combined)) + + click.echo("开始使用Optuna进行超参数调优...") + study = optuna.create_study(direction="minimize") + study.optimize(objective, n_trials=50) + click.echo("Optuna最佳试验结果:") + click.echo(study.best_trial) + +if __name__ == "__main__": + cli() +# python main.py --data-smi /root/project/qsar/1d-qsar/data_smi.csv --target MIC_LOG_ATCC25923