[For Beginners] Basic & Practical Programming

This blog covers the points where beginners tend to stumble and the fastest way to build practical skills.

[Python] Disease Classification with Random Forest (Plus LDA)

Introduction

This is an updated version of an earlier article in which I classified diseases with a random forest. This time I added data standardization and Linear Discriminant Analysis (LDA). I also made the script output a Classification Report and a Confusion Matrix so that the results can be reviewed at a glance.

LDA

LDA extracts features by projecting the variables into a lower-dimensional space. Whereas principal component analysis (PCA) finds orthogonal axes that maximize the variance of the data without using class labels, LDA finds the axis that best separates the classes and linearly maps the data onto it, here down to one dimension (a minimal comparison of the two is sketched after the list below). LDA is said to give better classification accuracy than PCA. However, the data used with LDA are expected to satisfy the following conditions:

  • The data follow a normal distribution
  • All classes share the same covariance matrix
  • The variables are statistically independent of each other
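
As a quick illustration (not part of the pipeline in this article), the sketch below projects the scikit-learn iris data onto one dimension with both PCA and LDA; the dataset and all variable names here are chosen only for this example.

# A quick look at PCA vs. LDA on the iris data (toy example only).
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

X, y = load_iris(return_X_y=True)

# PCA: unsupervised, picks the direction of maximum variance.
X_pca = PCA(n_components=1).fit_transform(X)
# LDA: supervised, picks the direction that best separates the classes.
X_lda = LinearDiscriminantAnalysis(n_components=1).fit_transform(X, y)

fig, axes = plt.subplots(1, 2, figsize=(10, 3))
axes[0].scatter(X_pca[:, 0], y, c=y)
axes[0].set_title('PCA projection (ignores labels)')
axes[1].scatter(X_lda[:, 0], y, c=y)
axes[1].set_title('LDA projection (uses labels)')
plt.show()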

Implementation

Code used

# import modules
import matplotlib.pyplot as plt
import seaborn as sns
import os
import numpy as np
import pandas as pd

def import_data(file,map,covariate):
    # import csv file
    df = pd.read_csv(file)
    # select biomarker
    # select map
    variable = df.loc[:,df.columns.str.contains(map)] # 'ICVF|OD|ISO|MVF'
    # handle missing values (fill with the column median)
    variable = variable.fillna(variable.median()) 
    # adjust biomarkers for covariates
    from sklearn import linear_model
    def nu_freq(X_reg, y_reg):
        lr = linear_model.LinearRegression()
        lr.fit(X_reg, y_reg)
        return lr.coef_, lr.intercept_

    def control_residuals(X, y, covar):
        control_covar = covar[y == 1].astype(float)
        control_X = X[y == 1].astype(float)
        n_biomarkers = control_X.shape[1]
        regr_params = dict((x, None) for x in range(n_biomarkers))
        for i in range(n_biomarkers):
            mask = ~np.isnan(control_X[:, i]) & ~np.isnan(control_covar).any(axis=1)
            X_reg, y_reg = control_covar[mask, :], control_X[mask, i]
            regr_params[i] = nu_freq(X_reg, y_reg)
        residuals = X.copy()
        for key, value in regr_params.items():
            residuals[:, key] -= regr_params[key][1]
            residuals[:, key] -= np.matmul(covar, regr_params[key][0])
        return residuals
    ## read in covariates
    covars = df[covariate].astype(float).values #['Age', 'Gender', 'ICV']
    ## adjust biomarkers for covariates
    adjusted_variable = control_residuals(variable.values, df.Groups.values, covars)
    ## save data 
    variable.to_csv("./RF_result/pre_adjusted_variable.csv", index=False)
    adjusted_variable = pd.DataFrame(adjusted_variable, columns=variable.columns)
    adjusted_variable.to_csv("./RF_result/post_adjusted_variable.csv", index=False)

    ## select subject
    adjusted_variable = adjusted_variable[(df['Groups'] == 2)| (df['Groups'] == 3)]
    Groups = df.Groups[(df['Groups'] == 2)| (df['Groups'] == 3)]
    true_label = []
    for k in range(len(Groups.values)):
        if Groups.values[k] == 2:
            true_label.append(0)
        else:
            true_label.append(1)
    true_label = np.array(true_label)
    return adjusted_variable, true_label, variable.columns

def normalize(input):
    # normalization (strictly speaking, the scaler should be fit on the Train data only)
    from sklearn.preprocessing import StandardScaler
    sc = StandardScaler()
    sc.fit(input)
    input_std = sc.transform(input)
    return input_std

def lda(input, label,n_components):
    # LDA (strictly speaking, fit on the Train data only and then apply to the Test data)
    from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
    lda = LinearDiscriminantAnalysis(n_components=n_components)
    input_lda = lda.fit_transform(input,label)
    return input_lda

def random_forest(input, label, param_grid,random_state):
    # Grid Search
    ## Create a based model
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import GridSearchCV
    clf = RandomForestClassifier(random_state=random_state) # random_state=5, 10, Accuracy:0.754 # Instantiate the grid search model
    grid_search = GridSearchCV(estimator = clf, param_grid = param_grid, 
                            cv = 5, n_jobs = -1, verbose = 1)
    ## Fit the grid search to the data
    grid_search = grid_search.fit(input, label) # training
    best_grid = grid_search.best_estimator_
    print("\nBest Parameter is below.")
    print(grid_search.best_params_)
    print("\n")
    return best_grid

def validation(input, label, features, best_grid):
    ## leave-one-out
    from sklearn.model_selection import cross_val_score
    from sklearn.metrics import recall_score
    from sklearn.model_selection import LeaveOneOut
    scores = cross_val_score(best_grid, input, label, cv = LeaveOneOut()) # use LeaveOneOut() as the cross-validation splitter
    print('Cross-Validation scores: {}'.format(scores))
    print('Average score: {:.3f}'.format(np.mean(scores)))

    # label restore
    pred_label = []
    for k in range(len(scores)):
        if scores[k] == 1:
            pred_label.append(label[k])
        else:
            if label[k] == 0:
                pred_label.append(1)
            else:
                pred_label.append(0)
    pred_label = np.array(pred_label)
    
    # classification report
    from sklearn.metrics import classification_report
    print('=========================Classification Report=========================')
    print(classification_report(label, pred_label))  # y_true first, y_pred second
    print('=========================Classification Report=========================')
    print('\n')
    
    # confusion matrix
    from sklearn.metrics import confusion_matrix
    import seaborn as sns
    plt.figure()
    mat = confusion_matrix(label, pred_label)  # rows = true labels, columns = predictions
    sns.heatmap(mat, square=True, annot=True, cbar=False, fmt='d', cmap='RdPu')
    plt.xlabel('predicted class')
    plt.ylabel('true value')
    plt.savefig('./RF_result/confusion_matrix.png')

def importance(features,best_grid):
    # visualize feature importances
    #features = variable.columns
    importances = best_grid.feature_importances_
    indices = np.argsort(importances)
    plt.figure(figsize=(10,10))
    plt.barh(range(len(indices)), importances[indices], color='b', align='center')
    plt.yticks(range(len(indices)), features[indices])
    plt.savefig('RF_result/importance.png')
    # save importance values
    importance = pd.DataFrame({'Variable': features[indices], 'Importance': importances[indices]})
    importance.to_csv("./RF_result/importance.csv", index=False)

def save_trees(best_grid,n_estimators,features):
    import pydotplus
    from IPython.display import Image
    from graphviz import Digraph
    from io import StringIO  # sklearn.externals.six has been removed from recent scikit-learn
    from sklearn import tree
    print('\nSaving result to "RF_result" folder...')
    for i in range(n_estimators):
        dot_data = StringIO()
        tree.export_graphviz(best_grid.estimators_[i], out_file=dot_data,feature_names=features)
        graph = pydotplus.graph_from_dot_data(dot_data.getvalue())
        graph.write_png('./RF_result/tree' + str(i) + '.png')

def main():
    # define
    map = 'ICVF|OD|ISO|MVF'
    covariate = ['Age', 'Gender', 'ICV']
    n_components = 1
    random_state = 5
    param_grid = {
        'bootstrap': [True],
        'max_depth': [120],  # [110,115,120,125,130]
        # 'max_features': [1], # [200,212,225,273,250]
        'min_samples_leaf': [4],  # [4, 5]
        'min_samples_split': [8],  # [8, 10, 12]
        'n_estimators': [130]  # [100,110,120,130]
    }
    print('Start Random Forest...')
    # ignore warning
    import warnings
    warnings.simplefilter('ignore')
    # make directory
    if not os.path.isdir('RF_result'):
        os.mkdir('RF_result')

    import sys
    args = sys.argv
    data_adjusted, true_label, features = import_data(args[1],map,covariate)
    data_std =  normalize(data_adjusted)
    data_lda = lda(data_std, true_label,n_components)
    best_grid = random_forest(data_lda, true_label, param_grid, random_state)
    validation(data_lda, true_label, features, best_grid)
    #importance(features,best_grid)
    #save_trees(best_grid,param_grid['n_estimators'][0], features)

if __name__ == '__main__':
    main()
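
For reference, the script is run as python <script name> <csv file>. Judging from the code above, the CSV is expected to contain a 'Groups' column (1 = controls used to fit the covariate regression, 2 and 3 = the two classes being separated), the covariates 'Age', 'Gender', 'ICV', and biomarker columns whose names contain 'ICVF', 'OD', 'ISO', or 'MVF'. The sketch below builds a dummy CSV of that shape for a smoke test; all values, the file name, and the script name are made up.

# Sketch: build a dummy CSV with the column layout the script above expects.
# All values are random and only intended for a smoke test; the file and
# script names are hypothetical.
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 60
dummy = pd.DataFrame({
    'Groups': rng.choice([1, 2, 3], size=n),   # 1 = controls for covariate adjustment, 2/3 = classes
    'Age':    rng.integers(20, 80, size=n),
    'Gender': rng.choice([0, 1], size=n),
    'ICV':    rng.normal(1500, 100, size=n),
    'ICVF_region1': rng.normal(0.5, 0.10, size=n),
    'OD_region1':   rng.normal(0.3, 0.05, size=n),
    'ISO_region1':  rng.normal(0.1, 0.02, size=n),
    'MVF_region1':  rng.normal(0.2, 0.04, size=n),
})
dummy.to_csv('dummy_data.csv', index=False)
# then, e.g.: python rf_lda_classification.py dummy_data.csv

The updated script further below additionally expects a 'No' column holding a subject index, which it uses for its leave-one-subject-out split.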

Updated 2020/05/17

  • In the Normalize and LDA steps, fit on the Train data only and apply the parameters estimated from the Train data to the Test data (a minimal sketch of this pattern is shown next, followed by the full updated script)
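
As a minimal sketch of that pattern (the dummy data and array names here are made up for illustration):

# Sketch of the train-only fitting pattern (dummy data; names made up).
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

rng = np.random.default_rng(0)
X = rng.normal(size=(60, 4))                 # dummy biomarker matrix
y = rng.choice([0, 1], size=60)              # dummy binary labels
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

sc = StandardScaler().fit(X_train)           # mean / S.D. estimated from Train only
X_train_std = sc.transform(X_train)
X_test_std = sc.transform(X_test)            # Test is scaled with Train statistics

lda = LinearDiscriminantAnalysis(n_components=1).fit(X_train_std, y_train)
X_train_lda = lda.transform(X_train_std)
X_test_lda = lda.transform(X_test_std)       # Test is projected with the Train fit

The full updated script:
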
# import modules
import matplotlib.pyplot as plt
import seaborn as sns
import os
import numpy as np
import pandas as pd


def import_data(tr_data, te_data, map, covariate):
    # import csv file
    # df = pd.read_csv(file)
    # separate data into train and test data
    # TR:0, TE:1
    # tr_data = df[df["TR_TE"] == 0].reset_index()
    # te_data = df[df["TR_TE"] == 1].reset_index()
    # print("tr_size:{}, te_size{}".format(tr_data.shape, te_data.shape))
    # select map
    tr_map, te_map = tr_data.loc[:, tr_data.columns.str.contains(
        map)], te_data.loc[:, te_data.columns.str.contains(map)]  # 'ICVF|OD|ISO|MVF'
    # # processing for defect values.
    # tr_map, te_map = tr_map.fillna(
    #     tr_map.median()), te_map.fillna(te_map.median())
    # adjust biomarkers for covariates
    from sklearn import linear_model

    def nu_freq(X_reg, y_reg):
        lr = linear_model.LinearRegression()
        lr.fit(X_reg, y_reg)
        return lr.coef_, lr.intercept_

    def control_residuals(X, y, covar, X2, covar2):
        control_covar = covar[y == 1].astype(float)
        control_X = X[y == 1].astype(float)
        n_biomarkers = control_X.shape[1]
        regr_params = dict((x, None) for x in range(n_biomarkers))
        for i in range(n_biomarkers):
            mask = ~np.isnan(control_X[:, i]) & ~np.isnan(
                control_covar).any(axis=1)
            X_reg, y_reg = control_covar[mask, :], control_X[mask, i]
            regr_params[i] = nu_freq(X_reg, y_reg)
        residuals = X.copy()  # train data
        residuals2 = X2.copy()  # test data
        for key, value in regr_params.items():
            # adjust train data.
            residuals[:, key] -= regr_params[key][1]
            residuals[:, key] -= np.matmul(covar, regr_params[key][0])
            # adjust test data.
            residuals2[:, key] -= regr_params[key][1]
            residuals2[:, key] -= np.matmul(covar2, regr_params[key][0])
        return residuals, residuals2
    # read in (train) covariates
    tr_covars, te_covars = tr_data[covariate].astype(
        float).values, te_data[covariate].astype(float).values  # ['Age', 'Gender', 'ICV']
    # adjust biomarkers for covariates
    tr_adjusted_map, te_adjusted_map = control_residuals(
        tr_map.values, tr_data["Groups"].values, tr_covars, te_map.values, te_covars)
    # save data
    # tr_map.to_csv("./RF_result/TR_pre_adjusted_map.csv", index=False)
    # te_map.to_csv("./RF_result/TE_pre_adjusted_map.csv", index=False)
    tr_adjusted_map = pd.DataFrame(tr_adjusted_map, columns=tr_map.columns)
    te_adjusted_map = pd.DataFrame(te_adjusted_map, columns=te_map.columns)
    # tr_adjusted_map.to_csv("./RF_result/TR_post_adjusted_map.csv", index=False)
    # te_adjusted_map.to_csv("./RF_result/TE_post_adjusted_map.csv", index=False)

    # select subject
    tr_adjusted_map = tr_adjusted_map[(
        tr_data['Groups'] == 2) | (tr_data['Groups'] == 3)]
    te_adjusted_map = te_adjusted_map[(
        te_data['Groups'] == 2) | (te_data['Groups'] == 3)]
    # print(te_adjusted_map.shape)

    tr_groups = tr_data.Groups[(tr_data['Groups'] == 2)
                               | (tr_data['Groups'] == 3)]
    te_groups = te_data.Groups[(te_data['Groups'] == 2)
                               | (te_data['Groups'] == 3)]

    tr_true_label = []
    for k in range(len(tr_groups.values)):
        if tr_groups.values[k] == 2:
            tr_true_label.append(0)
        else:
            tr_true_label.append(1)
    tr_true_label = np.array(tr_true_label)

    te_true_label = []
    for k in range(len(te_groups.values)):
        if te_groups.values[k] == 2:
            te_true_label.append(0)
        else:
            te_true_label.append(1)
    te_true_label = np.array(te_true_label)

    return tr_adjusted_map, te_adjusted_map, tr_true_label, te_true_label, tr_map.columns


def normalize(tr_adjusted_map, te_adjusted_map):
    # normalization by mean and S.D.
    from sklearn.preprocessing import StandardScaler
    sc = StandardScaler()
    sc.fit(tr_adjusted_map)
    tr_adjusted_map_std = sc.transform(tr_adjusted_map)
    # print("te_adjusted_map_std:{}".format(te_adjusted_map.shape))
    te_adjusted_map_std = sc.transform(te_adjusted_map)
    return tr_adjusted_map_std, te_adjusted_map_std


def lda(tr_adjusted_map_std, te_adjusted_map_std, tr_true_label, n_components):
    # LDA (fit on the Train data only, then apply it to the Test data)
    from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
    # Number of components (<= min(n_classes - 1, n_features))
    lda = LinearDiscriminantAnalysis(n_components=n_components)
    tr_adjusted_map_std_lda = lda.fit_transform(
        tr_adjusted_map_std, tr_true_label)
    te_adjusted_map_std_lda = lda.transform(te_adjusted_map_std)
    return tr_adjusted_map_std_lda, te_adjusted_map_std_lda


def random_forest(tr_adjusted_map_std_lda, tr_true_label, te_adjusted_map_std_lda, te_true_label, param_grid, random_state):
    import warnings
    warnings.simplefilter('ignore')
    # Grid Search
    # Create a based model
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import GridSearchCV
    # random_state=?, Accuracy:? # Instantiate the grid search model
    clf = RandomForestClassifier(random_state=random_state)
    grid_search = GridSearchCV(estimator=clf, param_grid=param_grid,
                               cv=5, n_jobs=-1, verbose=0)
    # Fit the grid search to the data
    grid_search = grid_search.fit(
        tr_adjusted_map_std_lda, tr_true_label)  # training
    best_grid = grid_search.best_estimator_
    # print("\nBest Parameter is below.")
    print(grid_search.best_params_)
    # print("\n")
    tr_scores = best_grid.score(tr_adjusted_map_std_lda, tr_true_label)
    te_scores = best_grid.score(te_adjusted_map_std_lda, te_true_label)
    # print('Train Accuracy: {:.3f}, Test Accuracy: {:.3f}'.format(tr_scores, te_scores))
    pred_label = grid_search.predict(te_adjusted_map_std_lda)
    # print("pred_label:{}, true_label:{}".format(pred_label, te_true_label))
    return grid_search, pred_label


def validation(all_pred_label, te_true_label, features, grid_search):
    # classification report
    from sklearn.metrics import classification_report
    print('=========================Classification Report=========================')
    print(classification_report(te_true_label, all_pred_label))  # y_true first, y_pred second
    print('=========================Classification Report=========================')
    print('\n')

    # confusion matrix
    from sklearn.metrics import confusion_matrix
    import seaborn as sns
    plt.figure()
    mat = confusion_matrix(te_true_label, all_pred_label)  # rows = true labels, columns = predictions
    sns.heatmap(mat, square=True, annot=True, cbar=False, fmt='d', cmap='RdPu')
    plt.xlabel('predicted class')
    plt.ylabel('true value')
    plt.savefig('./RF_result/confusion_matrix.png')


def main():
    # define
    map = 'ICVF|OD|ISO|MVF'
    covariate = ['Age', 'Gender', 'ICV']
    n_components = 1
    random_state = 5
    param_grid = {
        'bootstrap': [True],
        'max_depth': [110,115,120,125,130],  # [110,115,120,125,130]
        # 'max_features': [1], # [200,212,225,273,250]
        'min_samples_leaf': [4, 5],  # [4, 5]
        'min_samples_split': [8, 10, 12],  # [8, 10, 12]
        'n_estimators': [100,110,120,130]  # [100,110,120,130]
    }
    print('Start Random Forest...')
    # ignore warning
    import warnings
    warnings.simplefilter('ignore')
    # make directory
    if not os.path.isdir('RF_result'):
        os.mkdir('RF_result')

    import sys
    args = sys.argv
    df = pd.read_csv(args[1]).replace("-nan","").fillna(0)
    all_te_true_label, all_pred_label = [], []
    for i in range(df.shape[0]):
        try:
            # separate data to train and test data, like leave-one-out.
            tr_data = df[df["No"] != i].reset_index()
            te_data = df[df["No"] == i].reset_index()

            tr_adjusted_map, te_adjusted_map, tr_true_label, te_true_label, features = import_data(
                tr_data, te_data, map, covariate)
            tr_adjusted_map_std, te_adjusted_map_std = normalize(
                tr_adjusted_map, te_adjusted_map)
            tr_adjusted_map_std_lda, te_adjusted_map_std_lda = lda(
                tr_adjusted_map_std, te_adjusted_map_std, tr_true_label, n_components)
            grid_search, pred_label = random_forest(
                tr_adjusted_map_std_lda, tr_true_label, te_adjusted_map_std_lda, te_true_label, param_grid, random_state)
            all_te_true_label.append(te_true_label)
            all_pred_label.append(pred_label)
        except Exception:
            print("non-targeted subject, No.{}".format(i))
    print("all_pred_label:{}, all_te_true_label:{}".format(len(all_pred_label),len(all_te_true_label)))
    validation(all_pred_label, all_te_true_label, features, grid_search)


if __name__ == '__main__':
    main()

Summary

Using LDA improved the classification accuracy by 10%. In this version, standardization was performed on the Train data and Test data combined, but properly the mean and variance should be computed from the Train data only and then used to standardize the Test data. The LDA fit has the same problem, as it was also fitted on Train plus Test data, so in that respect this approach is a bit of a cheat (information from the Test data leaks into training).

Updated 2020/05/17

The issues above have been addressed. The accuracy is still being verified.


