
Introduction
In a previous article I classified diseases with a random forest; this is an updated version. This time I added data standardization and Linear Discriminant Analysis (LDA). I also made the script output a Classification Report and a Confusion Matrix so that a summary of the results can be reviewed at a glance.
LDA
LDA extracts features by compressing the dimensionality of the variables. Whereas principal component analysis (PCA) extracts orthogonal axes along which the variance of the data is maximized, LDA draws the line that best separates the classes and linearly projects the data onto it, here down to one dimension. For classification, LDA is said to give better accuracy than PCA. However, the data used for LDA are expected to satisfy the following conditions (a short sketch follows this list):
- The data follow a normal distribution
- All classes share the same covariance matrix
- The variables are statistically independent of one another
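As a minimal sketch of what LDA does, assuming toy data and made-up variable names (this is not the script below):

```python
# Minimal LDA sketch with scikit-learn (toy data; names are illustrative only)
import numpy as np
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

rng = np.random.default_rng(0)
# two toy classes with the same covariance but shifted means
X = np.vstack([rng.normal(0.0, 1.0, (50, 4)),
               rng.normal(1.5, 1.0, (50, 4))])
y = np.array([0] * 50 + [1] * 50)

# n_components must be <= min(n_classes - 1, n_features); with 2 classes that is 1
lda = LinearDiscriminantAnalysis(n_components=1)
X_lda = lda.fit_transform(X, y)  # learn the projection and reduce to 1 dimension
print(X_lda.shape)               # (100, 1)
```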
Implementation
Code used
```python
# import modules
import matplotlib.pyplot as plt
import seaborn as sns
import os
import numpy as np
import pandas as pd


def import_data(file, map, covariate):
    # import csv file
    df = pd.read_csv(file)

    # select biomarker / map columns
    variable = df.loc[:, df.columns.str.contains(map)]  # 'ICVF|OD|ISO|MVF'
    # processing for defect values.
    variable = variable.fillna(variable.median())

    # adjust biomarkers for covariates
    from sklearn import linear_model

    def nu_freq(X_reg, y_reg):
        lr = linear_model.LinearRegression()
        lr.fit(X_reg, y_reg)
        return lr.coef_, lr.intercept_

    def control_residuals(X, y, covar):
        control_covar = covar[y == 1].astype(float)
        control_X = X[y == 1].astype(float)
        n_biomarkers = control_X.shape[1]
        regr_params = dict((x, None) for x in range(n_biomarkers))
        for i in range(n_biomarkers):
            mask = ~np.isnan(control_X[:, i]) & ~np.isnan(control_covar).any(axis=1)
            X_reg, y_reg = control_covar[mask, :], control_X[mask, i]
            regr_params[i] = nu_freq(X_reg, y_reg)
        residuals = X.copy()
        for key, value in regr_params.items():
            residuals[:, key] -= regr_params[key][1]
            residuals[:, key] -= np.matmul(covar, regr_params[key][0])
        return residuals

    # read in covariates
    covars = df[covariate].astype(float).values  # ['Age', 'Gender', 'ICV']
    # adjust biomarkers for covariates
    adjusted_variable = control_residuals(variable.values, df.Groups.values, covars)
    # save data
    variable.to_csv("./RF_result/pre_adjusted_variable.csv", index=False)
    adjusted_variable = pd.DataFrame(adjusted_variable, columns=variable.columns)
    adjusted_variable.to_csv("./RF_result/post_adjusted_variable.csv", index=False)

    # select subjects (Groups 2 and 3 only)
    adjusted_variable = adjusted_variable[(df['Groups'] == 2) | (df['Groups'] == 3)]
    Groups = df.Groups[(df['Groups'] == 2) | (df['Groups'] == 3)]
    true_label = []
    for k in range(len(Groups.values)):
        if Groups.values[k] == 2:
            true_label.append(0)
        else:
            true_label.append(1)
    true_label = np.array(true_label)
    return adjusted_variable, true_label, variable.columns


def normalize(input):
    # normalization (strictly, the scaler should be fitted on the training data only)
    from sklearn.preprocessing import StandardScaler
    sc = StandardScaler()
    sc.fit(input)
    input_std = sc.transform(input)
    return input_std


def lda(input, label, n_components):
    # LDA (strictly, this should be fitted on the training data only and then applied to the test data)
    from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
    lda = LinearDiscriminantAnalysis(n_components=n_components)
    input_lda = lda.fit_transform(input, label)
    return input_lda


def random_forest(input, label, param_grid, random_state):
    # Grid Search
    # Create a base model
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import GridSearchCV
    clf = RandomForestClassifier(random_state=random_state)  # random_state=5, 10, Accuracy:0.754
    # Instantiate the grid search model
    grid_search = GridSearchCV(estimator=clf, param_grid=param_grid,
                               cv=5, n_jobs=-1, verbose=1)
    # Fit the grid search to the data
    grid_search = grid_search.fit(input, label)  # training
    best_grid = grid_search.best_estimator_
    print("\nBest Parameter is below.")
    print(grid_search.best_params_)
    print("\n")
    return best_grid


def varidation(input, label, features, best_grid):
    # leave-one-out
    from sklearn.model_selection import cross_val_score
    from sklearn.model_selection import LeaveOneOut
    scores = cross_val_score(best_grid, input, label, cv=LeaveOneOut())  # LeaveOneOut() as the CV splitter
    print('Cross-Validation scores: {}'.format(scores))
    print('Average score: {:.3f}'.format(np.mean(scores)))

    # label restore: rebuild the predicted labels from the per-sample LOO scores (1 = correct, 0 = wrong)
    pred_label = []
    for k in range(len(scores)):
        if scores[k] == 1:
            pred_label.append(label[k])
        else:
            if label[k] == 0:
                pred_label.append(1)
            else:
                pred_label.append(0)
    pred_label = np.array(pred_label)

    # classification report
    from sklearn.metrics import classification_report
    print('=========================Classification Report=========================')
    print(classification_report(label, pred_label))  # y_true first, then y_pred
    print('=========================Classification Report=========================')
    print('\n')

    # confusion matrix
    from sklearn.metrics import confusion_matrix
    plt.figure()
    mat = confusion_matrix(label, pred_label)  # rows: true class, columns: predicted class
    sns.heatmap(mat, square=True, annot=True, cbar=False, fmt='d', cmap='RdPu')
    plt.xlabel('predicted class')
    plt.ylabel('true value')
    plt.savefig('./RF_result/confusion_matrix.png')


def importance(features, best_grid):
    # visualize feature importances
    importances = best_grid.feature_importances_
    indices = np.argsort(importances)
    plt.figure(figsize=(10, 10))
    plt.barh(range(len(indices)), importances[indices], color='b', align='center')
    plt.yticks(range(len(indices)), features[indices])
    plt.savefig('RF_result/importance.png')
    # save importance values
    importance = pd.DataFrame({'Variable': features[indices],
                               'Importance': importances[indices]})
    importance.to_csv("./RF_result/importance.csv", index=False)


def save_trees(best_grid, n_estimators, features):
    import pydotplus
    from io import StringIO  # sklearn.externals.six has been removed from recent scikit-learn
    from sklearn import tree
    print('\nSaving result to "RF_result" folder...')
    for i in range(n_estimators):
        dot_data = StringIO()
        tree.export_graphviz(best_grid.estimators_[i], out_file=dot_data, feature_names=features)
        graph = pydotplus.graph_from_dot_data(dot_data.getvalue())
        graph.write_png('./RF_result/tree' + str(i) + '.png')


def main():
    # define
    map = 'ICVF|OD|ISO|MVF'
    covariate = ['Age', 'Gender', 'ICV']
    n_components = 1
    random_state = 5
    param_grid = {
        'bootstrap': [True],
        'max_depth': [120],          # [110,115,120,125,130]
        # 'max_features': [1],       # [200,212,225,273,250]
        'min_samples_leaf': [4],     # [4, 5]
        'min_samples_split': [8],    # [8, 10, 12]
        'n_estimators': [130]        # [100,110,120,130]
    }

    print('Start Random Forest...')
    # ignore warnings
    import warnings
    warnings.simplefilter('ignore')
    # make output directory
    if not os.path.isdir('RF_result'):
        os.mkdir('RF_result')

    import sys
    args = sys.argv
    data_adjusted, true_label, features = import_data(args[1], map, covariate)
    data_std = normalize(data_adjusted)
    data_lda = lda(data_std, true_label, n_components)
    best_grid = random_forest(data_lda, true_label, param_grid, random_state)
    varidation(data_lda, true_label, features, best_grid)
    # importance(features, best_grid)
    # save_trees(best_grid, param_grid['n_estimators'][0], features)


if __name__ == '__main__':
    main()
```
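Assuming the script is saved as, say, rf_lda.py (the file name is mine, not the author's), it would be run as `python rf_lda.py data.csv`, where the CSV is expected to contain a Groups column, the covariate columns ('Age', 'Gender', 'ICV'), and the biomarker columns matched by the pattern 'ICVF|OD|ISO|MVF'; the outputs are written to the RF_result folder.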
Update 2020/05/17
- In the Normalize and LDA steps, the fitting is now done on the training data only, and the parameters estimated from the training data are applied to the test data (a minimal sketch of this pattern is shown just below, followed by the full script)
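As a minimal sketch of this fit-on-train, transform-on-test pattern, assuming toy arrays (illustrative only):

```python
# Fit the scaler and LDA on the training data only, then apply them to the test data
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

rng = np.random.default_rng(0)
X_train, y_train = rng.normal(size=(40, 4)), np.repeat([0, 1], 20)
X_test = rng.normal(size=(10, 4))

sc = StandardScaler().fit(X_train)      # mean and S.D. estimated from the training data only
X_train_std = sc.transform(X_train)
X_test_std = sc.transform(X_test)       # test data standardized with the training-data parameters

lda = LinearDiscriminantAnalysis(n_components=1).fit(X_train_std, y_train)
X_train_lda = lda.transform(X_train_std)
X_test_lda = lda.transform(X_test_std)  # test data projected with the LDA axes learned from training data
```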
```python
# import modules
import matplotlib.pyplot as plt
import seaborn as sns
import os
import numpy as np
import pandas as pd


def import_data(tr_data, te_data, map, covariate):
    # the CSV is now read in main(); tr_data / te_data are the train and test subsets
    # TR:0, TE:1
    # tr_data = df[df["TR_TE"] == 0].reset_index()
    # te_data = df[df["TR_TE"] == 1].reset_index()
    # print("tr_size:{}, te_size:{}".format(tr_data.shape, te_data.shape))

    # select map columns
    tr_map = tr_data.loc[:, tr_data.columns.str.contains(map)]  # 'ICVF|OD|ISO|MVF'
    te_map = te_data.loc[:, te_data.columns.str.contains(map)]
    # # processing for defect values.
    # tr_map, te_map = tr_map.fillna(tr_map.median()), te_map.fillna(te_map.median())

    # adjust biomarkers for covariates
    from sklearn import linear_model

    def nu_freq(X_reg, y_reg):
        lr = linear_model.LinearRegression()
        lr.fit(X_reg, y_reg)
        return lr.coef_, lr.intercept_

    def control_residuals(X, y, covar, X2, covar2):
        control_covar = covar[y == 1].astype(float)
        control_X = X[y == 1].astype(float)
        n_biomarkers = control_X.shape[1]
        regr_params = dict((x, None) for x in range(n_biomarkers))
        for i in range(n_biomarkers):
            mask = ~np.isnan(control_X[:, i]) & ~np.isnan(control_covar).any(axis=1)
            X_reg, y_reg = control_covar[mask, :], control_X[mask, i]
            regr_params[i] = nu_freq(X_reg, y_reg)
        residuals = X.copy()    # train data
        residuals2 = X2.copy()  # test data
        for key, value in regr_params.items():
            # adjust train data.
            residuals[:, key] -= regr_params[key][1]
            residuals[:, key] -= np.matmul(covar, regr_params[key][0])
            # adjust test data.
            residuals2[:, key] -= regr_params[key][1]
            residuals2[:, key] -= np.matmul(covar2, regr_params[key][0])
        return residuals, residuals2

    # read in train and test covariates
    tr_covars = tr_data[covariate].astype(float).values  # ['Age', 'Gender', 'ICV']
    te_covars = te_data[covariate].astype(float).values
    # adjust biomarkers for covariates (regression fitted on the training controls only)
    tr_adjusted_map, te_adjusted_map = control_residuals(
        tr_map.values, tr_data["Groups"].values, tr_covars, te_map.values, te_covars)

    # save data
    # tr_map.to_csv("./RF_result/TR_pre_adjusted_map.csv", index=False)
    # te_map.to_csv("./RF_result/TE_pre_adjusted_map.csv", index=False)
    tr_adjusted_map = pd.DataFrame(tr_adjusted_map, columns=tr_map.columns)
    te_adjusted_map = pd.DataFrame(te_adjusted_map, columns=te_map.columns)
    # tr_adjusted_map.to_csv("./RF_result/TR_post_adjusted_map.csv", index=False)
    # te_adjusted_map.to_csv("./RF_result/TE_post_adjusted_map.csv", index=False)

    # select subjects (Groups 2 and 3 only)
    tr_adjusted_map = tr_adjusted_map[(tr_data['Groups'] == 2) | (tr_data['Groups'] == 3)]
    te_adjusted_map = te_adjusted_map[(te_data['Groups'] == 2) | (te_data['Groups'] == 3)]
    # print(te_adjusted_map.shape)
    tr_groups = tr_data.Groups[(tr_data['Groups'] == 2) | (tr_data['Groups'] == 3)]
    te_groups = te_data.Groups[(te_data['Groups'] == 2) | (te_data['Groups'] == 3)]

    tr_true_label = []
    for k in range(len(tr_groups.values)):
        if tr_groups.values[k] == 2:
            tr_true_label.append(0)
        else:
            tr_true_label.append(1)
    tr_true_label = np.array(tr_true_label)

    te_true_label = []
    for k in range(len(te_groups.values)):
        if te_groups.values[k] == 2:
            te_true_label.append(0)
        else:
            te_true_label.append(1)
    te_true_label = np.array(te_true_label)

    return tr_adjusted_map, te_adjusted_map, tr_true_label, te_true_label, tr_map.columns


def normalize(tr_adjusted_map, te_adjusted_map):
    # normalization by mean and S.D. (fitted on the training data, applied to both)
    from sklearn.preprocessing import StandardScaler
    sc = StandardScaler()
    sc.fit(tr_adjusted_map)
    tr_adjusted_map_std = sc.transform(tr_adjusted_map)
    # print("te_adjusted_map_std:{}".format(te_adjusted_map.shape))
    te_adjusted_map_std = sc.transform(te_adjusted_map)
    return tr_adjusted_map_std, te_adjusted_map_std


def lda(tr_adjusted_map_std, te_adjusted_map_std, tr_true_label, n_components):
    # LDA (fitted on the training data only, then applied to the test data)
    from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
    # Number of components (<= min(n_classes - 1, n_features))
    lda = LinearDiscriminantAnalysis(n_components=n_components)
    tr_adjusted_map_std_lda = lda.fit_transform(tr_adjusted_map_std, tr_true_label)
    te_adjusted_map_std_lda = lda.transform(te_adjusted_map_std)
    return tr_adjusted_map_std_lda, te_adjusted_map_std_lda


def random_forest(tr_adjusted_map_std_lda, tr_true_label,
                  te_adjusted_map_std_lda, te_true_label, param_grid, random_state):
    import warnings
    warnings.simplefilter('ignore')
    # Grid Search
    # Create a base model
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import GridSearchCV
    # Instantiate the grid search model
    clf = RandomForestClassifier(random_state=random_state)
    grid_search = GridSearchCV(estimator=clf, param_grid=param_grid,
                               cv=5, n_jobs=-1, verbose=0)
    # Fit the grid search to the training data
    grid_search = grid_search.fit(tr_adjusted_map_std_lda, tr_true_label)
    best_grid = grid_search.best_estimator_
    # print("\nBest Parameter is below.")
    print(grid_search.best_params_)
    # print("\n")
    tr_scores = best_grid.score(tr_adjusted_map_std_lda, tr_true_label)
    te_scores = best_grid.score(te_adjusted_map_std_lda, te_true_label)
    # print('Train Accuracy: {:.3f}, Test Accuracy: {:.3f}'.format(tr_scores, te_scores))
    pred_label = grid_search.predict(te_adjusted_map_std_lda)
    # print("pred_label:{}, true_label:{}".format(pred_label, te_true_label))
    return grid_search, pred_label


def varidation(all_pred_label, te_true_label, features, grid_search):
    # flatten the per-subject arrays into 1-D label vectors
    all_pred_label = np.concatenate(all_pred_label)
    te_true_label = np.concatenate(te_true_label)

    # classification report
    from sklearn.metrics import classification_report
    print('=========================Classification Report=========================')
    print(classification_report(te_true_label, all_pred_label))  # y_true first, then y_pred
    print('=========================Classification Report=========================')
    print('\n')

    # confusion matrix
    from sklearn.metrics import confusion_matrix
    plt.figure()
    mat = confusion_matrix(te_true_label, all_pred_label)  # rows: true class, columns: predicted class
    sns.heatmap(mat, square=True, annot=True, cbar=False, fmt='d', cmap='RdPu')
    plt.xlabel('predicted class')
    plt.ylabel('true value')
    plt.savefig('./RF_result/confusion_matrix.png')


def main():
    # define
    map = 'ICVF|OD|ISO|MVF'
    covariate = ['Age', 'Gender', 'ICV']
    n_components = 1
    random_state = 5
    param_grid = {
        'bootstrap': [True],
        'max_depth': [110, 115, 120, 125, 130],
        # 'max_features': [1],  # [200,212,225,273,250]
        'min_samples_leaf': [4, 5],
        'min_samples_split': [8, 10, 12],
        'n_estimators': [100, 110, 120, 130]
    }

    print('Start Random Forest...')
    # ignore warnings
    import warnings
    warnings.simplefilter('ignore')
    # make output directory
    if not os.path.isdir('RF_result'):
        os.mkdir('RF_result')

    import sys
    args = sys.argv
    df = pd.read_csv(args[1]).replace("-nan", "").fillna(0)

    all_te_true_label, all_pred_label = [], []
    for i in range(df.shape[0]):
        try:
            # separate data into train and test data, like leave-one-out.
            tr_data = df[df["No"] != i].reset_index()
            te_data = df[df["No"] == i].reset_index()
            tr_adjusted_map, te_adjusted_map, tr_true_label, te_true_label, features = import_data(
                tr_data, te_data, map, covariate)
            tr_adjusted_map_std, te_adjusted_map_std = normalize(
                tr_adjusted_map, te_adjusted_map)
            tr_adjusted_map_std_lda, te_adjusted_map_std_lda = lda(
                tr_adjusted_map_std, te_adjusted_map_std, tr_true_label, n_components)
            grid_search, pred_label = random_forest(
                tr_adjusted_map_std_lda, tr_true_label,
                te_adjusted_map_std_lda, te_true_label, param_grid, random_state)
            all_te_true_label.append(te_true_label)
            all_pred_label.append(pred_label)
        except Exception:
            print("non-targeted subject, No.{}".format(i))

    print("all_pred_label:{}, all_te_true_label:{}".format(
        len(all_pred_label), len(all_te_true_label)))
    varidation(all_pred_label, all_te_true_label, features, grid_search)


if __name__ == '__main__':
    main()
```
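Compared with the first script, this version expects the CSV to also contain a No column holding a subject index from 0 to N-1; the loop in main() uses it to hold out one subject at a time (leave-one-subject-out) and refits the covariate adjustment, scaler, LDA, and random forest on the remaining subjects in every iteration.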
Summary
Using LDA improved the classification accuracy by 10%.
This time, however, the standardization was performed on the training data + test data together; properly, the mean and variance should be computed from the training data only, and those training-data statistics should then be used to standardize the test data.
Likewise, the LDA was also fitted on the training data + test data, so in that respect this approach amounts to cheating (data leakage). One way to avoid it is sketched below.
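As an aside, one way to make sure the scaler and LDA never see the held-out data is to wrap them in a scikit-learn Pipeline and let the cross-validation refit them inside each fold. A minimal sketch, assuming toy data and illustrative estimator settings (not the ones used above):

```python
# Leakage-free evaluation: the scaler and LDA are refit inside every CV fold
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, LeaveOneOut

rng = np.random.default_rng(0)
X, y = rng.normal(size=(60, 8)), np.repeat([0, 1], 30)  # toy data

pipe = Pipeline([
    ('scale', StandardScaler()),
    ('lda', LinearDiscriminantAnalysis(n_components=1)),
    ('rf', RandomForestClassifier(n_estimators=100, random_state=5)),
])
scores = cross_val_score(pipe, X, y, cv=LeaveOneOut())  # each fold fits the scaler/LDA on its own training part
print('LOO accuracy: {:.3f}'.format(scores.mean()))
```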
Update 2020/05/17
The issues above have now been addressed. The resulting accuracy is still being verified.
Instead of shouting "Ganbare!" or "Katsu!!" to cheer me on, I would be simply delighted if you pressed the B! bookmark button (^^)!
