generate_3d_structures
This commit is contained in:
339
pycomsia/src/balloon_rdkit_pipeline.py
Normal file
339
pycomsia/src/balloon_rdkit_pipeline.py
Normal file
@@ -0,0 +1,339 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
完整训练代码示例
|
||||
|
||||
功能:
|
||||
1. 从 CSV 文件中读取 SMILES 和目标变量,计算2D描述符(利用 RDKit 与 Mordred),并对描述符进行 SelectKBest 特征选择。
|
||||
2. 根据 CSV 文件中的 SMILES 生成分子的3D构象,然后利用 MolecularGridCalculator 与 MolecularFieldCalculator 计算3D‐QSAR场特征,
|
||||
将5个场(steric、electrostatic、hydrophobic、hbond_donor、hbond_acceptor)展平后合并为一个特征向量。
|
||||
3. 合并2D与3D特征,并使用随机森林和 XGBoost 进行回归训练,同时支持 Optuna 超参数调优。
|
||||
4. 通过命令行传递 CSV 文件路径和目标变量名称,使用 click 库实现,并提供详细帮助信息。
|
||||
|
||||
用法示例:
|
||||
python main.py --data-smi data_smi.csv --target MIC_LOG_ATCC25923
|
||||
"""
|
||||
|
||||
import os
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import click
|
||||
from rdkit import Chem
|
||||
from rdkit.Chem import Descriptors, AllChem
|
||||
from mordred import Calculator, descriptors
|
||||
from sklearn.feature_selection import SelectKBest, f_regression
|
||||
from sklearn.model_selection import train_test_split
|
||||
from sklearn.ensemble import RandomForestRegressor
|
||||
import xgboost as xgb
|
||||
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
|
||||
import optuna
|
||||
|
||||
# 导入3D相关模块(请确保 MolecularGridCalculator.py 和 MolecularFieldCalculator.py 在同一目录下)
|
||||
from MolecularGridCalculator import MolecularGridCalculator
|
||||
from MolecularFieldCalculator import MolecularFieldCalculator
|
||||
|
||||
# ------------------------ 辅助函数 ------------------------
|
||||
def safe_nunique(series):
    """Count the distinct values of a Series, tolerating unhashable cells.

    ``Series.nunique`` raises ``TypeError`` when cells hold unhashable
    objects such as lists or NumPy arrays. In that case every list/array
    cell is converted to a tuple of its elements and the count is retried.
    Missing values are counted as a value (``dropna=False``).

    Args:
        series: pandas Series to inspect.

    Returns:
        Number of distinct values, NaN included.
    """
    try:
        return series.nunique(dropna=False)
    except TypeError:
        # Do not test the cell with pd.isnull() here: on a list/array it
        # returns an array, whose truth value is ambiguous and raises
        # ValueError for more than one element.
        hashable = series.apply(
            lambda cell: tuple(cell) if isinstance(cell, (np.ndarray, list)) else cell
        )
        return hashable.nunique(dropna=False)
|
||||
|
||||
# ------------------------ 2D描述符计算 ------------------------
|
||||
def get_rdkit_descriptors(smiles, missingVal=None):
    """Evaluate every descriptor in ``Descriptors._descList`` for one SMILES.

    Args:
        smiles: SMILES string of the molecule.
        missingVal: Value stored for a descriptor that cannot be computed,
            or for all descriptors when the SMILES cannot be parsed.

    Returns:
        Dict mapping descriptor name to its value (or ``missingVal``).
    """
    molecule = Chem.MolFromSmiles(smiles)
    descriptor_table = Descriptors._descList

    # Unparseable SMILES: keep the same keys so downstream tables line up.
    if molecule is None:
        return {name: missingVal for name, _ in descriptor_table}

    values = {}
    for name, compute in descriptor_table:
        try:
            values[name] = compute(molecule)
        except Exception:
            # A single failing descriptor must not discard the others.
            values[name] = missingVal
    return values
|
||||
|
||||
# Shared Mordred calculator, built lazily on first use.
_MORDRED_CALCULATOR = None


def _get_mordred_calculator():
    """Return the shared Mordred calculator, creating it on first call."""
    global _MORDRED_CALCULATOR
    if _MORDRED_CALCULATOR is None:
        _MORDRED_CALCULATOR = Calculator(descriptors)
    return _MORDRED_CALCULATOR


def get_mordred_descriptors(smiles):
    """Compute the Mordred descriptors of one SMILES string.

    The calculator is built once and reused across calls instead of being
    re-created for every molecule.

    Args:
        smiles: SMILES string of the molecule.

    Returns:
        Dict of descriptor name to value as produced by ``result.asdict()``;
        an empty dict when the SMILES cannot be parsed or the calculation
        raises.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return {}
    calc = _get_mordred_calculator()
    try:
        return calc(mol).asdict()
    except Exception:
        # Best effort: a molecule Mordred cannot handle contributes no
        # descriptors rather than aborting the whole run.
        return {}
|
||||
|
||||
def compute_2d_features(csv_file):
    """Build the cleaned 2D descriptor table for the SMILES in a CSV file.

    Steps, in order:
      1. Read the ``SMILES`` column of ``csv_file``.
      2. Compute RDKit descriptors, coerce to numeric, fill NaN with the
         column mean.
      3. Compute Mordred descriptors, drop all-NaN and constant columns,
         coerce to numeric, fill NaN with the column mean.
      4. Concatenate both tables column-wise, keep the first of any
         duplicated column name, drop constant columns.
      5. Report any column that still holds non-numeric values.

    Args:
        csv_file: Path of a CSV file with a ``SMILES`` column.

    Returns:
        pandas DataFrame with one row per CSV row and one column per
        retained descriptor.
    """
    smiles_series = pd.read_csv(csv_file)["SMILES"]

    # --- RDKit descriptors ---
    rdkit_table = pd.DataFrame(smiles_series.apply(get_rdkit_descriptors).tolist())
    rdkit_table = rdkit_table.apply(pd.to_numeric, errors='coerce')
    rdkit_table = rdkit_table.fillna(rdkit_table.mean(numeric_only=True))

    # --- Mordred descriptors ---
    mordred_table = pd.DataFrame(
        [get_mordred_descriptors(smi) for smi in smiles_series]
    )
    mordred_table.dropna(axis=1, how='all', inplace=True)
    mordred_constant = [
        name for name in mordred_table.columns
        if safe_nunique(mordred_table[name]) <= 1
    ]
    if mordred_constant:
        print("Mordred 删除恒定特征:", mordred_constant)
        mordred_table.drop(columns=mordred_constant, inplace=True)
    mordred_table = mordred_table.apply(pd.to_numeric, errors='coerce')
    mordred_table = mordred_table.fillna(mordred_table.mean(numeric_only=True))

    # --- Merge both descriptor sets ---
    X_df = pd.concat([rdkit_table, mordred_table], axis=1)
    X_df = X_df.loc[:, ~X_df.columns.duplicated()]
    combined_constant = [
        name for name in X_df.columns if safe_nunique(X_df[name]) <= 1
    ]
    if combined_constant:
        print("Combined 删除恒定特征:", combined_constant)
        X_df.drop(columns=combined_constant, inplace=True)

    # --- Diagnostic: values that are present but cannot be read as numbers ---
    offenders = {}
    for name in X_df.columns:
        column = X_df[name]
        unreadable = column.notna() & pd.to_numeric(column, errors='coerce').isna()
        if unreadable.any():
            offenders[name] = set(X_df.loc[unreadable, name].unique())

    if not offenders:
        print("所有特征均为数值类型。")
    else:
        print("以下特征包含非数值数据(经过 set 去重):")
        for name, values in offenders.items():
            print(f"{name}: {values}")

    return X_df
|
||||
|
||||
def select_2d_features(X_df, y, k=10):
    """Keep the ``k`` best 2D descriptors according to ``f_regression``.

    Args:
        X_df: DataFrame of numeric descriptors, one column per feature.
        y: Target values, one per row of ``X_df``.
        k: Number of features to keep. An integer larger than the number of
            available columns is reduced to that number, because SelectKBest
            rejects ``k > n_features``.

    Returns:
        Tuple ``(X_selected, selected_features)``: the transformed feature
        matrix and the list of selected column names.
    """
    n_features = X_df.shape[1]
    if isinstance(k, int) and k > n_features:
        # Small or heavily filtered descriptor tables may have < k columns.
        k = n_features

    selector = SelectKBest(score_func=f_regression, k=k)
    selector.fit(X_df, y)
    selected_features = X_df.columns[selector.get_support()].tolist()
    print("SelectKBest 选中的2D特征:", selected_features)
    X_selected = selector.transform(X_df)
    return X_selected, selected_features
|
||||
|
||||
# ------------------------ 3D-QSAR特征计算(从 CSV 中 SMILES 生成3D构象) ------------------------
|
||||
def generate_3d_mols_from_csv(csv_file, mmffVariant='MMFF94'):
    """Generate one MMFF-optimized 3D conformer per SMILES in a CSV file.

    For each entry of the ``SMILES`` column the molecule is parsed,
    hydrogens are added, a conformer is embedded with
    ``AllChem.EmbedMolecule`` (fixed ``randomSeed=10``, macrocycle torsion
    preferences enabled) and then minimized with
    ``AllChem.MMFFOptimizeMolecule``.

    Molecules that cannot be parsed or embedded are reported with a warning
    and skipped, so the returned list can be shorter than the CSV. Callers
    that stack the result with per-row data must account for that.

    Args:
        csv_file: Path of a CSV file with a ``SMILES`` column.
        mmffVariant: Force-field variant forwarded to the MMFF optimizer
            (RDKit documents "MMFF94" and "MMFF94s"); defaults to "MMFF94".

    Returns:
        List of ``(mol, True)`` tuples, one per molecule with a conformer.
    """
    smiles_column = pd.read_csv(csv_file)["SMILES"]
    mols = []
    for smi in smiles_column:
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            print(f"Warning: 无法从 SMILES {smi} 生成分子。")
            continue

        m3d = Chem.AddHs(mol)
        AllChem.EmbedMolecule(m3d, randomSeed=10, useMacrocycleTorsions=True)
        if m3d.GetNumConformers() == 0:
            print(f"Warning: 分子 {smi} 未生成构象。")
            continue

        # Forward the requested variant; previously the argument was ignored
        # and the optimizer always ran with its own default.
        AllChem.MMFFOptimizeMolecule(m3d, mmffVariant=mmffVariant)
        mols.append(m3d)

    return [(mol, True) for mol in mols]
|
||||
|
||||
|
||||
def compute_3d_features_with_params_from_csv(csv_file, grid_spacing, padding, alpha):
    """Compute the concatenated 3D field feature vector of every molecule.

    3D conformers are generated from the SMILES in ``csv_file``, a grid is
    built around them with MolecularGridCalculator, and the fields are
    evaluated with MolecularFieldCalculator. For each molecule the steric,
    electrostatic, hydrophobic, H-bond donor and H-bond acceptor field
    entries are concatenated in that order.

    Args:
        csv_file: Path of a CSV file with a ``SMILES`` column.
        grid_spacing: Passed to ``generate_grid`` as ``resolution``.
        padding: Passed to ``generate_grid`` as ``padding``.
        alpha: Assigned to the field calculator's ``ALPHA`` attribute before
            the fields are computed.

    Returns:
        NumPy array built from one feature row per generated molecule.
    """
    field_order = (
        "steric_field",
        "electrostatic_field",
        "hydrophobic_field",
        "hbond_donor_field",
        "hbond_acceptor_field",
    )

    molecules = generate_3d_mols_from_csv(csv_file)

    grid_builder = MolecularGridCalculator()
    field_builder = MolecularFieldCalculator()

    spacing, dimensions, origin = grid_builder.generate_grid(
        molecules, resolution=grid_spacing, padding=padding
    )
    # Override the attenuation parameter before evaluating the fields.
    field_builder.ALPHA = alpha
    all_fields = field_builder.calc_field(molecules, spacing, dimensions, origin)

    # One row per molecule: the five field vectors laid end to end.
    rows = [
        [
            value
            for name in field_order
            for value in all_fields['train_fields'][name][mol_index]
        ]
        for mol_index in range(len(molecules))
    ]
    return np.array(rows)
|
||||
|
||||
def split_fields_from_X3d(X_3d_all, field_dims):
    """Cut a 3D feature matrix into consecutive per-field column blocks.

    Args:
        X_3d_all: 2D array whose columns are the concatenated field features.
        field_dims: Width (number of columns) of each field, in order.

    Returns:
        List with one column slice of ``X_3d_all`` per entry of
        ``field_dims``.
    """
    # Running column offsets: block i spans boundaries[i]:boundaries[i + 1].
    boundaries = [0]
    for width in field_dims:
        boundaries.append(boundaries[-1] + width)

    return [
        X_3d_all[:, left:right]
        for left, right in zip(boundaries[:-1], boundaries[1:])
    ]
|
||||
|
||||
# ------------------------ 模型训练与评估 ------------------------
|
||||
def evaluate_model(X, y, random_state=42):
    """Fit default RandomForest and XGBoost regressors on an 80:20 split.

    Args:
        X: Feature matrix.
        y: Target values.
        random_state: Seed used for the split and for both models.

    Returns:
        Dict keyed by model name ("RandomForest", "XGBoost"), each mapping
        to a dict with the test-set "R2", "RMSE" and "MAE".
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=random_state
    )

    candidates = (
        ("RandomForest", RandomForestRegressor(random_state=random_state)),
        ("XGBoost", xgb.XGBRegressor(random_state=random_state, verbosity=0)),
    )

    scores = {}
    for label, regressor in candidates:
        regressor.fit(X_train, y_train)
        predicted = regressor.predict(X_test)
        scores[label] = {
            "R2": r2_score(y_test, predicted),
            "RMSE": np.sqrt(mean_squared_error(y_test, predicted)),
            "MAE": mean_absolute_error(y_test, predicted),
        }
    return scores
|
||||
|
||||
# ------------------------ 超参数调优(Optuna) ------------------------
|
||||
# Field name stems, in the column order produced by
# compute_3d_features_with_params_from_csv.
_OBJECTIVE_FIELD_KEYS = (
    "steric",
    "electrostatic",
    "hydrophobic",
    "hbond_donor",
    "hbond_acceptor",
)

# (csv_file, target) -> (X_2d_selected, y); these do not depend on the trial.
_OBJECTIVE_2D_CACHE = {}


def _cached_2d_inputs(csv_file, target):
    """Return the selected 2D features and target, computing them only once."""
    key = (csv_file, target)
    if key not in _OBJECTIVE_2D_CACHE:
        y = pd.read_csv(csv_file)[target].values
        X_df = compute_2d_features(csv_file)
        X_2d_selected, _ = select_2d_features(X_df, y, k=10)
        _OBJECTIVE_2D_CACHE[key] = (X_2d_selected, y)
    return _OBJECTIVE_2D_CACHE[key]


def objective(trial):
    """Optuna objective: validation RMSE of a random forest on 2D + 3D features.

    Per trial it samples which 3D fields to use and their weights, the grid
    spacing and ``alpha`` of the 3D field calculation, and the forest's
    ``n_estimators`` / ``max_depth``. The CSV path and target column are
    read from the active click context (``data_smi`` and ``target``).

    The 2D descriptors and their SelectKBest selection are independent of
    the trial parameters, so they are computed on the first trial and reused
    afterwards; their console output therefore appears once.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        RMSE on the 20% validation split (lower is better).

    Raises:
        ValueError: If the 2D and 3D feature blocks have different row
            counts, e.g. because some molecules got no 3D conformer.
    """
    # Sampling order and parameter names are kept stable: flags first,
    # then a weight only for each enabled field.
    use_flags = [
        trial.suggest_categorical(f"use_{key}", [True, False])
        for key in _OBJECTIVE_FIELD_KEYS
    ]
    weights = [
        trial.suggest_float(f"weight_{key}", 0.0, 2.0) if flag else 0.0
        for key, flag in zip(_OBJECTIVE_FIELD_KEYS, use_flags)
    ]

    # Grid and attenuation parameters.
    grid_spacing = trial.suggest_float("grid_spacing", 0.5, 2.0)
    alpha = trial.suggest_float("alpha", 0.1, 1.0)

    # Random forest hyperparameters.
    n_estimators = trial.suggest_int("n_estimators", 50, 300)
    max_depth = trial.suggest_int("max_depth", 3, 15)

    params = click.get_current_context().params
    csv_file = params.get("data_smi")
    target = params.get("target")

    # 3D features depend on the sampled grid parameters: recompute per trial.
    X_3d_all = compute_3d_features_with_params_from_csv(
        csv_file, grid_spacing=grid_spacing, padding=3, alpha=alpha
    )
    # Assumes the five fields have equal flattened width.
    n_fields = len(_OBJECTIVE_FIELD_KEYS)
    dim_per_field = X_3d_all.shape[1] // n_fields
    X_3d_fields = split_fields_from_X3d(X_3d_all, [dim_per_field] * n_fields)

    weighted_fields = [
        field * weight
        for flag, weight, field in zip(use_flags, weights, X_3d_fields)
        if flag
    ]
    if weighted_fields:
        X_3d_selected = np.hstack(weighted_fields)
    else:
        # No field enabled: keep a single all-zero placeholder column.
        X_3d_selected = np.zeros((X_3d_all.shape[0], 1))

    X_2d_selected, y = _cached_2d_inputs(csv_file, target)

    if X_2d_selected.shape[0] != X_3d_selected.shape[0]:
        raise ValueError(
            "2D and 3D feature blocks have different row counts "
            f"({X_2d_selected.shape[0]} vs {X_3d_selected.shape[0]}); "
            "some SMILES probably produced no 3D conformer."
        )
    X_combined = np.hstack((X_2d_selected, X_3d_selected))

    X_train, X_val, y_train, y_val = train_test_split(
        X_combined, y, test_size=0.2, random_state=42
    )
    model = RandomForestRegressor(
        n_estimators=n_estimators, max_depth=max_depth, random_state=42
    )
    model.fit(X_train, y_train)
    y_pred = model.predict(X_val)
    rmse = np.sqrt(mean_squared_error(y_val, y_pred))
    return rmse
|
||||
|
||||
# ------------------------ 主程序入口(Click命令行) ------------------------
|
||||
# NOTE: the docstring and option help strings below are shown by `--help`,
# so they are runtime text and are kept verbatim.
@click.command()
@click.option('--data-smi', required=True, type=click.Path(exists=True),
              help="包含 SMILES 和目标变量的 CSV 文件路径。")
@click.option('--target', default="MIC_LOG_ATCC25923",
              help="目标变量列名,默认:MIC_LOG_ATCC25923。")
def cli(data_smi, target):
    """
    通过命令行启动QSAR模型训练和超参数调优流程。

    示例:
    python main.py --data-smi data_smi.csv --target MIC_LOG_ATCC25923
    """
    # Store the options on the click context; objective() reads them there.
    click.get_current_context().params.update(data_smi=data_smi, target=target)

    # 2D descriptors and SelectKBest selection.
    y = pd.read_csv(data_smi)[target].values
    descriptor_table = compute_2d_features(data_smi)
    X_2d_selected, selected_feats = select_2d_features(descriptor_table, y, k=10)
    click.echo("2D特征选择完毕:{}".format(selected_feats))

    # 3D field features from conformers generated out of the CSV's SMILES.
    X_3d = compute_3d_features_with_params_from_csv(
        data_smi, grid_spacing=1.0, padding=3, alpha=0.3
    )
    click.echo("3D特征计算完毕。")

    # Stack 2D and 3D features side by side.
    X_combined = np.hstack((X_2d_selected, X_3d))

    # Baseline evaluation with default model hyperparameters.
    click.echo("开始初步模型训练评估(不调超参)...")
    results_2d = evaluate_model(X_2d_selected, y)
    results_combined = evaluate_model(X_combined, y)
    click.echo("仅2D描述符模型结果:{}".format(results_2d))
    click.echo("2D + 3D描述符模型结果:{}".format(results_combined))

    # Hyperparameter search: minimize the RMSE returned by objective().
    click.echo("开始使用Optuna进行超参数调优...")
    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=50)
    click.echo("Optuna最佳试验结果:")
    click.echo(study.best_trial)
|
||||
|
||||
# Run the click command only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    cli()
|
||||
# python main.py --data-smi /root/project/qsar/1d-qsar/data_smi.csv --target MIC_LOG_ATCC25923
|
||||
Reference in New Issue
Block a user