Simple Keras Neural Network Example

Published on Aug. 16, 2019, 10:43 p.m.

This example demonstrates a simple classification neural network built with the Keras functional API.
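The full script below is driven by configuration lists and GUI signals, but at its core it builds a feed-forward classifier with the functional API: an Input layer, a stack of Dense hidden layers, and a softmax output, compiled with an optimizer and a loss and trained with model.fit. Here is a minimal, self-contained sketch of that core idea; the data, shapes, and hyperparameters are illustrative placeholders, not values from the full script.

import numpy as np
from keras.layers import Input, Dense
from keras.models import Model

# Random stand-in data: 200 samples with 10 features, 3 one-hot classes
X = np.random.rand(200, 10)
y = np.eye(3)[np.random.randint(0, 3, 200)]

inputs = Input(shape=(10,))
hidden = Dense(20, activation='relu')(inputs)      # single hidden layer
outputs = Dense(3, activation='softmax')(hidden)   # one unit per class

model = Model(inputs=inputs, outputs=outputs)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(X, y, validation_split=0.2, batch_size=32, epochs=5)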

import os
import time
import tensorflow as tf
import keras
import numpy as np
from keras.layers import Input, Dense
from keras.models import Model
from keras.callbacks import TensorBoard, EarlyStopping
from keras.backend import clear_session
from ver_3.machine_learning.keras_models.custom_callbacks import TrainPlotCallback
from ver_3.functions.data_functions import get_train_test_data
from ver_3.machine_learning.keras_models.custom_metrics import tp, fn, fp, tn


def classification_model(network_name_str, X, y, output_folder_path_str, data_splitting_list, general_conf_param_list, optimizer_conf_param_list, stopping_criteria_list=('Epoch Number', 0.5, 100), signal_list=None):
    """
    This function receives the train data and the corresponding labels, and trains classification model with specified configurations
    :param network_name_str:
    :param X: numpy ndarray representing the train data.
    :param y: numpy ndarray representing the train labels.
    :param output_folder_path_str: the path to the output root folder (String).
    :param data_splitting_list:
    :param general_conf_param_list: (models_number=1, hidden_layers_number=1, neurons_number=20, activation_func='relu', weight_init_func='PCA-GC', batch_size=100, validation_split=0.2, epochs=10
    :param optimizer_conf_param_list: a list of type [String, float, ]containing the configuration parameters of the optimizer to use.
    :param stopping_criteria_list: a list of type [Stopping criteria name (String), error threshold (float), epoch number (int)] that represents the desired way to stop training.
    :param signal_list:
    :return: None
    """

    # ===================== GENERAL_CONF_PARAM_LIST =====================
    number_of_models = general_conf_param_list[0]
    number_of_layers = general_conf_param_list[1]
    number_of_neurons_in_layer_int_list = general_conf_param_list[2]
    neuron_activation_function_in_layer_str_list = general_conf_param_list[3]
    loss_function_str = general_conf_param_list[4]
    weight_initialization_function_str = general_conf_param_list[5]
    number_of_epochs = general_conf_param_list[6]
    # ===================== GENERAL_CONF_PARAM_LIST =====================
    # ///////////////////// WEIGHT INITIALIZATION FUNCTION CONFIGURATIONS /////////////////////
    # Default to orthogonal initialization when no known name matches
    weight_initialization_function = tf.keras.initializers.Orthogonal()
    if weight_initialization_function_str == 'Normal (mean=0, std=1)':
        weight_initialization_function = tf.keras.initializers.RandomNormal(mean=0.0, stddev=1.0, seed=None)
    elif weight_initialization_function_str == 'Xavier':
        weight_initialization_function = keras.initializers.glorot_normal(seed=None)
    elif weight_initialization_function_str == 'Uniform [0, 1]':
        weight_initialization_function = keras.initializers.RandomUniform(minval=0.0, maxval=1.0, seed=None)
    # ///////////////////// WEIGHT INITIALIZATION FUNCTION CONFIGURATIONS /////////////////////

    # ===================== OPTIMIZER_CONF_PARAM_LIST =====================
    optimizer_name_str = optimizer_conf_param_list[0]
    lr = optimizer_conf_param_list[1]
    momentum = optimizer_conf_param_list[2]
    decay = optimizer_conf_param_list[3]
    rho = optimizer_conf_param_list[4]
    epsilon = optimizer_conf_param_list[5]
    beta_1 = optimizer_conf_param_list[6]
    beta_2 = optimizer_conf_param_list[7]
    clip_norm = optimizer_conf_param_list[8]
    clip_value = optimizer_conf_param_list[9]
    nesterov = optimizer_conf_param_list[10]
    amsgrad = optimizer_conf_param_list[11]
    # ===================== OPTIMIZER_CONF_PARAM_LIST =====================
    # ///////////////////// OPTIMIZER CONFIGURATIONS /////////////////////
    if optimizer_name_str == 'Stochastic Gradient Descent':
        optimizer_func = keras.optimizers.SGD(lr=lr, momentum=momentum, decay=decay, nesterov=nesterov, clipnorm=clip_norm, clipvalue=clip_value)
    elif optimizer_name_str == 'RMSprop':
        optimizer_func = keras.optimizers.RMSprop(lr=lr, decay=decay, rho=rho, epsilon=epsilon, clipnorm=clip_norm, clipvalue=clip_value)
    elif optimizer_name_str == 'Adagrad':
        optimizer_func = keras.optimizers.Adagrad(lr=lr, decay=decay, epsilon=epsilon, clipnorm=clip_norm, clipvalue=clip_value)
    elif optimizer_name_str == 'Adadelta':
        optimizer_func = keras.optimizers.Adadelta(lr=lr, decay=decay, rho=rho, epsilon=epsilon, clipnorm=clip_norm, clipvalue=clip_value)
    elif optimizer_name_str == 'Adam':
        optimizer_func = keras.optimizers.Adam(lr=lr, decay=decay, epsilon=None, beta_1=beta_1, beta_2=beta_2, amsgrad=amsgrad, clipnorm=clip_norm, clipvalue=clip_value)
    elif optimizer_name_str == 'Adamax':
        optimizer_func = keras.optimizers.Adamax(lr=lr, decay=decay, epsilon=None, beta_1=beta_1, beta_2=beta_2, clipnorm=clip_norm, clipvalue=clip_value)
    elif optimizer_name_str == 'Nadam':
        optimizer_func = keras.optimizers.Nadam(lr=lr, schedule_decay=decay, epsilon=None, beta_1=beta_1, beta_2=beta_2, clipnorm=clip_norm, clipvalue=clip_value)
    else:
        optimizer_func = keras.optimizers.Adam()  # lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False, clipnorm=clip_norm, clipvalue=clip_value
    # ///////////////////// OPTIMIZER CONFIGURATIONS /////////////////////
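    # NOTE: the lr= and decay= keyword names follow the Keras 2.2-era optimizer API
    # used throughout this script; Keras 2.3+ renames lr to learning_rate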

    # ===================== STOPPING_CRITERIA_LIST =====================
    stopping_criteria_str = stopping_criteria_list[0]
    baseline = stopping_criteria_list[1]
    patience = stopping_criteria_list[2]
    # ===================== STOPPING_CRITERIA_LIST =====================
    # ///////////////////// STOPPING CRITERIA CONFIGURATIONS /////////////////////
    if stopping_criteria_str == 'Error Monitoring':  # run until you encounter the threshold error for specified number of epochs
        early_stopping_callback = EarlyStopping(monitor='val_acc',  patience=patience, verbose=1, mode='max', baseline=baseline)
    elif stopping_criteria_str == 'Error Threshold':  # run until you encounter the threshold error for one epoch
        early_stopping_callback = EarlyStopping(monitor='val_acc', patience=patience, verbose=1, mode='min', baseline=baseline)
    else:  # if no early stopping is defined - run all the specified epochs
        early_stopping_callback = EarlyStopping(monitor='val_acc', min_delta=baseline, patience=patience, verbose=1, mode='auto')
    # ///////////////////// STOPPING CRITERIA CONFIGURATIONS /////////////////////
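    # NOTE: 'val_acc' only exists when validation data is provided - here via the
    # validation_split passed to model.fit below; newer Keras versions report it as 'val_accuracy'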

    # ===================== SIGNAL_LIST =====================
    validation_type_signal = signal_list[0]
    epoch_end_signal = signal_list[1]
    test_results_signal = signal_list[2]
    error_signal = signal_list[3]
    # ===================== SIGNAL_LIST =====================
    # ///////////////////// CALLBACKS /////////////////////
    callback_list = [early_stopping_callback]
    plot_losses = TrainPlotCallback(output_folder_path_str, epoch_end_signal)
    callback_list.append(plot_losses)
    # ///////////////////// CALLBACKS /////////////////////

    # ===================== DATA_SPLITTING_LIST =====================
    k_fold_cross_validation_data_split = data_splitting_list[0]
    train_test_data_split_proportion = data_splitting_list[1]
    train_validation_data_split_proportion = data_splitting_list[2]
    batch_size = data_splitting_list[3]
    # ===================== DATA_SPLITTING_LIST =====================
    # ///////////////////// DATA SPLIT /////////////////////
    X_train_data, y_train_data, X_test_data, y_test_data = get_train_test_data(X=X, y=y, k_fold_cross_validation=k_fold_cross_validation_data_split, train_test_proportion=train_test_data_split_proportion, validation_type_signal=validation_type_signal, error_signal=error_signal)
    # ///////////////////// DATA SPLIT /////////////////////
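    # get_train_test_data returns parallel lists with one train/test pair per fold,
    # which the training loop below consumes via zip()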

    time_list = []

    loss_list = []
    acc_list = []
    mse_list = []
    mae_list = []
    mape_list = []
    cosine_list = []
    test_tp_list = []
    test_fn_list = []
    test_tn_list = []
    test_fp_list = []

    k_split_index = 0
    # GENERATE THE REQUESTED NUMBER OF MODELS; FOR EACH MODEL, CROSS-VALIDATE THE RESULTS OVER THE K SPLITS
    for model_index in range(number_of_models):

        # ///////////////////// NAME /////////////////////
        name = '{}) {} (k - {}, type - {}, ts - {})'.format(model_index + 1, network_name_str, k_split_index + 1, 'classification', int(time.time()))
        # ///////////////////// NAME /////////////////////

        # ///////////////////// CALLBACKS /////////////////////
        tensorboard = TensorBoard(log_dir='logs/{}'.format(name))
        # Rebuild the callback list each model so stale TensorBoard writers from previous models are not reused
        callback_list = [early_stopping_callback, plot_losses, tensorboard]
        # ///////////////////// CALLBACKS /////////////////////

        # ///////////////////// MODEL TRAINING /////////////////////
        for X_train, y_train, X_test, y_test in zip(X_train_data, y_train_data, X_test_data, y_test_data):
            # ///////////////////// CONVERT THE DATA INTO NUMPY NDARRAYS /////////////////////
            X_train = np.array(X_train)
            y_train = np.array(y_train)
            X_test = np.array(X_test)
            y_test = np.array(y_test)
            # ///////////////////// CONVERT THE DATA INTO NUMPY NDARRAYS /////////////////////
            print('CLASSIFICATION - X_train {} shape : {}'.format(type(X_train), X_train.shape))
            print('CLASSIFICATION - y_train {} shape : {}'.format(type(y_train), y_train.shape))
            # MODEL CREATION

            # ///////////////////// INPUT LAYER /////////////////////
            start = time.time()
            input_size = X_train.shape[1]
            input_layer = Input(shape=(input_size,))

            print('Input Size : {}'.format(input_size))
            print('X_train.shape : {}'.format(X_train.shape))

            # ///////////////////// INPUT LAYER /////////////////////

            # ///////////////////// HIDDEN LAYERS /////////////////////
            # The first hidden layer attaches to the input layer; subsequent layers chain onto the previous one
            layer = Dense(units=number_of_neurons_in_layer_int_list[0], activation=neuron_activation_function_in_layer_str_list[0], kernel_initializer=weight_initialization_function)(input_layer)
            for layer_number in range(1, number_of_layers):
                print('Neurons : {}, Activation : {}, Weight Init. : {}'.format(number_of_neurons_in_layer_int_list[layer_number], neuron_activation_function_in_layer_str_list[layer_number], weight_initialization_function))
                layer = Dense(units=number_of_neurons_in_layer_int_list[layer_number], activation=neuron_activation_function_in_layer_str_list[layer_number], kernel_initializer=weight_initialization_function)(layer)
            # ///////////////////// HIDDEN LAYERS /////////////////////

            # ///////////////////// OUTPUT LAYERS /////////////////////
            predictions = Dense(len(y_train[0]), activation='softmax')(layer)
            # ///////////////////// OUTPUT LAYERS /////////////////////

            # ///////////////////// MODEL DEFINITION /////////////////////
            model = Model(inputs=input_layer, outputs=predictions)
            model.compile(optimizer=optimizer_func,
                          loss=loss_function_str,
                          metrics=['accuracy', 'mse', 'mae', 'mape', 'cosine', tp, fn, fp, tn])
            # ///////////////////// MODEL DEFINITION /////////////////////
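            # NOTE: model.evaluate (in MODEL TESTING below) returns the loss followed by
            # these metrics in the order listed in compile above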

            model.fit(X_train, y_train, validation_split=train_validation_data_split_proportion, batch_size=batch_size, epochs=number_of_epochs, callbacks=callback_list)
            time_list.append(time.time() - start)
            # ///////////////////// MODEL TRAINING /////////////////////

            # ///////////////////// SAVE MODEL /////////////////////
            classification_models_save_path = os.path.join(output_folder_path_str, 'Classification Models')
            if not os.path.exists(classification_models_save_path):
                os.mkdir(classification_models_save_path)

            model_save_path = os.path.join(classification_models_save_path, network_name_str)
            if not os.path.exists(model_save_path):
                os.mkdir(model_save_path)
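            # NOTE: saving in HDF5 (.h5) format requires the h5py package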
            model.save(os.path.join(model_save_path, name + '.h5'))
            # ///////////////////// SAVE MODEL /////////////////////

            # ///////////////////// MODEL TESTING /////////////////////
            if (X_test is not None and len(X_test) > 0) and (y_test is not None and len(y_test) > 0):
                test_loss, test_acc, test_mse, test_mae, test_mape, test_cosine, test_tp, test_fn, test_fp, test_tn = model.evaluate(x=X_test, y=y_test)
                loss_list.append(test_loss)
                acc_list.append(test_acc)
                mse_list.append(test_mse)
                mae_list.append(test_mae)
                mape_list.append(test_mape)
                cosine_list.append(test_cosine)
                test_tp_list.append(test_tp)
                test_fn_list.append(test_fn)
                test_tn_list.append(test_tn)
                test_fp_list.append(test_fp)

            # ///////////////////// MODEL TESTING /////////////////////
            # CLEAR THE KERAS SESSION AT THE END OF EACH ROUND TO FREE RESOURCES BEFORE THE NEXT FOLD
            clear_session()
            k_split_index += 1
            print('k-split = {}'.format(k_split_index))

        # AT THE END OF EACH MODEL - SEND THE MEAN TEST METRICS AND THEIR STANDARD DEVIATIONS
        test_results_signal.emit((np.mean(loss_list), np.mean(acc_list), np.mean(mse_list), np.mean(mae_list), np.mean(mape_list), np.mean(cosine_list), np.mean(test_tp_list), np.mean(test_fn_list), np.mean(test_tn_list), np.mean(test_fp_list)), (np.std(loss_list), np.std(acc_list), np.std(mse_list), np.std(mae_list), np.std(mape_list), np.std(cosine_list), np.std(test_tp_list), np.std(test_fn_list), np.std(test_tn_list), np.std(test_fp_list)))

# if __name__ == '__main__':
#     # Illustrative, commented-out call matching the signature above; the data,
#     # parameter values, and signal objects are placeholders, not working settings.
#     output_folder_path = 'D:\\OneDrive\\Development\\Python\\Neural Net Studio\\ver_3\\output'
#
#     X = np.random.rand(500, 30)                    # 500 samples, 30 features
#     y = np.eye(2)[np.random.randint(0, 2, 500)]    # one-hot labels, 2 classes
#
#     classification_model(network_name_str='demo_classifier',
#                          X=X,
#                          y=y,
#                          output_folder_path_str=output_folder_path,
#                          data_splitting_list=[5, 0.8, 0.2, 100],
#                          general_conf_param_list=[2, 2, [20, 20], ['relu', 'relu'], 'categorical_crossentropy', 'Xavier', 10],
#                          optimizer_conf_param_list=['Adam', 0.001, 0.0, 0.0, 0.9, 1e-07, 0.9, 0.999, 1.0, 0.5, False, False],
#                          stopping_criteria_list=('Epoch Number', 0.5, 100),
#                          signal_list=[validation_type_signal, epoch_end_signal, test_results_signal, error_signal])  # signal objects with an emit() method