204 lines
6.3 KiB
Python
204 lines
6.3 KiB
Python
#!/usr/bin/env python
|
|
# -*- encoding: utf-8 -*-
|
|
'''
|
|
@file: optimized_RFE_mpi_mordred.py
|
|
@Description: 优化的 MPI 并行特征选择流程
|
|
@Date: 2025/03/04
|
|
@Author: your_name
|
|
'''
|
|
|
|
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from mpi4py import MPI
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import KFold
|
|
|
|
# ---------------------------
# MPI initialization
# ---------------------------
comm = MPI.COMM_WORLD   # global communicator shared by all launched processes
rank = comm.Get_rank()  # this process's id in 0..size-1; rank 0 acts as coordinator
size = comm.Get_size()  # total number of MPI processes
|
|
|
|
# ---------------------------
# Data loading and feature construction (rank 0 only)
# ---------------------------
if rank == 0:
    data = pd.read_csv("../../data_smi.csv", index_col="Entry ID")
    # .copy() so the column rename below does not hit a SettingWithCopyWarning
    target_data = data[["SMILES", "S.aureus ATCC25923"]].copy()
    target_data.columns = ["SMILES", "TARGET"]

    from rdkit import Chem
    from mordred import Calculator, descriptors

    calc = Calculator(descriptors)
    desc_list = []

    for smi in target_data['SMILES']:
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            # unparsable SMILES: keep an empty placeholder row so X stays aligned with y
            desc_list.append({})
            continue
        try:
            result = calc(mol)
            desc_dict = result.asdict()
        except Exception:
            # mordred can raise on exotic molecules; fall back to an empty row
            # (was a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit)
            desc_dict = {}
        desc_list.append(desc_dict)

    X_df = pd.DataFrame(desc_list)
    # Drop columns that are entirely NaN or constant — they carry no signal.
    X_df.dropna(axis=1, how='all', inplace=True)
    invalid_features = [col for col in X_df.columns if X_df[col].nunique(dropna=False) <= 1]
    X_df.drop(columns=invalid_features, inplace=True)
    # Coerce non-numeric descriptor values (e.g. mordred error objects) to NaN,
    # then impute with the per-column mean.
    X_df = X_df.apply(pd.to_numeric, errors='coerce')
    X_df = X_df.fillna(X_df.mean())

    X = X_df.values.astype(np.float64)
    y = target_data['TARGET'].values.astype(np.float64)
    feature_names = list(X_df.columns)
else:
    X, y, feature_names = None, None, None

# Broadcast the dataset to every rank (collective: all ranks must reach this)
X = comm.bcast(X, root=0)
y = comm.bcast(y, root=0)
feature_names = comm.bcast(feature_names, root=0)
|
|
|
|
# ---------------------------
|
|
# 并行评估工具函数
|
|
# ---------------------------
|
|
def parallel_evaluate(candidates, evaluate_func):
    """Evaluate candidates round-robin across all MPI ranks.

    Each rank scores every size-th candidate with evaluate_func, then the
    per-rank dicts are gathered on rank 0.

    Returns the merged {candidate: result} dict on rank 0, None on every
    other rank. Collective: all ranks must call this together.
    """
    # Slice assignment candidates[rank::size] == indices i with i % size == rank
    my_share = candidates[rank::size]
    my_results = {cand: evaluate_func(cand) for cand in my_share}

    gathered = comm.gather(my_results, root=0)
    if rank != 0:
        return None

    combined = {}
    for partial in gathered:
        combined.update(partial)
    return combined
|
|
|
|
# ---------------------------
# Dynamic evaluation of the number of trees (parallel version)
# ---------------------------
def evaluate_n_estimators_parallel():
    """Grid-search n_estimators for a random forest via 5-fold CV MSE.

    Candidates 50, 100, ..., 1000 are split across MPI ranks by
    parallel_evaluate.

    Returns {n_estimators: mean CV MSE} on rank 0, None on other ranks.
    Collective: must be called on every rank (contains a gather).
    """
    candidates = list(range(50, 1001, 50))

    def evaluate(candidate):
        # Bug fix: RandomForestRegressor was used here without ever being
        # imported; import it locally alongside the existing KFold import.
        from sklearn.ensemble import RandomForestRegressor
        from sklearn.model_selection import KFold

        kf = KFold(n_splits=5, shuffle=True, random_state=0)
        mse_scores = []
        for train_idx, test_idx in kf.split(X):
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]
            model = RandomForestRegressor(n_estimators=candidate, random_state=0, n_jobs=-1)
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            mse_scores.append(np.mean((y_test - y_pred) ** 2))
        return np.mean(mse_scores)

    return parallel_evaluate(candidates, evaluate)
|
|
|
|
# ---------------------------
# Permutation importance (parallel version)
# ---------------------------
def permutation_importance(model, X_subset):
    """MPI-parallel permutation importance for each column of X_subset.

    importance[j] = MSE(y, model.predict(X with column j shuffled)) - baseline MSE,
    where baseline is the model's MSE on the unpermuted X_subset against the
    module-level target y.

    Features are split into contiguous chunks across ranks; an in-place
    Allreduce sums the per-rank partial vectors, so EVERY rank returns the
    full importance array. Collective: all ranks must call this with the
    same X_subset.
    """
    baseline = np.mean((y - model.predict(X_subset)) ** 2)
    n_features = X_subset.shape[1]

    # Contiguous chunk of feature indices owned by this rank
    features_per_rank = np.array_split(range(n_features), size)
    local_features = features_per_rank[rank]

    # Fixed seed keeps runs reproducible; identical seeds across ranks are
    # harmless because each rank shuffles only its own columns.
    rng = np.random.RandomState(42)
    # Entries for features owned by other ranks stay 0 until the Allreduce.
    # (Removed a second, never-used zeros array the original allocated.)
    importances = np.zeros(n_features)

    for j in local_features:
        X_permuted = X_subset.copy()
        rng.shuffle(X_permuted[:, j])
        y_pred = model.predict(X_permuted)
        importances[j] = np.mean((y - y_pred) ** 2) - baseline

    # Sum the partial vectors in place; afterwards every rank holds the full result.
    comm.Allreduce(MPI.IN_PLACE, importances, op=MPI.SUM)
    return importances
|
|
|
|
# ---------------------------
# Main flow: choose n_estimators
# ---------------------------
# Bug fix: evaluate_n_estimators_parallel() contains a collective gather, so
# EVERY rank must call it. The original called it only inside `if rank == 0:`,
# which left rank 0 blocked in the gather forever (deadlock). Call it on all
# ranks; only rank 0 gets the results dict and does the plotting/printing.
if rank == 0:
    print("Evaluating optimal n_estimators...")
tree_results = evaluate_n_estimators_parallel()  # dict on rank 0, None elsewhere

if rank == 0:
    plt.figure(figsize=(8, 5))
    plt.plot(list(tree_results.keys()), list(tree_results.values()), marker='o')
    plt.xlabel('n_estimators')
    plt.ylabel('CV MSE')
    plt.grid(True)
    plt.savefig("tree_vs_mse.png", dpi=300)
    plt.close()

    # Candidate with the lowest cross-validated MSE
    best_n = min(tree_results, key=tree_results.get)
    print(f"Optimal n_estimators: {best_n}")
else:
    best_n = None

best_n = comm.bcast(best_n, root=0)
|
|
|
|
# ---------------------------
# Parallel RFECV loop: drop the least important feature each round
# ---------------------------
def cross_val_score_cpu(X_subset, y_subset, n_estimators):
    """Mean 5-fold CV MSE of a random forest on (X_subset, y_subset).

    Bug fix: this helper was called in the loop below but never defined
    anywhere in the file, so the first iteration raised NameError.
    """
    kf = KFold(n_splits=5, shuffle=True, random_state=0)
    fold_mse = []
    for train_idx, test_idx in kf.split(X_subset):
        model = RandomForestRegressor(n_estimators=n_estimators, random_state=0, n_jobs=-1)
        model.fit(X_subset[train_idx], y_subset[train_idx])
        pred = model.predict(X_subset[test_idx])
        fold_mse.append(np.mean((y_subset[test_idx] - pred) ** 2))
    return float(np.mean(fold_mse))


current_features = list(range(X.shape[1]))
best_features = current_features.copy()
best_score = float('inf')
history = []

while len(current_features) > 1:
    # Rank 0 scores the current subset and fits the model to broadcast
    if rank == 0:
        X_subset = X[:, current_features]
        score = cross_val_score_cpu(X_subset, y, n_estimators=best_n)
        history.append((len(current_features), score))

        if score < best_score:
            best_score = score
            best_features = current_features.copy()

        print(f"Features: {len(current_features)}, Score: {score:.4f}")

        model = RandomForestRegressor(n_estimators=best_n, random_state=0, n_jobs=-1)
        model.fit(X_subset, y)
    else:
        model = None

    # Ship the fitted model to every rank (pickled by mpi4py)
    model = comm.bcast(model, root=0)

    # Collective: all ranks participate in the permutation importances
    X_subset = X[:, current_features]
    importances = permutation_importance(model, X_subset)

    # Rank 0 decides which feature to drop (least important)
    if rank == 0:
        idx = np.argmin(importances)
        current_features.pop(idx)

    # Keep every rank's feature list in sync for the next iteration
    current_features = comm.bcast(current_features, root=0)
|
|
|
|
# ---------------------------
# Result reporting (rank 0 only)
# ---------------------------
if rank == 0:
    selected_names = [feature_names[i] for i in best_features]
    print(f"\nOptimal features ({len(best_features)}):")
    print(selected_names)

    # Persist the selection so downstream scripts can reload it
    import pickle
    payload = {
        'features': selected_names,
        'n_estimators': best_n,
        'history': history,
    }
    with open("best_features.pkl", "wb") as f:
        pickle.dump(payload, f)