426 lines
18 KiB
Python
426 lines
18 KiB
Python
from Tools import blosum62
|
||
import numpy as np
|
||
from sklearn.metrics import roc_auc_score
|
||
from sklearn.linear_model import LogisticRegressionCV
|
||
from sklearn.linear_model import LogisticRegression
|
||
from pathlib import Path
|
||
|
||
def trainning():
|
||
length = 30
|
||
fold = 10 # 交叉验证的折数
|
||
if not Path('POS.txt').exists():
|
||
raise FileNotFoundError("POS.txt or NEG.txt not found.")
|
||
f1 = open(r"POS.txt", "r")
|
||
f2 = open(r"NEG.txt", "r")
|
||
pos = set()
|
||
neg = set()
|
||
for line in f1.readlines():
|
||
sp = line.strip().split("\t")
|
||
pep = sp[0]
|
||
pos.add(pep)
|
||
for line in f2.readlines():
|
||
sp = line.strip().split("\t")
|
||
pep = sp[0]
|
||
if pep not in pos:
|
||
neg.add(pep)
|
||
|
||
print("Frist round。。。。。。。。。。。。。")
|
||
AAscores, l_aas, AAs = blosum62()
|
||
l_scores, l_type,l_peps = getWeightScoreType(pos, neg, AAscores, AAs,length) ## 获取位置权重确定 PWD 矩阵
|
||
raw_scores = []
|
||
for i in range(len(l_scores)): # 所有正负样本的矩阵数目 长度是 21919 个 丙酰化的位点
|
||
total = 0.0
|
||
for j in range(len(l_scores[i])):
|
||
total += l_scores[i][j]
|
||
raw_scores.append(total) # 展平所有打分 可以直接用于排序、筛选或者特征归一化
|
||
X = np.array(l_scores) # 所有评分
|
||
Y = np.array(l_type) # 所有正负列表的标签
|
||
PEP = np.array(l_peps) # 原始丙酰化肽段的原始字符串
|
||
weight_coef, weight_auc = logistic_GPS(X, Y,PEP,"WW",0)
|
||
print("First weight AUC:" + str(weight_auc))
|
||
#MM training
|
||
l_scores, l_type,peps = getMMScoreType(pos, neg, AAscores, weight_coef, l_aas, AAs,length)
|
||
raw_scores = []
|
||
for i in range(len(l_scores)):
|
||
total = 0.0
|
||
for j in range(len(l_scores[i])):
|
||
total += l_scores[i][j]
|
||
raw_scores.append(total)
|
||
X = np.array(l_scores)
|
||
Y = np.array(l_type)
|
||
PEP = np.array(l_peps)
|
||
MM_coef,MM_auc= logistic_GPS(X, Y,PEP,"MM",0)
|
||
print("First matix AUC:" + str(MM_auc))
|
||
best_weight_auc = weight_auc
|
||
best_MM_auc = MM_auc
|
||
|
||
file = "traningout_first_" + str(fold) + ".txt"
|
||
writeParameter_MM(file, 10, 10, AAscores, l_aas, weight_coef, MM_coef, MM_auc)
|
||
|
||
AAscores = newAAScore(AAscores, l_aas, AAs, MM_coef)
|
||
for i in range(100)[1:]:
|
||
print("The " + str(i + 1) + "th trainning")
|
||
l_scores, l_type,l_peps = getWeightScoreType(pos, neg, AAscores, AAs, length)
|
||
X = np.array(l_scores)
|
||
Y = np.array(l_type)
|
||
PEP = np.array(l_peps)
|
||
weight_coef, weight_auc = logistic_GPS(X, Y,PEP,"WW",i)
|
||
if weight_auc > best_weight_auc:
|
||
best_weight_auc = weight_auc
|
||
file2 = "traningout_weight_best_" + str(fold) + "_" + str(i+1) + ".txt"
|
||
writeParameter_WW(file2, 30, 30, AAscores, l_aas, weight_coef, weight_auc)
|
||
# MM training
|
||
l_scores, l_type, peps = getMMScoreType(pos, neg, AAscores, weight_coef, l_aas, AAs, length)
|
||
X = np.array(l_scores)
|
||
Y = np.array(l_type)
|
||
PEP = np.array(l_peps)
|
||
MM_coef, MM_auc = logistic_GPS(X, Y,PEP,"MM",i)
|
||
print("The " + str(i + 1) + "round AUC:" + str(MM_auc))
|
||
if MM_auc > best_MM_auc:
|
||
best_MM_auc = MM_auc
|
||
file2 = "traningout_MM_best_" + str(fold) + "_" + str(i+1) + ".txt"
|
||
writeParameter_MM(file2, 30, 30, AAscores, l_aas, weight_coef, MM_coef, MM_auc)
|
||
else:
|
||
break
|
||
AAscores = newAAScore(AAscores, l_aas, AAs, MM_coef)
|
||
|
||
def newAAScore(AAscores, l_aas, AAs, MM_coef):
|
||
dict_weight = {}
|
||
for i in range(len(l_aas)):
|
||
aas = l_aas[i]
|
||
score = AAscores[aas]
|
||
mweight = MM_coef[i]
|
||
newscore = score * mweight
|
||
dict_weight[aas] = newscore
|
||
return dict_weight
|
||
|
||
|
||
def getWeightScoreType(pos, neg, matrix, AAs,length):
|
||
'''
|
||
这个函数计算每个肽段的得分向量,其中每个位置的得分是基于 BLOSUM62 矩阵和位置权重。
|
||
获取位置权重确定 PWD 矩阵
|
||
参数解释:
|
||
pos: 正样本长度61氨基酸的set集合
|
||
neg: 负样本长度61氨基酸的set集合
|
||
matrix: 62*62的矩阵,用于计算每个氨基酸对位置的权重,即AAscores
|
||
AAs: 24个氨基酸的列表,用于索引矩阵
|
||
length: 肽段的长度,用于确定打分矩阵的行数,即打分矩阵的行数等于肽段的长度*2+1
|
||
return: l_scores, l_type,l_peps
|
||
l_scores: 二维列表,存储每个肽段的打分向量,打分向量是长度为61的列表,每个位置上的打分是该位置上所有氨基酸的打分之和。
|
||
l_type: 二维列表,存储肽段的标签,1表示正样本,0表示负样本。
|
||
l_peps: 二维列表,存储肽段的原始字符串。
|
||
'''
|
||
scores = [] # scores 是一个二维列表,用于存储正样本每个位置上的氨基酸得分矩阵。
|
||
for i in range(length*2+1): # 61 = 30+1+30
|
||
pos_score = []
|
||
for j in range(len(AAs)): # 遍历每种氨基酸
|
||
aa1 = AAs[j] # 当前氨基酸
|
||
score = 0.0 # 初始化得分为 0
|
||
for oth in pos: # 遍历所有正样本
|
||
aa2 = oth[i:i + 1] # 正样本中当前位置的氨基酸
|
||
aas = aa1 + "_" + aa2 # 生成氨基酸对的键
|
||
aas2 = aa2 + "_" + aa1 # 反向氨基酸对的键
|
||
if aas in matrix:
|
||
score += matrix[aas] # 替代矩阵中的相似性得分
|
||
else:
|
||
score += matrix[aas2]
|
||
pos_score.append(score) # 将得分加入当前位置的氨基酸列表
|
||
scores.append(pos_score)
|
||
|
||
l_scores = [] # l_scores 是一个二维列表,用于存储每个肽段的得分向量。对每个肽段,最终形成一个长度为 61 的向量.
|
||
l_type = []
|
||
l_peps = []
|
||
|
||
for pep in pos:
|
||
score = []
|
||
for i in range(len(pep)): # 遍历肽段的每个位置
|
||
aa = pep[i:i + 1] # 当前肽段在位置 i 的氨基酸
|
||
index = AAs.index(aa) # 获取氨基酸在 AAs 列表中的索引
|
||
aascore = (scores[i][index] - matrix[aa + "_" + aa]) / (len(pos) - 1) # 计算平均得分
|
||
score.append(aascore) # 加入当前肽段的位置得分
|
||
l_scores.append(score) # 将该肽段的打分向量加入到结果列表
|
||
l_type.append(1) # 标签为 1,表示正样本
|
||
l_peps.append(pep) # 保存肽段的原始字符串
|
||
|
||
# num = 0
|
||
for pep in neg:
|
||
score = []
|
||
for i in range(len(pep)):
|
||
aa = pep[i:i + 1] # 当前肽段在位置 i 的氨基酸
|
||
index = AAs.index(aa) # 获取氨基酸在 AAs 列表中的索引
|
||
aascore = scores[i][index] / len(pos) # 直接取正样本的平均得分 # 负样本本身就是噪音并不需要调整自身影响。
|
||
score.append(aascore) # 加入当前肽段的位置得分
|
||
l_scores.append(score) # 将该肽段的打分向量加入到结果列表
|
||
l_type.append(0) # 标签为 0,表示负样本
|
||
l_peps.append(pep) # 保存肽段的原始字符串
|
||
|
||
return l_scores, l_type,l_peps # 返回含有正负样本的打分向量,打分向量是长度为61的列表,每个位置上的打分是该位置上所有氨基酸的打分之和。
|
||
|
||
def getMMScoreType(pos, neg, matrix, weights, l_aas, AAs,length):
|
||
# 这个函数计算每个肽段的得分向量,其中每个位置的得分是基于氨基酸对的评分矩阵和位置权重。SMO 矩阵权重
|
||
"""
|
||
参数解释:
|
||
|
||
1. `pos` (set):
|
||
- 正样本集合,包含肽段序列的字符串。
|
||
- 每个肽段表示一个多肽序列,例如 "PEPTIDE"。
|
||
- 这些肽段对应已知的正类标签(功能重要或感兴趣的多肽)。
|
||
|
||
2. `neg` (set):
|
||
- 负样本集合,包含肽段序列的字符串。
|
||
- 每个肽段表示一个多肽序列,例如 "NEGTIDE"。
|
||
- 这些肽段对应已知的负类标签(非功能重要或不感兴趣的多肽)。
|
||
|
||
3. `matrix` (dict):
|
||
- 氨基酸对的评分矩阵,键为氨基酸对(如 "A_R" 或 "G_G"),值为相应的得分。
|
||
- 用于衡量不同氨基酸对之间的相似性或重要性。
|
||
- 例如:`matrix["A_R"] = 1.2` 表示氨基酸对 A 和 R 的评分为 1.2。
|
||
|
||
4. `weights` (list):
|
||
- 权重列表,存储逻辑回归模型计算出的权重,用于加权计算每个位置的影响。
|
||
- 长度为 `length * 2 + 1`,每个权重对应肽段中某个相对位置的影响。
|
||
- 例如:`weights[0]` 可能表示窗口中心位置的权重,`weights[length]` 表示窗口前 `length` 个位置的权重。
|
||
|
||
5. `l_aas` (list):
|
||
- 氨基酸对的完整列表,包含所有可能的氨基酸对组合。
|
||
- 用于定位得分矩阵中某个氨基酸对的位置。
|
||
- 例如:`l_aas = ["A_A", "A_R", "R_A", "G_G", ...]`。
|
||
|
||
6. `AAs` (list):
|
||
- 氨基酸的完整列表,包含所有单个氨基酸字符。
|
||
- 用于定位肽段中某个氨基酸的位置。
|
||
- 例如:`AAs = ["A", "R", "N", "D", "C", "Q", "E", "G", "H", "I", "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V"]`。
|
||
|
||
7. `length` (int):
|
||
- 窗口大小,用于定义肽段中心位置的上下游范围。
|
||
- 计算时会覆盖从 `-length` 到 `+length` 的范围,总共 `2 * length + 1` 个位置。
|
||
- 例如:`length = 30` 时,窗口范围为中心位置前后 30 个氨基酸,总共 61 个位置。
|
||
"""
|
||
scorespos = []
|
||
scoresneg = []
|
||
for i in range(length*2+1):
|
||
score_pos = []
|
||
score_neg = []
|
||
for j in range(len(AAs)):
|
||
aa1 = AAs[j]
|
||
score = [] # 是一个长度为 300 的列表,用来保存每个氨基酸对在当前位置的得分。
|
||
for z in range(len(l_aas)): # 遍历所有氨基酸对 (a, b)
|
||
score.append(0.0) # 初始化每个氨基酸对的得分
|
||
for oth in pos:
|
||
aa2 = oth[i:i + 1]
|
||
aas1 = aa1 + "_" + aa2
|
||
aas2 = aa2 + "_" + aa1
|
||
if aas1 in l_aas:
|
||
index = l_aas.index(aas1)
|
||
score[index] += matrix[aas1] * weights[i] # 计算 (a, b) 对应位置 j 的得分
|
||
elif aas2 in l_aas:
|
||
index = l_aas.index(aas2)
|
||
score[index] += matrix[aas2] * weights[i]
|
||
scoreneg = np.array(score)
|
||
index2 = l_aas.index(aa1 + "_" + aa1)
|
||
score[index2] -= matrix[aa1 +"_" + aa1] * weights[i]
|
||
scorepos = np.array(score)
|
||
|
||
score_pos.append(scorepos)
|
||
score_neg.append(scoreneg)
|
||
scorespos.append(score_pos)
|
||
scoresneg.append(score_neg)
|
||
|
||
l_scores = []
|
||
l_type = []
|
||
l_peps = []
|
||
|
||
for pep in pos:
|
||
score = getArray(l_aas)
|
||
for i in range(len(pep)):
|
||
aa = pep[i:i + 1]
|
||
index = AAs.index(aa)
|
||
scoreary = scorespos[i][index]
|
||
score += scoreary
|
||
|
||
score = (score / (len(pos) -1 )).tolist()
|
||
|
||
l_scores.append(score)
|
||
l_type.append(1)
|
||
l_peps.append(pep)
|
||
|
||
# num = 0
|
||
for pep in neg:
|
||
score = getArray(l_aas)
|
||
for i in range(len(pep)):
|
||
aa = pep[i:i + 1]
|
||
index = AAs.index(aa)
|
||
scoreary = scoresneg[i][index]
|
||
score += scoreary
|
||
|
||
score = (score / len(pos)).tolist()
|
||
|
||
l_scores.append(score)
|
||
l_type.append(0)
|
||
l_peps.append(pep)
|
||
return l_scores, l_type, l_peps
|
||
|
||
|
||
def getArray(l_aas):
|
||
score = []
|
||
for i in range(len(l_aas)):
|
||
score.append(0.0)
|
||
scoreary = np.array(score)
|
||
|
||
return scoreary
|
||
|
||
def writeParameter_WW(file,left,right,AAscores,l_aas,weight_coef,weight_auc):
|
||
fw = open(file, "w")
|
||
list_aa = ["A", "R", "N", "D", "C", "Q", "E", "G", "H", "I", "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V",
|
||
"B", "Z", "X", "*"]
|
||
dict_weight = {}
|
||
for i in range(len(l_aas)):
|
||
aas = l_aas[i]
|
||
score = AAscores[aas]
|
||
dict_weight[aas] = score
|
||
fw.write("#KprFunc 1.0 Parameters\n")
|
||
fw.write("#Version: 1.0\n")
|
||
fw.write("#By Chenwei Wang @HUST\n")
|
||
fw.write("@param\tCode=K\tUp=" + str(left) + "\tDown=" + str(right) + "\n")
|
||
fw.write("@AUC=" + str(weight_auc) + "\n")
|
||
fw.write("@weight")
|
||
for i in range(len(weight_coef)):
|
||
fw.write("\t" + str(weight_coef[i]))
|
||
fw.write("\n")
|
||
|
||
for i in range(len(list_aa)):
|
||
a = list_aa[i]
|
||
fw.write(" " + a)
|
||
fw.write("\n")
|
||
|
||
for i in range(len(list_aa)):
|
||
a1 = list_aa[i]
|
||
fw.write(a1)
|
||
for j in range(len(list_aa)):
|
||
a2 = list_aa[j]
|
||
aas1 = a1 + "_" + a2
|
||
aas2 = a2 + "_" + a1
|
||
score = 0.0
|
||
if aas1 in dict_weight:
|
||
score = dict_weight[aas1]
|
||
elif aas2 in dict_weight:
|
||
score = dict_weight[aas2]
|
||
else:
|
||
print(aas1 + "wrong!")
|
||
fw.write(" " + str(score))
|
||
fw.write("\n")
|
||
|
||
fw.flush()
|
||
fw.close()
|
||
|
||
def writeParameter_MM(file,left,right,AAscores,l_aas,weight_coef,MM_coef,MM_auc):
|
||
fw = open(file, "w")
|
||
list_aa = ["A", "R", "N", "D", "C", "Q", "E", "G", "H", "I", "L", "K", "M", "F", "P", "S", "T", "W", "Y", "V",
|
||
"B", "Z", "X", "*"]
|
||
dict_weight = {}
|
||
for i in range(len(l_aas)):
|
||
aas = l_aas[i]
|
||
score = AAscores[aas]
|
||
mweight = MM_coef[i]
|
||
newscore = score * mweight
|
||
dict_weight[aas] = newscore
|
||
fw.write("#KprFunc 1.0 Parameters\n")
|
||
fw.write("#Version: 1.0\n")
|
||
fw.write("#By Chenwei Wang @HUST\n")
|
||
fw.write("@param\tCode=K\tUp=" + str(left) + "\tDown=" + str(right) + "\n")
|
||
fw.write("@AUC=" + str(MM_auc) + "\n")
|
||
fw.write("@weight")
|
||
for i in range(len(weight_coef)):
|
||
fw.write("\t" + str(weight_coef[i]))
|
||
fw.write("\n")
|
||
|
||
for i in range(len(list_aa)):
|
||
a = list_aa[i]
|
||
fw.write(" " + a)
|
||
fw.write("\n")
|
||
|
||
for i in range(len(list_aa)):
|
||
a1 = list_aa[i]
|
||
fw.write(a1)
|
||
for j in range(len(list_aa)):
|
||
a2 = list_aa[j]
|
||
aas1 = a1 + "_" + a2
|
||
aas2 = a2 + "_" + a1
|
||
score = 0.0
|
||
if aas1 in dict_weight:
|
||
score = dict_weight[aas1]
|
||
elif aas2 in dict_weight:
|
||
score = dict_weight[aas2]
|
||
else:
|
||
print(aas1 + "wrong!")
|
||
fw.write(" " + str(score))
|
||
fw.write("\n")
|
||
|
||
fw.flush()
|
||
fw.close()
|
||
|
||
def logistic_GPS(X: bytearray, Y: bytearray,PEP,type,turn):
|
||
'''
|
||
@description:
|
||
在代码中,logistic_GPS 函数是实现 PWD 和 SMO 的核心,完成了逻辑回归的训练与权重优化。
|
||
@param:
|
||
X:
|
||
|
||
输入特征矩阵,形状为 (n_samples, n_features),其中 n_samples 是样本数量,n_features 是特征数量。肽段得分矩阵
|
||
Y:
|
||
|
||
目标变量(标签),形状为 (n_samples,),表示每个样本的类别(0 或 1)。正负样本,1 或 0
|
||
PEP:
|
||
|
||
肽段数据,形状为 (n_samples,),表示每个样本的肽段序列。虽然在 logistic_GPS 函数中没有直接使用 PEP,但它可能在后续的处理中被用到,例如在 writeParameter_WW 或 writeParameter_MM 函数中。
|
||
主要功能是执行逻辑回归训练,特别是交叉验证选择最佳正则化参数 C,并根据此参数训练逻辑回归模型,返回权重系数和 AUC 分数。
|
||
|
||
turn: 训练轮次,用于动态调整正则化参数 C 的值。
|
||
|
||
return
|
||
list_coef: list of float
|
||
权重系数,形状为 (n_features,),表示每个特征的权重。用于加权评分。
|
||
auc: 逻辑回归模型的性能指标,用于评估模型的分类能力。
|
||
|
||
1. type = "WW"
|
||
含义:表示“Weighted Window”(加权窗口)训练类型。
|
||
特点:
|
||
在这种训练类型中,每个位置的权重是通过逻辑回归模型计算出来的。
|
||
重点在于计算每个位置的权重,以便更好地反映每个位置对肽段分类的重要性。
|
||
通常用于初始训练阶段,确定每个位置的权重。
|
||
2. type = "MM"
|
||
含义:表示“Matrix Multiplication”(矩阵乘法)训练类型。
|
||
特点:
|
||
在这种训练类型中,除了计算每个位置的权重外,还会考虑氨基酸对的评分矩阵。
|
||
重点在于结合氨基酸对的评分矩阵和位置权重,进一步优化模型的性能。
|
||
通常用于后续的训练阶段,通过矩阵乘法进一步调整权重,提高模型的准确性。
|
||
'''
|
||
solverchose = 'sag' # 选择求解器
|
||
clscv = LogisticRegressionCV(max_iter=10000, cv=10, solver=solverchose,scoring='roc_auc') # LogisticRegressionCV 的目标是通过最大化AUC交叉验证找到最佳的 C 值
|
||
'''
|
||
max_iter=10000: 设置最大迭代次数。
|
||
cv=10: 使用 10 折交叉验证。
|
||
solver='sag': 使用随机平均梯度下降求解器。
|
||
scoring='roc_auc': 使用 ROC AUC 作为评分标准。
|
||
'''
|
||
clscv.fit(X, Y) # PWD 权重优化:通过逻辑回归计算每个位置的权重 Wj
|
||
#regularization = clscv.C_[0]
|
||
regularization = clscv.C_[0] * (10**(-turn)) # 动态调整C的值,通过10^turn 来调整正则化的强度。
|
||
print("C=" + str(regularization))
|
||
# 使用最佳正则化参数训练逻辑回归模型
|
||
cls = LogisticRegression(max_iter=10000,solver=solverchose,C=regularization)
|
||
cls.fit(X, Y)
|
||
# 获取权重系数和 AUC 分数:
|
||
list_coef = cls.coef_[0] # 权重Wj,反映每个位置在区分正负样本时的重要性,用于后续加权特征计算。
|
||
predict_prob_x = cls.predict_proba(X)
|
||
predict_x = predict_prob_x[:, 1]
|
||
auc = roc_auc_score(Y,np.array(predict_x)) # 通过 ROC 曲线评估模型性能,对应论文中的交叉验证 AUC 分数计算。
|
||
print("AUC:" + str(auc))
|
||
return list_coef,auc
|
||
|
||
trainning()
|
||
|
||
|
||
|
||
|