Files
qsar/1d-qsar/cuda/mordred/RFE_cpu_permutation_mordred.py
2025-03-11 12:24:38 +08:00

225 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@file: RFE_cpu_permutation_mordred.py
@Description: Pure-CPU molecular descriptor extraction and feature selection:
1. Compute all molecular descriptors with mordred.
2. Manual RFECV: iteratively remove unimportant features based on permutation importance.
3. Dynamically evaluate the random-forest tree count (n_estimators) until the CV MSE converges.
4. Train the final model with the best feature subset and the optimal tree count.
@Date: 2025/03/04
@Author: your_name
'''
# Monkey-patch: some dependencies still call numpy.product, which newer
# NumPy releases no longer provide — alias it to numpy.prod when missing.
import numpy as np

if not hasattr(np, "product"):
    np.product = np.prod

import pandas as pd
import matplotlib.pyplot as plt
from multiprocessing import freeze_support

# Needed for frozen Windows executables that spawn worker processes; a
# no-op elsewhere. (Conventionally called right after the __main__ guard.)
freeze_support()
# ---------------------------
# Data loading and feature construction (all molecular descriptors are
# computed with mordred)
# ---------------------------
data = pd.read_csv("../../data_smi.csv", index_col="Entry ID")
# .copy() so the column rename below operates on an independent frame and
# cannot trigger pandas' chained-assignment warning on a view of `data`.
target_data = data[["SMILES", "S.aureus ATCC25923"]].copy()
target_data.columns = ["SMILES", "TARGET"]

from rdkit import Chem
from mordred import Calculator, descriptors, is_missing

# Create a mordred calculator with every available descriptor registered.
calc = Calculator(descriptors)

desc_list = []
for smi in target_data['SMILES']:
    mol = Chem.MolFromSmiles(smi)
    if mol is None:
        # Invalid SMILES: keep an empty record so rows stay aligned with
        # `target_data`.
        desc_list.append({})
        continue
    try:
        # Descriptor name -> value mapping for this molecule.
        desc_dict = calc(mol).asdict()
    except Exception:
        # Best-effort: a failed descriptor calculation yields an empty row.
        desc_dict = {}
    desc_list.append(desc_dict)

# Build the descriptor DataFrame.
X_df = pd.DataFrame(desc_list)
# Clean-up: drop all-empty columns and constant (single-valued) columns.
X_df.dropna(axis=1, how='all', inplace=True)
invalid_features = [col for col in X_df.columns if X_df[col].nunique(dropna=False) <= 1]
X_df.drop(columns=invalid_features, inplace=True)
# Coerce every column to numeric; non-convertible entries become NaN.
X_df = X_df.apply(pd.to_numeric, errors='coerce')
# Coercion can leave columns with no numeric values at all; their mean is
# NaN, so fillna(mean) would leave NaN behind — drop them before imputing.
X_df.dropna(axis=1, how='all', inplace=True)
# Impute remaining gaps with the column mean.
X_df = X_df.fillna(X_df.mean())
X = X_df.values.astype(np.float64)
y = target_data['TARGET'].values.astype(np.float64)
print(f"Number of training samples: {len(y)}, Number of features: {X.shape[1]}")
# ---------------------------
# Helper: mean squared error (MSE) of a fitted model
# ---------------------------
def model_mse(model, X_data, y_data):
    """Return the mean squared error of ``model``'s predictions on ``(X_data, y_data)``."""
    residuals = y_data - model.predict(X_data)
    return np.mean(residuals ** 2)


# ---------------------------
# CPU-based permutation importance
# ---------------------------
def permutation_importance_cpu(model, X_data, y_data, n_repeats=3, random_state=0):
    """Permutation importance of every feature as the rise in MSE.

    Each feature column is shuffled ``n_repeats`` times; the importance is
    the mean permuted MSE minus the unshuffled baseline MSE. Returns an
    array of shape ``(n_features,)``.
    """
    baseline = model_mse(model, X_data, y_data)
    rng = np.random.RandomState(random_state)
    n_features = X_data.shape[1]
    importances = np.zeros(n_features)
    for col in range(n_features):
        permuted_scores = []
        for _ in range(n_repeats):
            shuffled = X_data.copy()
            rng.shuffle(shuffled[:, col])
            permuted_scores.append(model_mse(model, shuffled, y_data))
        importances[col] = np.mean(permuted_scores) - baseline
    return importances
# ---------------------------
# Cross-validation with scikit-learn's RandomForestRegressor (multi-core)
# ---------------------------
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import KFold


def cross_val_score_cpu(X_data, y_data, cv=5, n_estimators=100):
    """Return the mean k-fold CV MSE of a random forest on ``(X_data, y_data)``."""
    splitter = KFold(n_splits=cv, shuffle=True, random_state=0)
    fold_mse = []
    for train_idx, test_idx in splitter.split(X_data):
        forest = RandomForestRegressor(
            n_estimators=n_estimators, random_state=0, n_jobs=-1
        )
        forest.fit(X_data[train_idx, :], y_data[train_idx])
        fold_mse.append(model_mse(forest, X_data[test_idx, :], y_data[test_idx]))
    return np.mean(fold_mse)
# ---------------------------
# Dynamically choose the tree count: keep increasing n_estimators until
# the CV MSE stops improving by more than `threshold`.
# ---------------------------
def evaluate_n_estimators_dynamic(X_data, y_data, cv=5, start=50, step=50,
                                  threshold=1e-3, max_estimators=1000):
    """Return a {n_estimators: CV MSE} dict, stopping once successive scores converge."""
    results = {}
    previous = None
    n_trees = start
    while n_trees <= max_estimators:
        mse = cross_val_score_cpu(X_data, y_data, cv=cv, n_estimators=n_trees)
        results[n_trees] = mse
        print(f"n_estimators: {n_trees}, CV MSE: {mse:.4f}")
        # Converged: the change from the previous candidate is negligible.
        if previous is not None and abs(previous - mse) < threshold:
            break
        previous = mse
        n_trees += step
    return results
# Sweep candidate tree counts on the full feature matrix and plot the curve.
tree_results = evaluate_n_estimators_dynamic(
    X, y, cv=5, start=50, step=50, threshold=1e-3, max_estimators=1000
)
plt.figure(figsize=(8, 5))
plt.plot(list(tree_results.keys()), list(tree_results.values()), marker='o')
plt.xlabel('Random Forest n_estimators')
plt.ylabel('CV MSE')
plt.title('CV MSE vs Number of Trees (CPU)')
plt.grid(True)
plt.savefig("tree_vs_cv_mse_cpu.png", dpi=300)
plt.close()
# The candidate with the lowest CV MSE wins.
best_n_estimators = min(tree_results, key=tree_results.get)
print(f"Optimal number of trees determined: {best_n_estimators}")
# ---------------------------
# Manual RFECV implementation (driven by permutation importance)
# ---------------------------
def manual_rfecv(X_data, y_data, feature_names, cv=5, n_estimators=100):
    """Recursive feature elimination with cross-validation.

    Scores the current feature subset by CV MSE, then drops the feature
    with the lowest permutation importance, repeating until one feature
    remains. Returns ``(best_features, best_score, scores_history,
    perm_imp_history, removed_feature_names)``, where ``best_features``
    holds the column indices of the subset with the lowest CV MSE seen.
    """
    remaining = list(range(X_data.shape[1]))
    best_score = float('inf')
    best_features = remaining.copy()
    scores_history = []          # (feature subset, CV MSE) per iteration
    perm_imp_history = []        # importance of each removed feature
    removed_feature_names = []   # names of removed features, in order
    while remaining:
        score = cross_val_score_cpu(X_data[:, remaining], y_data,
                                    cv=cv, n_estimators=n_estimators)
        scores_history.append((remaining.copy(), score))
        print(f"Current number of features: {len(remaining)}, CV MSE: {score:.4f}")
        if score < best_score:
            best_score = score
            best_features = remaining.copy()
        if len(remaining) == 1:
            break
        # Refit on the current subset and eliminate the least important feature.
        model = RandomForestRegressor(n_estimators=n_estimators,
                                      random_state=0, n_jobs=-1)
        model.fit(X_data[:, remaining], y_data)
        imp = permutation_importance_cpu(model, X_data[:, remaining], y_data,
                                         n_repeats=3, random_state=0)
        drop_pos = int(np.argmin(imp))
        dropped_index = remaining[drop_pos]
        dropped_name = feature_names[dropped_index]
        perm_imp_history.append(imp[drop_pos])
        removed_feature_names.append(dropped_name)
        print(f"Removed feature index: {dropped_index}, feature name: {dropped_name}, permutation importance: {imp[drop_pos]:.4f}")
        del remaining[drop_pos]
    return best_features, best_score, scores_history, perm_imp_history, removed_feature_names
# Run RFECV with the tuned tree count and report the selected subset.
cv_folds = 5
best_features_rfecv, best_mse_rfecv, history, perm_imp_history, removed_feature_names = manual_rfecv(
    X, y, feature_names=list(X_df.columns), cv=cv_folds, n_estimators=best_n_estimators
)
print(f"\nManual RFECV selected {len(best_features_rfecv)} features, CV MSE: {best_mse_rfecv:.4f}")
selected_feature_names = [X_df.columns[i] for i in best_features_rfecv]
print("Selected feature names:", selected_feature_names)

# Plot the permutation importance of each feature at the moment it was
# removed; annotate only features whose importance exceeds the threshold.
plt.figure(figsize=(8, 5))
iterations = list(range(1, len(perm_imp_history) + 1))
plt.plot(iterations, perm_imp_history, marker='o')
plt.xlabel('RFECV Iteration')
plt.ylabel('Permutation Importance of Removed Feature')
plt.title('Permutation Importance during RFECV (CPU)')
plt.grid(True)
annotation_threshold = 0.001
for step_idx, importance in enumerate(perm_imp_history):
    if importance > annotation_threshold:
        plt.annotate(removed_feature_names[step_idx],
                     (iterations[step_idx], importance),
                     textcoords="offset points",
                     xytext=(0, 5),
                     ha='center')
plt.savefig("rfecv_perm_importance_cpu.png", dpi=300)
plt.close()

# Persist the above-threshold importances for later inspection.
perm_importance_dict = {}
for name, importance in zip(removed_feature_names, perm_imp_history):
    if importance > annotation_threshold:
        perm_importance_dict[name] = importance
import pickle
with open("perm_importance_dict_cpu.pkl", "wb") as f:
    pickle.dump(perm_importance_dict, f)
print("Permutation Importance Dictionary:", perm_importance_dict)
# ---------------------------
# Final model: train on the best feature subset with the optimal tree count
# ---------------------------
final_model = RandomForestRegressor(n_estimators=best_n_estimators, random_state=0, n_jobs=-1)
final_model.fit(X[:, best_features_rfecv], y)
# Re-score the selected subset by cross-validation for reporting purposes.
final_cv_mse = cross_val_score_cpu(X[:, best_features_rfecv], y, cv=cv_folds, n_estimators=best_n_estimators)
print(f"\nFinal model (selected features, n_estimators={best_n_estimators}) CV MSE: {final_cv_mse:.4f}")