from Tools import blosum62 import numpy as np from sklearn.metrics import roc_auc_score from sklearn.linear_model import LogisticRegressionCV from sklearn.linear_model import LogisticRegression from pathlib import Path def trainning(): length = 30 fold = 10 if not Path('POS.txt').exists(): raise FileNotFoundError("POS.txt or NEG.txt not found.") f1 = open(r"POS.txt", "r") f2 = open(r"NEG.txt", "r") pos = set() neg = set() for line in f1.readlines(): sp = line.strip().split("\t") pep = sp[0] pos.add(pep) for line in f2.readlines(): sp = line.strip().split("\t") pep = sp[0] if pep not in pos: neg.add(pep) print("Frist round。。。。。。。。。。。。。") AAscores, l_aas, AAs = blosum62() l_scores, l_type,l_peps = getWeightScoreType(pos, neg, AAscores, AAs,length) ## 获取位置权重确定 PWD 矩阵 raw_scores = [] for i in range(len(l_scores)): # 所有正负样本的矩阵数目 长度是 21919 个 丙酰化的位点 total = 0.0 for j in range(len(l_scores[i])): total += l_scores[i][j] raw_scores.append(total) # 展平所有打分 X = np.array(l_scores) # 所有评分 Y = np.array(l_type) # 所有正负列表的标签 PEP = np.array(l_peps) # 原始丙酰化肽段的原始字符串 weight_coef, weight_auc = logistic_GPS(X, Y,PEP,"WW",0) print("First weight AUC:" + str(weight_auc)) #MM training l_scores, l_type,peps = getMMScoreType(pos, neg, AAscores, weight_coef, l_aas, AAs,length) raw_scores = [] for i in range(len(l_scores)): total = 0.0 for j in range(len(l_scores[i])): total += l_scores[i][j] raw_scores.append(total) X = np.array(l_scores) Y = np.array(l_type) PEP = np.array(l_peps) MM_coef,MM_auc= logistic_GPS(X, Y,PEP,"MM",0) print("First matix AUC:" + str(MM_auc)) best_weight_auc = weight_auc best_MM_auc = MM_auc file = "traningout_first_" + str(fold) + ".txt" writeParameter_MM(file, 10, 10, AAscores, l_aas, weight_coef, MM_coef, MM_auc) AAscores = newAAScore(AAscores, l_aas, AAs, MM_coef) for i in range(100)[1:]: print("The " + str(i + 1) + "th trainning") l_scores, l_type,l_peps = getWeightScoreType(pos, neg, AAscores, AAs, length) X = np.array(l_scores) Y = np.array(l_type) PEP = np.array(l_peps) weight_coef, weight_auc = logistic_GPS(X, Y,PEP,"WW",i) if weight_auc > best_weight_auc: best_weight_auc = weight_auc file2 = "traningout_weight_best_" + str(fold) + "_" + str(i+1) + ".txt" writeParameter_WW(file2, 30, 30, AAscores, l_aas, weight_coef, weight_auc) # MM training l_scores, l_type, peps = getMMScoreType(pos, neg, AAscores, weight_coef, l_aas, AAs, length) X = np.array(l_scores) Y = np.array(l_type) PEP = np.array(l_peps) MM_coef, MM_auc = logistic_GPS(X, Y,PEP,"MM",i) print("The " + str(i + 1) + "round AUC:" + str(MM_auc)) if MM_auc > best_MM_auc: best_MM_auc = MM_auc file2 = "traningout_MM_best_" + str(fold) + "_" + str(i+1) + ".txt" writeParameter_MM(file2, 30, 30, AAscores, l_aas, weight_coef, MM_coef, MM_auc) else: break AAscores = newAAScore(AAscores, l_aas, AAs, MM_coef) def newAAScore(AAscores, l_aas, AAs, MM_coef): dict_weight = {} for i in range(len(l_aas)): aas = l_aas[i] score = AAscores[aas] mweight = MM_coef[i] newscore = score * mweight dict_weight[aas] = newscore return dict_weight def getWeightScoreType(pos, neg, matrix, AAs,length): scores = [] # scores 是一个二维列表,用于存储每个位置上的氨基酸得分。 for i in range(length*2+1): # 61 = 30+1+30 pos_score = [] for j in range(len(AAs)): aa1 = AAs[j] score = 0.0 for oth in pos: aa2 = oth[i:i + 1] aas = aa1 + "_" + aa2 aas2 = aa2 + "_" + aa1 if aas in matrix: score += matrix[aas] else: score += matrix[aas2] pos_score.append(score) scores.append(pos_score) l_scores = [] # l_scores 是一个二维列表,用于存储每个肽段的得分向量。 l_type = [] l_peps = [] for pep in pos: score = [] for i in range(len(pep)): # range(0,61) aa = pep[i:i + 1] index = AAs.index(aa) aascore = (scores[i][index] - matrix[aa + "_" + aa]) / (len(pos) - 1) # 减去自身样本的影响,使用样本均值 score.append(aascore) l_scores.append(score) l_type.append(1) l_peps.append(pep) # num = 0 for pep in neg: score = [] for i in range(len(pep)): aa = pep[i:i + 1] index = AAs.index(aa) aascore = scores[i][index] / len(pos) # 负样本本身就是噪音并不需要调整自身影响。 score.append(aascore) l_scores.append(score) l_type.append(0) l_peps.append(pep) return l_scores, l_type,l_peps def getMMScoreType(pos, neg, matrix, weights, l_aas, AAs,length): """ 参数解释: 1. `pos` (set): - 正样本集合,包含肽段序列的字符串。 - 每个肽段表示一个多肽序列,例如 "PEPTIDE"。 - 这些肽段对应已知的正类标签(功能重要或感兴趣的多肽)。 2. `neg` (set): - 负样本集合,包含肽段序列的字符串。 - 每个肽段表示一个多肽序列,例如 "NEGTIDE"。 - 这些肽段对应已知的负类标签(非功能重要或不感兴趣的多肽)。 3. `matrix` (dict): - 氨基酸对的评分矩阵,键为氨基酸对(如 "A_R" 或 "G_G"),值为相应的得分。 - 用于衡量不同氨基酸对之间的相似性或重要性。 - 例如:`matrix["A_R"] = 1.2` 表示氨基酸对 A 和 R 的评分为 1.2。 4. `weights` (list): - 权重列表,存储逻辑回归模型计算出的权重,用于加权计算每个位置的影响。 - 长度为 `length * 2 + 1`,每个权重对应肽段中某个相对位置的影响。 - 例如:`weights[0]` 可能表示窗口中心位置的权重,`weights[length]` 表示窗口前 `length` 个位置的权重。 5. `l_aas` (list): - 氨基酸对的完整列表,包含所有可能的氨基酸对组合。 - 用于定位得分矩阵中某个氨基酸对的位置。 - 例如:`l_aas = ["A_A", "A_R", "R_A", "G_G", ...]`。 6. `AAs` (list): - 氨基酸的完整列表,包含所有单个氨基酸字符。 - 用于定位肽段中某个氨基酸的位置。 - 例如:`AAs = ["A", "R", "N", "D", "C", "Q", "E", "G", "H", "I", "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V"]`。 7. `length` (int): - 窗口大小,用于定义肽段中心位置的上下游范围。 - 计算时会覆盖从 `-length` 到 `+length` 的范围,总共 `2 * length + 1` 个位置。 - 例如:`length = 30` 时,窗口范围为中心位置前后 30 个氨基酸,总共 61 个位置。 """ scorespos = [] scoresneg = [] for i in range(length*2+1): score_pos = [] score_neg = [] for j in range(len(AAs)): aa1 = AAs[j] score = [] for z in range(len(l_aas)): score.append(0.0) for oth in pos: aa2 = oth[i:i + 1] aas1 = aa1 + "_" + aa2 aas2 = aa2 + "_" + aa1 if aas1 in l_aas: index = l_aas.index(aas1) score[index] += matrix[aas1] * weights[i] elif aas2 in l_aas: index = l_aas.index(aas2) score[index] += matrix[aas2] * weights[i] scoreneg = np.array(score) index2 = l_aas.index(aa1 + "_" + aa1) score[index2] -= matrix[aa1 +"_" + aa1] * weights[i] scorepos = np.array(score) score_pos.append(scorepos) score_neg.append(scoreneg) scorespos.append(score_pos) scoresneg.append(score_neg) l_scores = [] l_type = [] l_peps = [] for pep in pos: score = getArray(l_aas) for i in range(len(pep)): aa = pep[i:i + 1] index = AAs.index(aa) scoreary = scorespos[i][index] score += scoreary score = (score / (len(pos) -1 )).tolist() l_scores.append(score) l_type.append(1) l_peps.append(pep) # num = 0 for pep in neg: score = getArray(l_aas) for i in range(len(pep)): aa = pep[i:i + 1] index = AAs.index(aa) scoreary = scoresneg[i][index] score += scoreary score = (score / len(pos)).tolist() l_scores.append(score) l_type.append(0) l_peps.append(pep) return l_scores, l_type, l_peps def getArray(l_aas): score = [] for i in range(len(l_aas)): score.append(0.0) scoreary = np.array(score) return scoreary def writeParameter_WW(file,left,right,AAscores,l_aas,weight_coef,weight_auc): fw = open(file, "w") list_aa = ["A", "R", "N", "D", "C", "Q", "E", "G", "H", "I", "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V", "B", "Z", "X", "*"] dict_weight = {} for i in range(len(l_aas)): aas = l_aas[i] score = AAscores[aas] dict_weight[aas] = score fw.write("#KprFunc 1.0 Parameters\n") fw.write("#Version: 1.0\n") fw.write("#By Chenwei Wang @HUST\n") fw.write("@param\tCode=K\tUp=" + str(left) + "\tDown=" + str(right) + "\n") fw.write("@AUC=" + str(weight_auc) + "\n") fw.write("@weight") for i in range(len(weight_coef)): fw.write("\t" + str(weight_coef[i])) fw.write("\n") for i in range(len(list_aa)): a = list_aa[i] fw.write(" " + a) fw.write("\n") for i in range(len(list_aa)): a1 = list_aa[i] fw.write(a1) for j in range(len(list_aa)): a2 = list_aa[j] aas1 = a1 + "_" + a2 aas2 = a2 + "_" + a1 score = 0.0 if aas1 in dict_weight: score = dict_weight[aas1] elif aas2 in dict_weight: score = dict_weight[aas2] else: print(aas1 + "wrong!") fw.write(" " + str(score)) fw.write("\n") fw.flush() fw.close() def writeParameter_MM(file,left,right,AAscores,l_aas,weight_coef,MM_coef,MM_auc): fw = open(file, "w") list_aa = ["A", "R", "N", "D", "C", "Q", "E", "G", "H", "I", "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V", "B", "Z", "X", "*"] dict_weight = {} for i in range(len(l_aas)): aas = l_aas[i] score = AAscores[aas] mweight = MM_coef[i] newscore = score * mweight dict_weight[aas] = newscore fw.write("#KprFunc 1.0 Parameters\n") fw.write("#Version: 1.0\n") fw.write("#By Chenwei Wang @HUST\n") fw.write("@param\tCode=K\tUp=" + str(left) + "\tDown=" + str(right) + "\n") fw.write("@AUC=" + str(MM_auc) + "\n") fw.write("@weight") for i in range(len(weight_coef)): fw.write("\t" + str(weight_coef[i])) fw.write("\n") for i in range(len(list_aa)): a = list_aa[i] fw.write(" " + a) fw.write("\n") for i in range(len(list_aa)): a1 = list_aa[i] fw.write(a1) for j in range(len(list_aa)): a2 = list_aa[j] aas1 = a1 + "_" + a2 aas2 = a2 + "_" + a1 score = 0.0 if aas1 in dict_weight: score = dict_weight[aas1] elif aas2 in dict_weight: score = dict_weight[aas2] else: print(aas1 + "wrong!") fw.write(" " + str(score)) fw.write("\n") fw.flush() fw.close() def logistic_GPS(X: bytearray, Y: bytearray,PEP,type,turn): ''' @description: 在代码中,logistic_GPS 函数是实现 PWD 和 SMO 的核心,完成了逻辑回归的训练与权重优化。 @param: X: 输入特征矩阵,形状为 (n_samples, n_features),其中 n_samples 是样本数量,n_features 是特征数量。肽段得分矩阵 Y: 目标变量(标签),形状为 (n_samples,),表示每个样本的类别(0 或 1)。正负样本,1 或 0 PEP: 肽段数据,形状为 (n_samples,),表示每个样本的肽段序列。虽然在 logistic_GPS 函数中没有直接使用 PEP,但它可能在后续的处理中被用到,例如在 writeParameter_WW 或 writeParameter_MM 函数中。 主要功能是执行逻辑回归训练,特别是交叉验证选择最佳正则化参数 C,并根据此参数训练逻辑回归模型,返回权重系数和 AUC 分数。 return list_coef: list of float 权重系数,形状为 (n_features,),表示每个特征的权重。用于加权评分。 auc: 逻辑回归模型的性能指标,用于评估模型的分类能力。 1. type = "WW" 含义:表示“Weighted Window”(加权窗口)训练类型。 特点: 在这种训练类型中,每个位置的权重是通过逻辑回归模型计算出来的。 重点在于计算每个位置的权重,以便更好地反映每个位置对肽段分类的重要性。 通常用于初始训练阶段,确定每个位置的权重。 2. type = "MM" 含义:表示“Matrix Multiplication”(矩阵乘法)训练类型。 特点: 在这种训练类型中,除了计算每个位置的权重外,还会考虑氨基酸对的评分矩阵。 重点在于结合氨基酸对的评分矩阵和位置权重,进一步优化模型的性能。 通常用于后续的训练阶段,通过矩阵乘法进一步调整权重,提高模型的准确性。 ''' solverchose = 'sag' clscv = LogisticRegressionCV(max_iter=10000, cv=10, solver=solverchose,scoring='roc_auc') # LogisticRegressionCV 的目标是通过最大化AUC交叉验证找到最佳的 C 值 clscv.fit(X, Y) #regularization = clscv.C_[0] regularization = clscv.C_[0] * (10**(-turn)) # 动态调整C的值,通过10^turn 来调整正则化的强度。 print("C=" + str(regularization)) cls = LogisticRegression(max_iter=10000,solver=solverchose,C=regularization) cls.fit(X, Y) list_coef = cls.coef_[0] predict_prob_x = cls.predict_proba(X) predict_x = predict_prob_x[:, 1] auc = roc_auc_score(Y,np.array(predict_x)) print("AUC:" + str(auc)) return list_coef,auc trainning()