diff --git a/.gitignore b/.gitignore index b7e6331..40b0699 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ experimental/ .idea/ feat_extraction +tmp/ +tests/ diff --git a/BioAutoML-binary.py b/BioAutoML-binary.py index 4037795..018dc88 100644 --- a/BioAutoML-binary.py +++ b/BioAutoML-binary.py @@ -12,6 +12,8 @@ import joblib # import shutil import xgboost as xgb +import matplotlib.pyplot as plt +import shap from sklearn.metrics import roc_auc_score from sklearn.model_selection import cross_val_predict # from sklearn.metrics import multilabel_confusion_matrix @@ -47,7 +49,12 @@ from hyperopt import hp, fmin, tpe, STATUS_OK, Trials from sklearn.preprocessing import LabelEncoder from tpot import TPOTClassifier - +from reportlab.lib.enums import TA_JUSTIFY, TA_CENTER +from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image +from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle +from numpy.random import default_rng +from interpretability_report import Report, REPORT_MAIN_TITLE_BINARY, REPORT_SHAP_PREAMBLE_BINARY, REPORT_SHAP_BAR_BINARY, \ + REPORT_SHAP_BEESWARM_BINARY, REPORT_SHAP_WATERFALL_BINARY def header(output_header): @@ -461,10 +468,122 @@ def save_prediction(prediction, nameseqs, pred_output): file.write('\n') return +def type_model(explainer, model, data, labels): + """ + Check the type of exit and modify the "shap" structure as is necessary in the next function. + """ + + + shap_values = explainer(data) + xgbtype = "" + cattype = "" + lgbmtype = "" + randtype = "" + assert lgbmtype == str(type(model)) or randtype == str(type(model)) or xgbtype == str(type(model))\ + or cattype == str(type(model)), "Error: Model type don't expected " + + if lgbmtype == str(type(model) ) or randtype == str(type(model)): + shap_values = shap_values[:, :, 0] + if xgbtype == str(type(model)): + labels = le.fit_transform(labels) + + return shap_values, labels + +def shap_waterf(explainer, model, X_test, X_label, path): + """ + To do two waterfall graph for each classes in the problem. + """ + graphs_path = [] + X_label= pd.DataFrame(data={'label': X_label}) + classes = X_label.iloc[:,0].unique() + + assert len(classes) == 2,\ + "Error: Classes generated by the explainer of 'model' doesn't match the distinct number " +\ + f"of classes in 'targets'. 
[Explainer={2}, Target={len(classes)}]" + + for i in range(2): + # made a subset with only one class + subset = X_test[X_label.label==classes[i]] + shap_values, classes = type_model(explainer, model, subset, classes) + + # choose two samples from a given class + numbers = default_rng().choice(range(1, subset.shape[0]), size=(2), replace=False) + + for j in numbers: + waterfall_name = 'class_' + str(classes[i]) + '_sample_' +str(j) + local_name = os.path.join(path, f"{waterfall_name}.png") + plt.title(waterfall_name, fontsize=16) + sp = shap.plots.waterfall(shap_values[j], show=False) + plt.savefig(local_name, dpi=300,bbox_inches='tight') + plt.close(sp) + graphs_path.append(local_name) + # return the graph paths + return graphs_path + + +def shap_bar(shap_values, path, fig_name): + + local_name = os.path.join(path, f"{fig_name}.png") + plt.title(fig_name, fontsize=16) + sp = shap.plots.bar(shap_values, show=False) + plt.savefig(local_name, dpi=300,bbox_inches='tight') + plt.close(sp) + return local_name + +def shap_beeswarm(shap_values, path, fig_name): + + local_name = os.path.join(path, f"{fig_name}.png") + plt.title(fig_name, fontsize=16) + sp = shap.plots.beeswarm(shap_values, show=False) + plt.savefig(local_name, dpi=300,bbox_inches='tight') + plt.close(sp) + return local_name + + +def interp_shap(model, X_test, X_label,output,path='explanations'): + """ + To do all types of graphs for interpretability by shap values. + """ + path = os.path.join(output,path) + generated_plt = {} + explainer = shap.TreeExplainer(model,feature_perturbation="tree_path_dependent") + + shap_values, X_label = type_model(explainer, model, X_test, X_label) + + if not os.path.exists(path): + print(f"Creating explanations directory: {path}...") + os.mkdir(path) + else: + print(f"Directory {path} already exists. 
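# Illustrative sketch (toy data, placeholder names - not part of the patch) of the
# save-to-PNG pattern used by shap_bar/shap_beeswarm above. With the shap build pinned
# in BioAutoML-env.yml, explainer(X) returns a shap.Explanation; binary LightGBM and
# RandomForest models expose one output per class, which is why type_model slices the
# result with [:, :, 0] before plotting.
import os
import matplotlib.pyplot as plt
import shap
import xgboost as xgb
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=100, n_features=6, random_state=63)
clf = xgb.XGBClassifier(eval_metric='mlogloss', random_state=63).fit(X, y)

os.makedirs('explanations', exist_ok=True)
explainer = shap.TreeExplainer(clf, feature_perturbation='tree_path_dependent')
shap_values = explainer(X)  # single-output Explanation for XGBoost, so no slicing needed
plt.title('bar_graph', fontsize=16)
shap.plots.bar(shap_values, show=False)
plt.savefig(os.path.join('explanations', 'bar_graph.png'), dpi=300, bbox_inches='tight')
plt.close()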
Will proceed using it...") + + generated_plt['bar_graph']=[shap_bar(shap_values, path, fig_name='bar_graph')] + generated_plt['beeswarm_graph']=[shap_beeswarm(shap_values, path, fig_name='beeswarm_graph')] + generated_plt['waterfall_graph']=shap_waterf(explainer, model, X_test, X_label, path) + return generated_plt + + +def build_interpretability_report(generated_plt=[], report_name="interpretability.pdf", directory="."): + report = Report(report_name, directory=directory) + root_dir = os.path.abspath(os.path.join(__file__, os.pardir)) + + report.insert_doc_header(REPORT_MAIN_TITLE_BINARY, logo_fig=os.path.join(root_dir, "img/BioAutoML.png")) + report.insert_text_on_doc(REPORT_SHAP_PREAMBLE_BINARY, font_size=14) + + report.insert_figure_on_doc(generated_plt['bar_graph']) + report.insert_text_on_doc(REPORT_SHAP_BAR_BINARY, font_size=14) + + report.insert_figure_on_doc(generated_plt['beeswarm_graph']) + report.insert_text_on_doc(REPORT_SHAP_BEESWARM_BINARY, font_size=12) + + report.insert_figure_on_doc(generated_plt['waterfall_graph']) + report.insert_text_on_doc(REPORT_SHAP_WATERFALL_BINARY, font_size=12) + + report.build() + def binary_pipeline(test, test_labels, test_nameseq, norm, fs, classifier, tuning, output): - global clf, train, train_labels + global clf, train, train_labels, le if not os.path.exists(output): os.mkdir(output) @@ -578,19 +697,22 @@ def binary_pipeline(test, test_labels, test_nameseq, norm, fs, classifier, tunin if imbalance_data is True: train, train_labels = imbalanced_function(clf, train, train_labels) elif classifier == 3: - if tuning is True: - print('Tuning: ' + str(tuning)) - print('Classifier: XGBClassifier') - clf = xgb.XGBClassifier(eval_metric='mlogloss', random_state=63) - if imbalance_data is True: - train, train_labels = imbalanced_function(clf, train, train_labels) - print('Tuning not yet available for XGBClassifier.') - else: - print('Tuning: ' + str(tuning)) - print('Classifier: XGBClassifier') - clf = xgb.XGBClassifier(eval_metric='mlogloss', random_state=63) - if imbalance_data is True: - train, train_labels = imbalanced_function(clf, train, train_labels) + le = LabelEncoder() + train_labels = le.fit_transform(train_labels) + test_labels = le.fit_transform(test_labels) + if tuning is True: + print('Tuning: ' + str(tuning)) + print('Classifier: XGBClassifier') + clf = xgb.XGBClassifier(eval_metric='mlogloss', random_state=63) + if imbalance_data is True: + train, train_labels = imbalanced_function(clf, train, train_labels) + print('Tuning not yet available for XGBClassifier.') + else: + print('Tuning: ' + str(tuning)) + print('Classifier: XGBClassifier') + clf = xgb.XGBClassifier(eval_metric='mlogloss', random_state=63) + if imbalance_data is True: + train, train_labels = imbalanced_function(clf, train, train_labels) else: sys.exit('This classifier option does not exist - Try again') @@ -635,6 +757,22 @@ def binary_pipeline(test, test_labels, test_nameseq, norm, fs, classifier, tunin print('Saving trained model in ' + model_output + '...') print('Training: Finished...') + """Generating Interpretability Summary """ + + try: + generated_plt = interp_shap(clf, train, train_labels,output) + build_interpretability_report(generated_plt=generated_plt, directory=output) + except ValueError as e: + print(e) + print("If you believe this is a bug, please report it to https://github.com/Bonidia/BioAutoML.") + print("Generation of explanation plots and report failed. 
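# Illustrative sketch (toy data, placeholder names - not part of the patch) of the
# label-encoding round trip that the XGBClassifier branch above relies on: fit the
# encoder once on the training labels, train on the encoded integers, and map
# predictions back to the original class names with inverse_transform.
import numpy as np
import xgboost as xgb
from sklearn.preprocessing import LabelEncoder

X_train, X_test = np.random.rand(20, 4), np.random.rand(5, 4)
y_train = np.array(['lncRNA', 'mRNA'] * 10)

le = LabelEncoder()
y_enc = le.fit_transform(y_train)                  # string labels -> 0/1
clf = xgb.XGBClassifier(eval_metric='mlogloss', random_state=63).fit(X_train, y_enc)
preds = le.inverse_transform(clf.predict(X_test))  # 0/1 -> original class names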
Proceeding without it...") + except AssertionError as e: + print(e) + print("This is certainly a bug. Please report it to https://github.com/Bonidia/BioAutoML.") + print("Generation of explanation plots and report failed. Proceeding without it...") + else: + print("Explanation plots and report generated successfully!") + """Generating Feature Importance - Selected feature subset...""" print('Generating Feature Importance - Selected feature subset...') @@ -643,6 +781,7 @@ def binary_pipeline(test, test_labels, test_nameseq, norm, fs, classifier, tunin print('Saving results in ' + importance_output + '...') """Testing model...""" + #test_labels = le.fit_transform(test_labels) if os.path.exists(ftest) is True: print('Generating Performance Test...') diff --git a/BioAutoML-env.yml b/BioAutoML-env.yml index bdffa07..30ab40b 100644 --- a/BioAutoML-env.yml +++ b/BioAutoML-env.yml @@ -6,25 +6,28 @@ dependencies: - _libgcc_mutex=0.1=main - _openmp_mutex=4.5=1_gnu - _py-xgboost-mutex=2.0=cpu_0 + - backcall=0.2.0=pyhd3eb1b0_0 - biopython=1.78=py37h4abf009_1 - blas=2.14=openblas - brotlipy=0.7.0=py37hb5d75c8_1001 - bzip2=1.0.8=h516909a_3 - - ca-certificates=2021.10.26=h06a4308_2 + - ca-certificates=2022.10.11=h06a4308_0 - cairo=1.16.0=h3fc0475_1005 - catboost=1.0.3=py37h89c1867_1 - - certifi=2021.10.8=py37h89c1867_1 + - certifi=2022.9.24=py37h06a4308_0 - cffi=1.15.0=py37h7f8727e_0 - charset-normalizer=2.0.9=pyhd8ed1ab_0 - cloudpickle=2.0.0=pyhd8ed1ab_0 - colorama=0.4.4=pyh9f0ad1d_0 - cryptography=36.0.0=py37h9ce1e76_0 + - cycler=0.11.0=pyhd3eb1b0_0 - deap=1.3.1=py37h9fdb41a_2 - decorator=5.1.0=pyhd8ed1ab_0 - fontconfig=2.13.1=h7e3eb15_1002 - freetype=2.10.4=h7ca028e_0 - future=0.18.2=py37h89c1867_4 - gettext=0.19.8.1=hf34092f_1004 + - giflib=5.2.1=h7b6447c_0 - glib=2.58.3=py37he00f558_1004 - gmp=6.2.1=h58526e2_0 - hyperopt=0.2.5=pyh9f0ad1d_0 @@ -32,7 +35,12 @@ dependencies: - idna=3.3=pyhd3eb1b0_0 - igraph=0.8.3=hef4adab_1 - imbalanced-learn=0.8.1=pyhd8ed1ab_0 + - ipython=7.31.1=py37h06a4308_1 + - jedi=0.18.1=py37h06a4308_1 - joblib=1.1.0=pyhd8ed1ab_0 + - jpeg=9e=h7f8727e_0 + - kiwisolver=1.4.2=py37h295c915_0 + - lcms2=2.12=h3be6417_0 - libblas=3.8.0=14_openblas - libcblas=3.8.0=14_openblas - libedit=3.1.20191231=he28a2e2_2 @@ -44,29 +52,48 @@ dependencies: - libiconv=1.16=h516909a_0 - liblapack=3.8.0=14_openblas - liblapacke=3.8.0=14_openblas + - libllvm11=11.1.0=h3826bc1_1 - libopenblas=0.3.7=h5ec1e0e_6 - libpng=1.6.37=h21135ba_2 - libstdcxx-ng=9.1.0=hdf63c60_0 + - libtiff=4.2.0=h85742a9_0 - libuuid=2.32.1=h14c3975_1000 + - libwebp=1.2.2=h55f646e_0 + - libwebp-base=1.2.2=h7f8727e_0 - libxcb=1.13=h14c3975_1002 - libxgboost=1.5.0=h295c915_1 - libxml2=2.9.10=h68273f3_2 - libzlib=1.2.11=h36c2ea0_1013 - lightgbm=3.2.1=py37h295c915_0 + - llvmlite=0.38.0=py37h4ff587b_0 + - lz4-c=1.9.3=h295c915_1 + - matplotlib=3.2.2=1 + - matplotlib-base=3.2.2=py37h1d35a4c_1 + - matplotlib-inline=0.1.6=py37h06a4308_0 - ncurses=6.2=h58526e2_4 - networkx=2.6.3=pyhd3eb1b0_0 + - numba=0.55.1=py37h51133e4_0 - numpy=1.19.2=py37h7008fea_1 - - openssl=1.1.1o=h7f8727e_0 + - openssl=1.1.1s=h7f8727e_0 + - orderedset=2.0.3=py37h8f50634_3 - pandas=0.25.3=py37hb3f55d8_0 + - parso=0.8.3=pyhd3eb1b0_0 - pcre=8.44=he1b5a44_0 + - pexpect=4.8.0=pyhd3eb1b0_3 + - pickleshare=0.7.5=pyhd3eb1b0_1003 + - pillow=9.0.1=py37h22f2fdc_0 - pip=21.0.1=pyhd8ed1ab_0 - pixman=0.38.0=h516909a_1003 + - prompt-toolkit=3.0.20=pyhd3eb1b0_0 - pthread-stubs=0.4=h36c2ea0_1001 + - ptyprocess=0.7.0=pyhd3eb1b0_2 - py-xgboost=1.5.0=py37h06a4308_1 - 
pycairo=1.20.0=py37h01af8b0_1 - pycparser=2.21=pyhd8ed1ab_0 + - pygments=2.11.2=pyhd3eb1b0_0 - pymongo=3.12.0=py37h295c915_0 - pyopenssl=21.0.0=pyhd8ed1ab_0 + - pyparsing=3.0.9=py37h06a4308_0 - pysocks=1.7.1=py37h89c1867_4 - python=3.7.3=h5b0a415_0 - python-dateutil=2.8.0=py_0 @@ -74,20 +101,28 @@ dependencies: - python_abi=3.7=2_cp37m - pytz=2021.1=pyhd8ed1ab_0 - readline=7.0=hf8c457e_1001 + - reportlab=3.5.67=py37hfdd840d_1 - requests=2.26.0=pyhd8ed1ab_1 - scikit-learn=1.0.1=py37h51133e4_0 - scipy=1.6.1=py37hf56f3a7_0 - setuptools=52.0.0=py37h06a4308_1 + - shap=0.39.0=py37h51133e4_0 - six=1.15.0=pyh9f0ad1d_0 + - slicer=0.0.7=pyhd3eb1b0_0 - sqlite=3.33.0=h62c20be_0 - stopit=1.1.2=py_0 + - tbb=2021.5.0=hd09550d_0 - texttable=1.6.3=pyh9f0ad1d_0 - threadpoolctl=3.0.0=pyh8a188c0_0 - tk=8.6.10=h21135ba_1 + - tornado=6.1=py37h27cfd23_0 - tpot=0.11.7=pyhd8ed1ab_1 - tqdm=4.62.3=pyhd8ed1ab_0 + - traitlets=5.1.1=pyhd3eb1b0_0 + - typing_extensions=4.3.0=py37h06a4308_0 - update_checker=0.18.0=pyh9f0ad1d_0 - urllib3=1.26.7=pyhd8ed1ab_0 + - wcwidth=0.2.5=pyhd3eb1b0_0 - wheel=0.36.2=pyhd3deb0d_0 - xorg-kbproto=1.0.7=h14c3975_1002 - xorg-libice=1.0.10=h516909a_0 @@ -102,4 +137,4 @@ dependencies: - xorg-xproto=7.0.31=h14c3975_1007 - xz=5.2.5=h516909a_1 - zlib=1.2.11=h36c2ea0_1013 -prefix: /home/robson/miniconda3/envs/bioautoml \ No newline at end of file + - zstd=1.4.9=haebb681_0 diff --git a/BioAutoML-multiclass.py b/BioAutoML-multiclass.py index 736f463..e81e3ac 100644 --- a/BioAutoML-multiclass.py +++ b/BioAutoML-multiclass.py @@ -1,8 +1,10 @@ import warnings warnings.filterwarnings(action='ignore', category=FutureWarning) warnings.filterwarnings('ignore') + import pandas as pd import numpy as np +import matplotlib.pyplot as plt import random import argparse import sys @@ -10,675 +12,810 @@ import time import lightgbm as lgb import joblib +import shap +import xgboost as xgb # import shutil -# import xgboost as xgb -# from sklearn.metrics import roc_auc_score -from sklearn.model_selection import cross_val_predict +from catboost import CatBoostClassifier +# from tpot import TPOTClassifier +from sklearn.metrics import accuracy_score from sklearn.metrics import classification_report +from sklearn.metrics import f1_score +from sklearn.metrics import cohen_kappa_score, make_scorer +from sklearn.metrics import matthews_corrcoef +# from sklearn.metrics import roc_auc_score # from sklearn.metrics import multilabel_confusion_matrix -# from sklearn.model_selection import KFold -from catboost import CatBoostClassifier # from sklearn.metrics import balanced_accuracy_score -# from sklearn.pipeline import Pipeline -# from sklearn.preprocessing import MinMaxScaler -# from sklearn.model_selection import train_test_split -from sklearn.metrics import accuracy_score +# from sklearn.metrics import precision_score +# from sklearn.metrics import recall_score +from sklearn.model_selection import cross_val_predict from sklearn.model_selection import cross_validate +from sklearn.model_selection import StratifiedKFold +from sklearn.model_selection import cross_val_score from sklearn.preprocessing import StandardScaler +from sklearn.preprocessing import LabelEncoder +# from sklearn.preprocessing import MinMaxScaler +# from sklearn.model_selection import KFold +# from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestClassifier -from sklearn.metrics import f1_score -from sklearn.ensemble import AdaBoostClassifier -# from sklearn.metrics import precision_score -# from sklearn.metrics import 
recall_score -from sklearn.metrics import matthews_corrcoef +# from sklearn.ensemble import AdaBoostClassifier from sklearn.feature_selection import SelectFromModel +# from sklearn.pipeline import Pipeline +from sklearn.impute import SimpleImputer +# from imblearn.metrics import geometric_mean_score from imblearn.over_sampling import SMOTE from imblearn.under_sampling import RandomUnderSampler -from sklearn.model_selection import StratifiedKFold -from sklearn.metrics import cohen_kappa_score, make_scorer -# from imblearn.metrics import geometric_mean_score from imblearn.pipeline import Pipeline -from sklearn.impute import SimpleImputer -from sklearn.model_selection import cross_val_score from hyperopt import hp, fmin, tpe, STATUS_OK, Trials -# from tpot import TPOTClassifier +from interpretability_report import Report, REPORT_MAIN_TITLE_MULTICLASS, REPORT_SHAP_PREAMBLE, \ + REPORT_SHAP_SUMMARY_1, REPORT_SHAP_SUMMARY_2, REPORT_SHAP_WATERFALL_1, REPORT_SUMMARY_TITLE, \ + REPORT_WATERFALL_TITLE, REPORT_SHAP_WATERFALL_2 +SUMMARY = 0 +WATERFALL = 1 def header(output_header): - """Header Function: Header of the evaluate_model_cross Function""" + """Header Function: Header of the evaluate_model_cross Function""" - file = open(output_header, 'a') - file.write('ACC,std_ACC,MCC,std_MCC,F1_micro,std_F1_micro,' - 'F1_macro,std_F1_macro,F1_w,std_F1_w,kappa,std_kappa') - file.write('\n') - return + file = open(output_header, 'a') + file.write('ACC,std_ACC,MCC,std_MCC,F1_micro,std_F1_micro,' + 'F1_macro,std_F1_macro,F1_w,std_F1_w,kappa,std_kappa') + file.write('\n') + return def save_measures(output_measures, scores): - """Save Measures Function: Output of the evaluate_model_cross Function""" + """Save Measures Function: Output of the evaluate_model_cross Function""" - header(output_measures) - file = open(output_measures, 'a') - file.write('%0.4f,%0.2f,%0.4f,%0.2f,%0.4f,%0.2f,%0.4f,%0.2f,%0.4f,%0.2f,%0.4f,%0.2f' % (scores['test_ACC'].mean(), - + scores['test_ACC'].std(), scores['test_MCC'].mean(), scores['test_MCC'].std(), - + scores['test_f1_mi'].mean(), scores['test_f1_mi'].std(), - + scores['test_f1_ma'].mean(), scores['test_f1_ma'].std(), - + scores['test_f1_w'].mean(), scores['test_f1_w'].std(), - + scores['test_kappa'].mean(), scores['test_kappa'].std())) - file.write('\n') - return + header(output_measures) + file = open(output_measures, 'a') + file.write('%0.4f,%0.2f,%0.4f,%0.2f,%0.4f,%0.2f,%0.4f,%0.2f,%0.4f,%0.2f,%0.4f,%0.2f' % (scores['test_ACC'].mean(), + + scores['test_ACC'].std(), scores['test_MCC'].mean(), scores['test_MCC'].std(), + + scores['test_f1_mi'].mean(), scores['test_f1_mi'].std(), + + scores['test_f1_ma'].mean(), scores['test_f1_ma'].std(), + + scores['test_f1_w'].mean(), scores['test_f1_w'].std(), + + scores['test_kappa'].mean(), scores['test_kappa'].std())) + file.write('\n') + return def evaluate_model_cross(X, y, model, output_cross, matrix_output): - """Evaluation Function: Using Cross-Validation""" + """Evaluation Function: Using Cross-Validation""" + + scoring = {'ACC': make_scorer(accuracy_score), + 'MCC': make_scorer(matthews_corrcoef), + 'f1_mi': make_scorer(f1_score, average='micro'), + 'f1_ma': make_scorer(f1_score, average='macro'), + 'f1_w': make_scorer(f1_score, average='weighted'), + 'kappa': make_scorer(cohen_kappa_score)} - scoring = {'ACC': make_scorer(accuracy_score), - 'MCC': make_scorer(matthews_corrcoef), - 'f1_mi': make_scorer(f1_score, average='micro'), - 'f1_ma': make_scorer(f1_score, average='macro'), - 'f1_w': make_scorer(f1_score, 
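# Illustrative, self-contained sketch (toy data - not part of the patch) of the
# multi-metric cross-validation pattern used by evaluate_model_cross above: a dict of
# make_scorer objects passed to cross_validate over a StratifiedKFold split.
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, matthews_corrcoef, f1_score, make_scorer
from sklearn.model_selection import StratifiedKFold, cross_validate

X, y = make_classification(n_samples=200, n_classes=3, n_informative=6, random_state=63)
scoring = {'ACC': make_scorer(accuracy_score),
           'MCC': make_scorer(matthews_corrcoef),
           'f1_w': make_scorer(f1_score, average='weighted')}
kfold = StratifiedKFold(n_splits=10, shuffle=True)
scores = cross_validate(RandomForestClassifier(random_state=63), X, y, cv=kfold, scoring=scoring)
print('ACC: %0.4f (+/- %0.2f)' % (scores['test_ACC'].mean(), scores['test_ACC'].std()))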
average='weighted'), - 'kappa': make_scorer(cohen_kappa_score)} - kfold = StratifiedKFold(n_splits=10, shuffle=True) - scores = cross_validate(model, X, y, cv=kfold, scoring=scoring) - save_measures(output_cross, scores) - y_pred = cross_val_predict(model, X, y, cv=kfold) - conf_mat = (pd.crosstab(y, y_pred, rownames=['REAL'], colnames=['PREDITO'], margins=True)) - conf_mat.to_csv(matrix_output) - return + kfold = StratifiedKFold(n_splits=10, shuffle=True) + scores = cross_validate(model, X, y, cv=kfold, scoring=scoring) + save_measures(output_cross, scores) + y_pred = cross_val_predict(model, X, y, cv=kfold) + conf_mat = (pd.crosstab(y, y_pred, rownames=['REAL'], colnames=['PREDITO'], margins=True)) + conf_mat.to_csv(matrix_output) def objective_rf(space): - """Tuning of classifier: Objective Function - Random Forest - Bayesian Optimization""" + """Tuning of classifier: Objective Function - Random Forest - Bayesian Optimization""" - model = RandomForestClassifier(n_estimators=int(space['n_estimators']), - criterion=space['criterion'], - max_depth=int(space['max_depth']), - max_features=space['max_features'], - min_samples_leaf=int(space['min_samples_leaf']), - min_samples_split=int(space['min_samples_split']), - random_state=63, - bootstrap=space['bootstrap'], - n_jobs=n_cpu) + model = RandomForestClassifier(n_estimators=int(space['n_estimators']), + criterion=space['criterion'], + max_depth=int(space['max_depth']), + max_features=space['max_features'], + min_samples_leaf=int(space['min_samples_leaf']), + min_samples_split=int(space['min_samples_split']), + random_state=63, + bootstrap=space['bootstrap'], + n_jobs=n_cpu) - kfold = StratifiedKFold(n_splits=5, shuffle=True) - metric = cross_val_score(model, - train, - train_labels, - cv=kfold, - scoring=make_scorer(f1_score, average='weighted'), - n_jobs=n_cpu).mean() + kfold = StratifiedKFold(n_splits=5, shuffle=True) + metric = cross_val_score(model, + train, + train_labels, + cv=kfold, + scoring=make_scorer(f1_score, average='weighted'), + n_jobs=n_cpu).mean() - return {'loss': -metric, 'status': STATUS_OK} + return {'loss': -metric, 'status': STATUS_OK} def tuning_rf_bayesian(): - """Tuning of classifier: Random Forest - Bayesian Optimization""" - - param = {'criterion': ['entropy', 'gini'], 'max_features': ['auto', 'sqrt', 'log2', None], 'bootstrap': [True, False]} - space = {'criterion': hp.choice('criterion', ['entropy', 'gini']), - 'n_estimators': hp.quniform('n_estimators', 100, 2000, 50), - 'max_depth': hp.quniform('max_depth', 10, 100, 5), - 'max_features': hp.choice('max_features', ['auto', 'sqrt', 'log2', None]), - 'min_samples_leaf': hp.quniform('min_samples_leaf', 1, 10, 1), - 'min_samples_split': hp.quniform('min_samples_split', 2, 10, 1), - 'bootstrap': hp.choice('bootstrap', [True, False])} - - trials = Trials() - best_tuning = fmin(fn=objective_rf, - space=space, - algo=tpe.suggest, - max_evals=100, - trials=trials) - - best_rf = RandomForestClassifier(n_estimators=int(best_tuning['n_estimators']), - criterion=param['criterion'][best_tuning['criterion']], - max_depth=int(best_tuning['max_depth']), - max_features=param['max_features'][best_tuning['max_features']], - min_samples_leaf=int(best_tuning['min_samples_leaf']), - min_samples_split=int(best_tuning['min_samples_split']), - random_state=63, - bootstrap=param['bootstrap'][best_tuning['bootstrap']], - n_jobs=n_cpu) - return best_tuning, best_rf - - + """Tuning of classifier: Random Forest - Bayesian Optimization""" + + param = {'criterion': ['entropy', 'gini'], 
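# Illustrative sketch (toy objective - not part of the patch) of the hyperopt pattern
# behind objective_rf/tuning_rf_bayesian above. fmin() reports hp.choice() results as
# indices into the original option list and hp.quniform() values as floats, which is
# why the best parameters are mapped and cast before building the final classifier.
from hyperopt import hp, fmin, tpe, Trials, STATUS_OK

options = {'criterion': ['entropy', 'gini']}
space = {'criterion': hp.choice('criterion', options['criterion']),
         'n_estimators': hp.quniform('n_estimators', 100, 500, 50)}

def objective(params):
	# stand-in loss; the real objective cross-validates the candidate model
	return {'loss': -params['n_estimators'] / 500.0, 'status': STATUS_OK}

best = fmin(fn=objective, space=space, algo=tpe.suggest, max_evals=10, trials=Trials())
criterion = options['criterion'][best['criterion']]  # index -> option value
n_estimators = int(best['n_estimators'])             # quniform float -> int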
'max_features': ['auto', 'sqrt', 'log2', None], 'bootstrap': [True, False]} + space = {'criterion': hp.choice('criterion', ['entropy', 'gini']), + 'n_estimators': hp.quniform('n_estimators', 100, 2000, 50), + 'max_depth': hp.quniform('max_depth', 10, 100, 5), + 'max_features': hp.choice('max_features', ['auto', 'sqrt', 'log2', None]), + 'min_samples_leaf': hp.quniform('min_samples_leaf', 1, 10, 1), + 'min_samples_split': hp.quniform('min_samples_split', 2, 10, 1), + 'bootstrap': hp.choice('bootstrap', [True, False])} + + trials = Trials() + best_tuning = fmin(fn=objective_rf, + space=space, + algo=tpe.suggest, + max_evals=100, + trials=trials) + + best_rf = RandomForestClassifier(n_estimators=int(best_tuning['n_estimators']), + criterion=param['criterion'][best_tuning['criterion']], + max_depth=int(best_tuning['max_depth']), + max_features=param['max_features'][best_tuning['max_features']], + min_samples_leaf=int(best_tuning['min_samples_leaf']), + min_samples_split=int(best_tuning['min_samples_split']), + random_state=63, + bootstrap=param['bootstrap'][best_tuning['bootstrap']], + n_jobs=n_cpu) + return best_tuning, best_rf + + +# function not used anywhere def objective_cb(space): - """Tuning of classifier: Objective Function - CatBoost - Bayesian Optimization""" + """Tuning of classifier: Objective Function - CatBoost - Bayesian Optimization""" - model = CatBoostClassifier(n_estimators=int(space['n_estimators']), - max_depth=int(space['max_depth']), - learning_rate=space['learning_rate'], - thread_count=n_cpu, nan_mode='Max', logging_level='Silent', random_state=63) + model = CatBoostClassifier(n_estimators=int(space['n_estimators']), + max_depth=int(space['max_depth']), + learning_rate=space['learning_rate'], + thread_count=n_cpu, nan_mode='Max', + logging_level='Silent', random_state=63) - kfold = StratifiedKFold(n_splits=5, shuffle=True) - metric = cross_val_score(model, - train, - train_labels, - cv=kfold, - scoring=make_scorer(f1_score, average='weighted'), - n_jobs=n_cpu).mean() + kfold = StratifiedKFold(n_splits=5, shuffle=True) + metric = cross_val_score(model, + train, + train_labels, + cv=kfold, + scoring=make_scorer(f1_score, average='weighted'), + n_jobs=n_cpu).mean() - return {'loss': -metric, 'status': STATUS_OK} + return {'loss': -metric, 'status': STATUS_OK} +# function not used anywhere def tuning_catboost_bayesian(): - """Tuning of classifier: CatBoost - Bayesian Optimization""" + """Tuning of classifier: CatBoost - Bayesian Optimization""" - space = {'n_estimators': hp.quniform('n_estimators', 100, 2000, 50), - 'learning_rate': hp.uniform('learning_rate', 0.01, 0.5), - 'max_depth': hp.quniform('max_depth', 1, 16, 1), - # 'random_strength': hp.loguniform('random_strength', 1e-9, 10), - # 'bagging_temperature': hp.uniform('bagging_temperature', 0.0, 1.0), - # 'border_count': hp.quniform('border_count', 1, 255, 1), - # 'l2_leaf_reg': hp.quniform('l2_leaf_reg', 2, 30, 1), - # 'scale_pos_weight': hp.uniform('scale_pos_weight', 0.01, 1.0), - # 'bootstrap_type' = hp.choice('bootstrap_type', ['Bayesian', 'Bernoulli', 'MVS']) - } + space = {'n_estimators': hp.quniform('n_estimators', 100, 2000, 50), + 'learning_rate': hp.uniform('learning_rate', 0.01, 0.5), + 'max_depth': hp.quniform('max_depth', 1, 16, 1), + # 'random_strength': hp.loguniform('random_strength', 1e-9, 10), + # 'bagging_temperature': hp.uniform('bagging_temperature', 0.0, 1.0), + # 'border_count': hp.quniform('border_count', 1, 255, 1), + # 'l2_leaf_reg': hp.quniform('l2_leaf_reg', 2, 30, 1), + # 
'scale_pos_weight': hp.uniform('scale_pos_weight', 0.01, 1.0), + # 'bootstrap_type' = hp.choice('bootstrap_type', ['Bayesian', 'Bernoulli', 'MVS']) + } - trials = Trials() - best_tuning = fmin(fn=objective_cb, - space=space, - algo=tpe.suggest, - max_evals=100, - trials=trials) + trials = Trials() + best_tuning = fmin(fn=objective_cb, + space=space, + algo=tpe.suggest, + max_evals=100, + trials=trials) - best_cb = CatBoostClassifier(n_estimators=int(best_tuning['n_estimators']), - max_depth=int(best_tuning['max_depth']), - learning_rate=best_tuning['learning_rate'], - thread_count=n_cpu, nan_mode='Max', logging_level='Silent', random_state=63) + best_cb = CatBoostClassifier(n_estimators=int(best_tuning['n_estimators']), + max_depth=int(best_tuning['max_depth']), + learning_rate=best_tuning['learning_rate'], + thread_count=n_cpu, nan_mode='Max', logging_level='Silent', random_state=63) - return best_tuning, best_cb + return best_tuning, best_cb def objective_lightgbm(space): - """Tuning of classifier: Objective Function - Lightgbm - Bayesian Optimization""" + """Tuning of classifier: Objective Function - Lightgbm - Bayesian Optimization""" - model = lgb.LGBMClassifier(n_estimators=int(space['n_estimators']), - max_depth=int(space['max_depth']), - num_leaves=int(space['num_leaves']), - learning_rate=space['learning_rate'], - subsample=space['subsample'], - n_jobs=n_cpu, random_state=63) + model = lgb.LGBMClassifier(n_estimators=int(space['n_estimators']), + max_depth=int(space['max_depth']), + num_leaves=int(space['num_leaves']), + learning_rate=space['learning_rate'], + subsample=space['subsample'], + n_jobs=n_cpu, random_state=63) - kfold = StratifiedKFold(n_splits=5, shuffle=True) - metric = cross_val_score(model, - train, - train_labels, - cv=kfold, - scoring=make_scorer(f1_score, average='weighted'), - n_jobs=n_cpu).mean() + kfold = StratifiedKFold(n_splits=5, shuffle=True) + metric = cross_val_score(model, + train, + train_labels, + cv=kfold, + scoring=make_scorer(f1_score, average='weighted'), + n_jobs=n_cpu).mean() - return {'loss': -metric, 'status': STATUS_OK} + return {'loss': -metric, 'status': STATUS_OK} def tuning_lightgbm_bayesian(): - """Tuning of classifier: Lightgbm - Bayesian Optimization""" + """Tuning of classifier: Lightgbm - Bayesian Optimization""" - space = {'n_estimators': hp.quniform('n_estimators', 100, 1000, 50), - 'max_depth': hp.quniform('max_depth', 1, 30, 1), - 'num_leaves': hp.quniform('num_leaves', 10, 200, 1), - 'learning_rate': hp.uniform('learning_rate', 0.01, 0.5), - 'subsample': hp.uniform('subsample', 0.1, 1.0)} + space = {'n_estimators': hp.quniform('n_estimators', 100, 1000, 50), + 'max_depth': hp.quniform('max_depth', 1, 30, 1), + 'num_leaves': hp.quniform('num_leaves', 10, 200, 1), + 'learning_rate': hp.uniform('learning_rate', 0.01, 0.5), + 'subsample': hp.uniform('subsample', 0.1, 1.0)} - trials = Trials() - best_tuning = fmin(fn=objective_lightgbm, - space=space, - algo=tpe.suggest, - max_evals=100, - trials=trials) + trials = Trials() + best_tuning = fmin(fn=objective_lightgbm, + space=space, + algo=tpe.suggest, + max_evals=100, + trials=trials) - best_cb = lgb.LGBMClassifier(n_estimators=int(best_tuning['n_estimators']), - max_depth=int(best_tuning['max_depth']), - num_leaves=int(best_tuning['num_leaves']), - learning_rate=best_tuning['learning_rate'], - subsample=best_tuning['subsample'], - n_jobs=n_cpu, random_state=63) + best_cb = lgb.LGBMClassifier(n_estimators=int(best_tuning['n_estimators']), + max_depth=int(best_tuning['max_depth']), + 
num_leaves=int(best_tuning['num_leaves']), + learning_rate=best_tuning['learning_rate'], + subsample=best_tuning['subsample'], + n_jobs=n_cpu, random_state=63) - return best_tuning, best_cb + return best_tuning, best_cb def objective_feature_selection(space): - """Feature Importance-based Feature selection: Objective Function - Bayesian Optimization""" + """Feature Importance-based Feature selection: Objective Function - Bayesian Optimization""" - t = space['threshold'] + t = space['threshold'] - fs = SelectFromModel(clf, threshold=t) - fs.fit(train, train_labels) - fs_train = fs.transform(train) - kfold = StratifiedKFold(n_splits=5, shuffle=True) - f1 = cross_val_score(clf, - fs_train, - train_labels, - cv=kfold, - scoring=make_scorer(f1_score, average='weighted'), - n_jobs=n_cpu).mean() + fs = SelectFromModel(clf, threshold=t) + fs.fit(train, train_labels) + fs_train = fs.transform(train) + kfold = StratifiedKFold(n_splits=5, shuffle=True) + f1 = cross_val_score(clf, + fs_train, + train_labels, + cv=kfold, + scoring=make_scorer(f1_score, average='weighted'), + n_jobs=n_cpu).mean() - return {'loss': -f1, 'status': STATUS_OK} + return {'loss': -f1, 'status': STATUS_OK} def feature_importance_fs_bayesian(model, train, train_labels): - """Feature Importance-based Feature selection using Bayesian Optimization""" + """Feature Importance-based Feature selection using Bayesian Optimization""" - model.fit(train, train_labels) - importances = set(model.feature_importances_) - importances.remove(max(importances)) - importances.remove(max(importances)) + model.fit(train, train_labels) + importances = set(model.feature_importances_) + importances.remove(max(importances)) + importances.remove(max(importances)) - space = {'threshold': hp.uniform('threshold', min(importances), max(importances))} + space = {'threshold': hp.uniform('threshold', min(importances), max(importances))} - trials = Trials() - best_threshold = fmin(fn=objective_feature_selection, - space=space, - algo=tpe.suggest, - max_evals=50, - trials=trials) + trials = Trials() + best_threshold = fmin(fn=objective_feature_selection, + space=space, + algo=tpe.suggest, + max_evals=50, + trials=trials) - return best_threshold['threshold'] + return best_threshold['threshold'] def feature_importance_fs(model, train, train_labels, column_train): - """threshold: features that have an importance of more than ...""" - - if len(column_train) > 100: - samples = round(int(len(column_train)) * 0.40) - else: - samples = round(int(len(column_train)) * 0.80) - model.fit(train, train_labels) - importances = set(model.feature_importances_) - threshold = random.sample(importances, samples) - best_t = 0 - best_baac = 0 - for t in threshold: - if t != max(importances): - fs = SelectFromModel(model, threshold=t) - fs.fit(train, train_labels) - fs_train = fs.transform(train) - kfold = StratifiedKFold(n_splits=5, shuffle=True) - bacc = cross_val_score(model, - fs_train, - train_labels, - cv=kfold, - scoring=make_scorer(f1_score, average='weighted'), - n_jobs=n_cpu).mean() - if bacc > best_baac: - best_baac = bacc - best_t = t - elif bacc == best_baac and t > best_t: - best_t = t - else: - pass - else: - pass - return best_t, best_baac + """threshold: features that have an importance of more than ...""" + + if len(column_train) > 100: + samples = round(int(len(column_train)) * 0.40) + else: + samples = round(int(len(column_train)) * 0.80) + model.fit(train, train_labels) + importances = set(model.feature_importances_) + threshold = random.sample(importances, 
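# Illustrative, self-contained sketch (toy data - not part of the patch) of the
# SelectFromModel threshold mechanism that the feature-selection routines above search
# over: features whose importance falls below the threshold are dropped.
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel

X, y = make_classification(n_samples=300, n_features=30, n_informative=5, random_state=63)
model = RandomForestClassifier(n_estimators=100, random_state=63).fit(X, y)

fs = SelectFromModel(model, threshold=0.01, prefit=True)  # keep importances at or above 0.01
X_reduced = fs.transform(X)
print('%d -> %d features' % (X.shape[1], X_reduced.shape[1]))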
samples) + best_t = 0 + best_baac = 0 + for t in threshold: + if t != max(importances): + fs = SelectFromModel(model, threshold=t) + fs.fit(train, train_labels) + fs_train = fs.transform(train) + kfold = StratifiedKFold(n_splits=5, shuffle=True) + bacc = cross_val_score(model, + fs_train, + train_labels, + cv=kfold, + scoring=make_scorer(f1_score, average='weighted'), + n_jobs=n_cpu).mean() + if bacc > best_baac: + best_baac = bacc + best_t = t + elif bacc == best_baac and t > best_t: + best_t = t + else: + pass + else: + pass + return best_t, best_baac def features_importance_ensembles(model, features, output_importances): - """Generate feature importance values""" + """Generate feature importance values""" - file = open(output_importances, 'a') - importances = model.feature_importances_ - indices = np.argsort(importances)[::-1] - names = [features[i] for i in indices] - for f in range(len(features)): - file.write('%d. Feature (%s): (%f)' % (f + 1, names[f], importances[indices[f]])) - file.write('\n') - # print('%d. %s: (%f)' % (f + 1, names[f], importances[indices[f]])) - return names + file = open(output_importances, 'a') + importances = model.feature_importances_ + indices = np.argsort(importances)[::-1] + names = [features[i] for i in indices] + for f in range(len(features)): + file.write('%d. Feature (%s): (%f)' % (f + 1, names[f], importances[indices[f]])) + file.write('\n') + # print('%d. %s: (%f)' % (f + 1, names[f], importances[indices[f]])) + return names def imbalanced_techniques(model, tech, train, train_labels): - """Testing imbalanced data techniques""" + """Testing imbalanced data techniques""" - sm = tech - pipe = Pipeline([('tech', sm), ('classifier', model)]) - # train_new, train_labels_new = sm.fit_sample(train, train_labels) - kfold = StratifiedKFold(n_splits=5, shuffle=True) - f1 = cross_val_score(pipe, - train, - train_labels, - cv=kfold, - scoring=make_scorer(f1_score, average='weighted'), - n_jobs=n_cpu).mean() - return f1 + sm = tech + pipe = Pipeline([('tech', sm), ('classifier', model)]) + # train_new, train_labels_new = sm.fit_sample(train, train_labels) + kfold = StratifiedKFold(n_splits=5, shuffle=True) + f1 = cross_val_score(pipe, + train, + train_labels, + cv=kfold, + scoring=make_scorer(f1_score, average='weighted'), + n_jobs=n_cpu).mean() + return f1 def imbalanced_function(clf, train, train_labels): - """Preprocessing: Imbalanced datasets""" - - print('Checking for imbalanced labels...') - df = pd.DataFrame(train_labels) - n_labels = pd.value_counts(df.values.flatten()) - if all(x == n_labels[0] for x in n_labels) is False: - print('There are imbalanced labels...') - print('Checking the best technique...') - smote = imbalanced_techniques(clf, SMOTE(random_state=42), train, train_labels) - random = imbalanced_techniques(clf, RandomUnderSampler(random_state=42), train, train_labels) - if smote > random: - print('Applying Smote - Oversampling...') - sm = SMOTE(random_state=42) - train, train_labels = sm.fit_sample(train, train_labels) - else: - print('Applying Random - Undersampling...') - sm = RandomUnderSampler(random_state=42) - train, train_labels = sm.fit_sample(train, train_labels) - else: - print('There are no imbalanced labels...') - return train, train_labels + """Preprocessing: Imbalanced datasets""" + + print('Checking for imbalanced labels...') + df = pd.DataFrame(train_labels) + n_labels = pd.value_counts(df.values.flatten()) + if all(x == n_labels[0] for x in n_labels) is False: + print('There are imbalanced labels...') + print('Checking the 
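# Illustrative sketch (toy imbalanced data - not part of the patch) of the resampling
# comparison performed by imbalanced_techniques/imbalanced_function above: score SMOTE
# and random undersampling inside an imblearn Pipeline and keep the technique with the
# higher weighted F1. fit_resample() is the current imbalanced-learn name for the
# resampling step.
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score, make_scorer
from sklearn.model_selection import StratifiedKFold, cross_val_score
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline

X, y = make_classification(n_samples=400, weights=[0.9, 0.1], random_state=42)
clf = RandomForestClassifier(random_state=63)

def cv_f1(sampler):
	pipe = Pipeline([('tech', sampler), ('classifier', clf)])
	kfold = StratifiedKFold(n_splits=5, shuffle=True)
	scorer = make_scorer(f1_score, average='weighted')
	return cross_val_score(pipe, X, y, cv=kfold, scoring=scorer).mean()

best = max([SMOTE(random_state=42), RandomUnderSampler(random_state=42)], key=cv_f1)
X_res, y_res = best.fit_resample(X, y)  # resample the training set with the winner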
best technique...') + smote = imbalanced_techniques(clf, SMOTE(random_state=42), train, train_labels) + random = imbalanced_techniques(clf, RandomUnderSampler(random_state=42), train, train_labels) + if smote > random: + print('Applying Smote - Oversampling...') + sm = SMOTE(random_state=42) + train, train_labels = sm.fit_sample(train, train_labels) + else: + print('Applying Random - Undersampling...') + sm = RandomUnderSampler(random_state=42) + train, train_labels = sm.fit_sample(train, train_labels) + else: + print('There are no imbalanced labels...') + return train, train_labels def save_prediction(prediction, nameseqs, pred_output): - """Saving prediction - test set""" - - file = open(pred_output, 'a') - - if os.path.exists(nameseq_test) is True: - for i in range(len(prediction)): - file.write('%s,' % str(nameseqs[i])) - file.write('%s' % str(prediction[i])) - file.write('\n') - else: - for i in range(len(prediction)): - file.write('%s' % str(prediction[i])) - file.write('\n') - return - - -def multiclass_pipeline(test, test_labels, test_nameseq, norm, classifier, tuning, output): - - global clf, train, train_labels - - if not os.path.exists(output): - os.mkdir(output) - - train = train_read - train_labels = train_labels_read - column_train = train.columns - column_test = '' - output = output + '/' - - # tmp = sys.stdout - # log_file = open(output + 'task.log', 'a') - # sys.stdout = log_file - - """Number of Samples and Features: Train and Test""" - - print('Number of samples (train): ' + str(len(train))) - - """Number of labels""" - - print('Number of Labels (train):') - df_label = pd.DataFrame(train_labels) - print(str(pd.value_counts(df_label.values.flatten()))) - - if os.path.exists(ftest) is True: - column_test = test.columns - print('Number of samples (test): ' + str(len(test))) - print('Number of Labels (test):') - df_label = pd.DataFrame(test_labels) - print(str(pd.value_counts(df_label.values.flatten()))) - - print('Number of features (train): ' + str(len(column_train))) - - if os.path.exists(ftest_labels) is True: - print('Number of features (test): ' + str(len(column_test))) - - """Preprocessing: Missing Values""" - - print('Checking missing values...') - missing = train.isnull().values.any() - inf = train.isin([np.inf, -np.inf]).values.any() - missing_test = False - inf_test = False - if os.path.exists(ftest) is True: - missing_test = test.isnull().values.any() - inf_test = test.isin([np.inf, -np.inf]).values.any() - if missing or inf or missing_test or inf_test: - print('There are missing values...') - print('Applying SimpleImputer - strategy (mean)...') - imp = SimpleImputer(missing_values=np.nan, strategy='mean') - train = pd.DataFrame(imp.fit_transform(train), columns=column_train) - if os.path.exists(ftest) is True: - test = pd.DataFrame(imp.transform(test), columns=column_test) - else: - pass - else: - print('There are no missing values...') - - """Preprocessing: StandardScaler()""" - - if norm is True: - print('Applying StandardScaler()....') - sc = StandardScaler() - train = pd.DataFrame(sc.fit_transform(train), columns=column_train) - if os.path.exists(ftest) is True: - test = pd.DataFrame(sc.transform(test), columns=column_test) - else: - pass - - """Choosing Classifier """ - - print('Choosing Classifier...') - if classifier == 0: - if tuning is True: - print('Tuning: ' + str(tuning)) - print('Classifier: AdaBoost') - clf = AdaBoostClassifier(n_estimators=500, random_state=63) - # train, train_labels = imbalanced_function(clf, train, train_labels) - else: - 
print('Tuning: ' + str(tuning)) - print('Classifier: AdaBoost') - clf = AdaBoostClassifier(n_estimators=500, random_state=63) - # train, train_labels = imbalanced_function(clf, train, train_labels) - elif classifier == 1: - if tuning is True: - print('Tuning: ' + str(tuning)) - print('Classifier: Random Forest') - clf = RandomForestClassifier(n_estimators=200, n_jobs=n_cpu, random_state=63) - # train, train_labels = imbalanced_function(clf, train, train_labels) - best_tuning, clf = tuning_rf_bayesian() - print('Finished Tuning') - else: - print('Tuning: ' + str(tuning)) - print('Classifier: Random Forest') - clf = RandomForestClassifier(n_estimators=200, n_jobs=n_cpu, random_state=63) - # train, train_labels = imbalanced_function(clf, train, train_labels) - elif classifier == 2: - if tuning is True: - print('Tuning: ' + str(tuning)) - print('Classifier: LightGBM') - clf = lgb.LGBMClassifier(n_estimators=500, n_jobs=n_cpu, random_state=63) - # train, train_labels = imbalanced_function(clf, train, train_labels) - best_tuning, clf = tuning_lightgbm_bayesian() - print('Finished Tuning') - else: - print('Tuning: ' + str(tuning)) - print('Classifier: LightGBM') - clf = lgb.LGBMClassifier(n_estimators=500, n_jobs=n_cpu, random_state=63) - # train, train_labels = imbalanced_function(clf, train, train_labels) - else: - sys.exit('This classifier option does not exist - Try again') - - """Preprocessing: Feature Importance-Based Feature Selection""" - - print('Applying Feature Importance-Based Feature Selection...') - # best_t, best_baac = feature_importance_fs(clf, train, train_labels, column_train) - best_t = feature_importance_fs_bayesian(clf, train, train_labels) - fs = SelectFromModel(clf, threshold=best_t) - fs.fit(train, train_labels) - feature_idx = fs.get_support() - feature_name = column_train[feature_idx] - train = pd.DataFrame(fs.transform(train), columns=feature_name) - if os.path.exists(ftest) is True: - test = pd.DataFrame(fs.transform(test), columns=feature_name) - else: - pass - print('Best Feature Subset: ' + str(len(feature_name))) - print('Reduction: ' + str(len(column_train)-len(feature_name)) + ' features') - fs_train = output + 'best_feature_train.csv' - fs_test = output + 'best_feature_test.csv' - print('Saving dataset with selected feature subset - train: ' + fs_train) - train.to_csv(fs_train, index=False) - if os.path.exists(ftest) is True: - print('Saving dataset with selected feature subset - test: ' + fs_test) - test.to_csv(fs_test, index=False) - print('Feature Selection - Finished...') - - """Training - StratifiedKFold (cross-validation = 10)...""" - - print('Training: StratifiedKFold (cross-validation = 10)...') - train_output = output + 'training_kfold(10)_metrics.csv' - matrix_output = output + 'training_confusion_matrix.csv' - model_output = output + 'trained_model.sav' - evaluate_model_cross(train, train_labels, clf, train_output, matrix_output) - clf.fit(train, train_labels) - joblib.dump(clf, model_output) - print('Saving results in ' + train_output + '...') - print('Saving confusion matrix in ' + matrix_output + '...') - print('Saving trained model in ' + model_output + '...') - print('Training: Finished...') - - """Generating Feature Importance - Selected feature subset...""" - - print('Generating Feature Importance - Selected feature subset...') - importance_output = output + 'feature_importance.csv' - features_importance_ensembles(clf, feature_name, importance_output) - print('Saving results in ' + importance_output + '...') - - """Testing model...""" - - if 
os.path.exists(ftest) is True: - print('Generating Performance Test...') - preds = clf.predict(test) - pred_output = output + 'test_predictions.csv' - print('Saving prediction in ' + pred_output + '...') - save_prediction(preds, test_nameseq, pred_output) - if os.path.exists(ftest_labels) is True: - print('Generating Metrics - Test set...') - report = classification_report(test_labels, preds, output_dict=True) - matrix_test = (pd.crosstab(test_labels, preds, rownames=["REAL"], colnames=["PREDITO"], margins=True)) - - metrics_output = output + 'metrics_test.csv' - print('Saving Metrics - Test set: ' + metrics_output + '...') - metr_report = pd.DataFrame(report).transpose() - metr_report.to_csv(metrics_output) - - matrix_output_test = output + 'test_confusion_matrix.csv' - matrix_test.to_csv(matrix_output_test) - print('Saving confusion matrix in ' + matrix_output_test + '...') - print('Task completed - results generated in ' + output + '!') - - else: - print('There are no test labels for evaluation, check parameters...') - # sys.stdout = tmp - # log_file.close() - else: - print('There are no test sequences for evaluation, check parameters...') - print('Task completed - results generated in ' + output + '!') - # sys.stdout = tmp - # log_file.close() - - return + """Saving prediction - test set""" + + map(str, prediction) + + # From where does "nameseq_test" come from???? + with open(pred_output, 'a') as f: + if os.path.exists(nameseq_test) is True: + map(str, nameseqs) + for seq, pred in zip(nameseqs, prediction): + f.write(f"{seq},{pred}\n") + else: + for pred in prediction: + f.write(f"{pred}\n") + + +def randomize_samples(targets, _class, n_samples=1): + + """Get a given number of samples which match with class '_class'""" + + enum_targets = enumerate(targets) + class_samples = np.array(list(filter(lambda x: x[1] == _class, enum_targets))) + + try: + chosen = np.random.choice(len(class_samples), size=n_samples, replace=False) + return map(int, class_samples[chosen,0]) + except ValueError: + raise ValueError( + f"Error: There's not enough samples of class {_class} in targets " +\ + f"(n_samples={n_samples} is too high)." 
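# Illustrative sketch (toy predictions - not part of the patch) of the class-conditional
# sampling done by randomize_samples above: collect the indices predicted as a given
# class, then draw a few of them without replacement.
import numpy as np

preds = np.array(['sRNA', 'tRNA', 'sRNA', 'rRNA', 'sRNA', 'sRNA'])
idx_of_class = np.where(preds == 'sRNA')[0]                      # indices of the target class
chosen = np.random.choice(idx_of_class, size=2, replace=False)   # two distinct samples
print(chosen)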
+ ) + + +def generate_waterfall_plot(id_plot, class_shap_values, class_name, sample_row, base_value, + feature_names, path): + + """Generates a waterfall plot for a given sample""" + + local_name = os.path.join(path, f"waterfall_random_{class_name}_{id_plot}.png") + + exp = shap.Explanation(values=class_shap_values, base_values=base_value, + data=sample_row, feature_names=feature_names) + fig = plt.figure() + plt.title(f"Waterfall plot for sample {id_plot} of class '{class_name}'", fontsize=16) + shap.waterfall_plot(exp, show=False) + plt.savefig(local_name, bbox_inches = "tight") + + return local_name + + +def generate_summary_plot(class_shap_values, class_name, data, feature_names, path): + + """Generates a summary plot for a given class of the multiclass classification""" + + local_name = os.path.join(path, f"summary_{class_name}.png") + + fig = plt.figure() + plt.title(f"Summary plot for class '{class_name}'", fontsize=16) + shap.summary_plot(class_shap_values, data, feature_names=feature_names, show=False) + plt.savefig(local_name, bbox_inches = "tight") + + return local_name + + +def generate_all_plots(model, train, test, preds, feature_names, path='explanations', n_samples=3): + + """Used to generate each of the plots used to explain the model's decision""" + + generated_plt = {} + + print("Training the explainer model...") + explainer = shap.TreeExplainer(model, data=train, feature_names=feature_names) + shap_values = np.array(explainer.shap_values(test)) + print("Explainer trained successfully!") + + classes = sorted(set(preds)) + if len(classes) <= 2: + raise ValueError( + (f"{os.path.basename(__file__)} shouldn't be used to handle binary classification problems. To generate " + "the interpretability report properly, please execute the binary classification script with the " + "same configuration.") + ) + assert len(shap_values) == len(classes),\ + "Error: Classes generated by the explainer of 'model' doesn't match the distinct number " +\ + f"of classes in 'targets'. [Explainer={len(shap_values)}, Target={len(classes)}]" + + if not os.path.exists(path): + print(f"Creating explanations directory: {path}...") + os.mkdir(path) + else: + print(f"Directory {path} already exists. 
Will proceed using it...") + + generated_plt[SUMMARY] = [] + generated_plt[WATERFALL] = [] + + print("Plotting each class with summary and waterfall plots...") + for i, cl in enumerate(classes): + generated_plt[SUMMARY].append( + generate_summary_plot(shap_values[i], cl, test, feature_names, path) + ) + + random_samples = randomize_samples(preds, cl, n_samples=n_samples) + for j, sample in enumerate(random_samples): + generated_plt[WATERFALL].append( + generate_waterfall_plot(j+1, shap_values[i][sample], cl, test[sample], + explainer.expected_value[i], feature_names, path) + ) + + return generated_plt + + +def build_interpretability_report(generated_plt, n_samples, report_name="interpretability.pdf", directory="."): + report = Report(report_name, directory=directory) + + root_dir = os.path.abspath(os.path.join(__file__, os.pardir)) + report.insert_doc_header(REPORT_MAIN_TITLE_MULTICLASS, logo_fig=os.path.join(root_dir, "img/BioAutoML.png")) + report.insert_text_on_doc(REPORT_SHAP_PREAMBLE, font_size=12, pos_margin=1) + + report.insert_text_on_doc(REPORT_SUMMARY_TITLE, font_size=14, style="Center", pre_margin=18, pos_margin=12, bold=True) + report.insert_figure_on_doc(generated_plt[SUMMARY]) + report.insert_text_on_doc(REPORT_SHAP_SUMMARY_1, font_size=12) + report.insert_text_on_doc(REPORT_SHAP_SUMMARY_2, font_size=12, pos_margin=1) + + report.insert_text_on_doc(REPORT_WATERFALL_TITLE, font_size=14, style="Center", pre_margin=18, pos_margin=12, bold=True) + report.insert_text_on_doc(REPORT_SHAP_WATERFALL_1(n_samples), font_size=12) + report.insert_text_on_doc(REPORT_SHAP_WATERFALL_2, font_size=12) + report.insert_figure_on_doc(generated_plt[WATERFALL]) + + report.build() + + +def multiclass_pipeline(test, test_labels, test_nameseq, norm, classifier, tuning, output, exp_n_samples): + + global clf, train, train_labels + + if not os.path.exists(output): + os.mkdir(output) + + train = train_read + train_labels = train_labels_read + column_train = train.columns + column_test = '' + + # tmp = sys.stdout + # log_file = open(output + 'task.log', 'a') + # sys.stdout = log_file + + """Number of Samples and Features: Train and Test""" + + print('Number of samples (train): ' + str(len(train))) + + """Number of labels""" + + print('Number of Labels (train):') + df_label = pd.DataFrame(train_labels) + print(str(pd.value_counts(df_label.values.flatten()))) + + if os.path.exists(ftest) is True: + column_test = test.columns + print('Number of samples (test): ' + str(len(test))) + print('Number of Labels (test):') + df_label = pd.DataFrame(test_labels) + print(str(pd.value_counts(df_label.values.flatten()))) + + print('Number of features (train): ' + str(len(column_train))) + + if os.path.exists(ftest_labels) is True: + print('Number of features (test): ' + str(len(column_test))) + + """Preprocessing: Label Encoding""" + + lb_encoder = LabelEncoder() + train_labels = lb_encoder.fit_transform(train_labels) + + """Preprocessing: Missing Values""" + + print('Checking missing values...') + missing = train.isnull().values.any() + inf = train.isin([np.inf, -np.inf]).values.any() + missing_test = False + inf_test = False + if os.path.exists(ftest) is True: + missing_test = test.isnull().values.any() + inf_test = test.isin([np.inf, -np.inf]).values.any() + if missing or inf or missing_test or inf_test: + print('There are missing values...') + print('Applying SimpleImputer - strategy (mean)...') + imp = SimpleImputer(missing_values=np.nan, strategy='mean') + train = pd.DataFrame(imp.fit_transform(train), 
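# Illustrative, self-contained sketch (toy data - not part of the patch) of the per-class
# SHAP output that generate_all_plots above relies on. With the shap version pinned in
# BioAutoML-env.yml, TreeExplainer.shap_values() returns one (n_samples, n_features)
# block per class for a multiclass tree model; the exact array layout can differ in
# newer shap releases.
import numpy as np
import shap
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=150, n_features=8, n_informative=5, n_classes=3, random_state=63)
model = RandomForestClassifier(random_state=63).fit(X, y)

explainer = shap.TreeExplainer(model, data=X)
shap_values = np.array(explainer.shap_values(X))
print(shap_values.shape)                      # (n_classes, n_samples, n_features)
shap.summary_plot(shap_values[0], X, show=False)  # summary plot for the first class only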
columns=column_train) + if os.path.exists(ftest) is True: + test = pd.DataFrame(imp.transform(test), columns=column_test) + else: + print('There are no missing values...') + + """Preprocessing: StandardScaler()""" + + if norm is True: + print('Applying StandardScaler()....') + sc = StandardScaler() + train = pd.DataFrame(sc.fit_transform(train), columns=column_train) + if os.path.exists(ftest) is True: + test = pd.DataFrame(sc.transform(test), columns=column_test) + + """Choosing Classifier """ + + print('Choosing Classifier...') + if classifier == 0: + print('Tuning: ' + str(tuning)) + print('Classifier: XGBClassifier') + clf = xgb.XGBClassifier(eval_metric='mlogloss', n_jobs=n_cpu, random_state=63, use_label_encoder=False) + # train, train_labels = imbalanced_function(clf, train, train_labels) + if tuning is True: + print('Tuning not yet available for XGBClassifier') + elif classifier == 1: + print('Tuning: ' + str(tuning)) + print('Classifier: Random Forest') + clf = RandomForestClassifier(n_estimators=200, n_jobs=n_cpu, random_state=63) + # train, train_labels = imbalanced_function(clf, train, train_labels) + if tuning is True: + best_tuning, clf = tuning_rf_bayesian() + print('Finished Tuning') + elif classifier == 2: + print('Tuning: ' + str(tuning)) + print('Classifier: LightGBM') + clf = lgb.LGBMClassifier(n_estimators=500, n_jobs=n_cpu, random_state=63) + # train, train_labels = imbalanced_function(clf, train, train_labels) + if tuning is True: + best_tuning, clf = tuning_lightgbm_bayesian() + print('Finished Tuning') + else: + sys.exit('This classifier option does not exist - Try again') + + """Preprocessing: Feature Importance-Based Feature Selection""" + + print('Applying Feature Importance-Based Feature Selection...') + # best_t, best_baac = feature_importance_fs(clf, train, train_labels, column_train) + best_t = feature_importance_fs_bayesian(clf, train, train_labels) + fs = SelectFromModel(clf, threshold=best_t) + fs.fit(train, train_labels) + feature_idx = fs.get_support() + feature_name = column_train[feature_idx] + train = pd.DataFrame(fs.transform(train), columns=feature_name) + if os.path.exists(ftest) is True: + test = pd.DataFrame(fs.transform(test), columns=feature_name) + + print('Best Feature Subset: ' + str(len(feature_name))) + print('Reduction: ' + str(len(column_train)-len(feature_name)) + ' features') + fs_train = os.path.join(output, 'best_feature_train.csv') + fs_test = os.path.join(output, 'best_feature_test.csv') + print('Saving dataset with selected feature subset - train: ' + fs_train) + train.to_csv(fs_train, index=False) + if os.path.exists(ftest) is True: + print('Saving dataset with selected feature subset - test: ' + fs_test) + test.to_csv(fs_test, index=False) + print('Feature Selection - Finished...') + + """Training - StratifiedKFold (cross-validation = 10)...""" + + print('Training: StratifiedKFold (cross-validation = 10)...') + train_output = os.path.join(output, 'training_kfold(10)_metrics.csv') + matrix_output = os.path.join(output, 'training_confusion_matrix.csv') + model_output = os.path.join(output, 'trained_model.sav') + evaluate_model_cross(train, train_labels, clf, train_output, matrix_output) + + clf.fit(train, train_labels) + joblib.dump(clf, model_output) + print('Saving results in ' + train_output + '...') + print('Saving confusion matrix in ' + matrix_output + '...') + print('Saving trained model in ' + model_output + '...') + print('Training: Finished...') + + """Generating Feature Importance - Selected feature subset...""" + + 
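# Illustrative sketch (toy data, placeholder path - not part of the patch) of the model
# persistence step above: joblib.dump writes the fitted classifier to trained_model.sav,
# and joblib.load restores it for later prediction.
import joblib
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(random_state=63)
clf = RandomForestClassifier(random_state=63).fit(X, y)

joblib.dump(clf, 'trained_model.sav')     # same format the pipeline writes
restored = joblib.load('trained_model.sav')
print(restored.predict(X[:3]))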
print('Generating Feature Importance - Selected feature subset...') + importance_output = os.path.join(output, 'feature_importance.csv') + features_importance_ensembles(clf, feature_name, importance_output) + print('Saving results in ' + importance_output + '...') + + """Testing model...""" + + if os.path.exists(ftest) is True: + print('Generating Performance Test...') + preds = lb_encoder.inverse_transform(clf.predict(test)) + + pred_output = os.path.join(output, 'test_predictions.csv') + print('Saving prediction in ' + pred_output + '...') + save_prediction(preds, test_nameseq, pred_output) + + """Generating Explainable Machine Learning plots from the test set...""" + + try: + plot_output = os.path.join(output, 'explanations') + generated_plt = generate_all_plots(clf, train.values, test.values, preds, + test.columns, path=plot_output, n_samples=exp_n_samples) + build_interpretability_report(generated_plt, exp_n_samples, directory=output) + except ValueError as e: + print(e) + print("If you believe this is a bug, please report it to https://github.com/Bonidia/BioAutoML.") + print("Generation of explanation plots and report failed. Proceeding without it...") + except AssertionError as e: + print(e) + print("This is certainly a bug. Please report it to https://github.com/Bonidia/BioAutoML.") + print("Generation of explanation plots and report failed. Proceeding without it...") + else: + print("Explanation plots and report generated successfully!") + + if os.path.exists(ftest_labels) is True: + print('Generating Metrics - Test set...') + report = classification_report(test_labels, preds, output_dict=True) + matrix_test = (pd.crosstab(test_labels, preds, rownames=["REAL"], + colnames=["PREDITO"], margins=True)) + + metrics_output = os.path.join(output, 'metrics_test.csv') + print('Saving Metrics - Test set: ' + metrics_output + '...') + metr_report = pd.DataFrame(report).transpose() + metr_report.to_csv(metrics_output) + + matrix_output_test = os.path.join(output, 'test_confusion_matrix.csv') + matrix_test.to_csv(matrix_output_test) + print('Saving confusion matrix in ' + matrix_output_test + '...') + print('Task completed - results generated in ' + output + '!') + + else: + print('There are no test labels for evaluation, check parameters...') + # sys.stdout = tmp + # log_file.close() + else: + print('There are no test sequences for evaluation, check parameters...') + print('Task completed - results generated in ' + output + '!') + # sys.stdout = tmp + # log_file.close() ########################################################################## ########################################################################## if __name__ == '__main__': - print('\n') - print('###################################################################################') - print('###################################################################################') - print('##################### BioAutoML - MultiClass #######################') - print('########## Author: Robson Parmezan Bonidia ###########') - print('########## WebPage: https://bonidia.github.io/website/ ###########') - print('###################################################################################') - print('###################################################################################') - print('\n') - parser = argparse.ArgumentParser() - parser.add_argument('-train', '--train', help='csv format file, e.g., train.csv') - parser.add_argument('-train_label', '--train_label', default='', help='csv format file, e.g., labels.csv') - 
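# Illustrative sketch (placeholder file name and text - not part of the patch) of the
# underlying reportlab flow that the Report helper from interpretability_report.py wraps
# for build_interpretability_report above: assemble platypus flowables and build a PDF.
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer

doc = SimpleDocTemplate('interpretability.pdf', pagesize=A4)
styles = getSampleStyleSheet()
story = [Paragraph('BioAutoML - Interpretability Report', styles['Title']),
         Spacer(1, 12),
         Paragraph('SHAP-based explanation of the trained model.', styles['BodyText'])]
doc.build(story)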
parser.add_argument('-test', '--test', help='csv format file, e.g., train.csv') - parser.add_argument('-test_label', '--test_label', default='', help='csv format file, e.g., labels.csv') - parser.add_argument('-test_nameseq', '--test_nameseq', default='', help='csv with sequence names') - parser.add_argument('-nf', '--normalization', type=bool, default=False, - help='Normalization - Features (default = False)') - parser.add_argument('-n_cpu', '--n_cpu', default=1, help='number of cpus - default = 1') - parser.add_argument('-classifier', '--classifier', default=0, - help='Classifier - 0: AdaBoost, 1: Random Forest ' - '2: LightGBM') - parser.add_argument('-tuning', '--tuning_classifier', type=bool, default=False, - help='Tuning Classifier - True = Yes, False = No, default = False') - parser.add_argument('-output', '--output', help='results directory, e.g., result/') - args = parser.parse_args() - ftrain = str(args.train) - ftrain_labels = str(args.train_label) - ftest = str(args.test) - ftest_labels = str(args.test_label) - nameseq_test = str(args.test_nameseq) - norm = args.normalization - n_cpu = int(args.n_cpu) - classifier = int(args.classifier) - tuning = args.tuning_classifier - foutput = str(args.output) - start_time = time.time() - - if os.path.exists(ftrain) is True: - train_read = pd.read_csv(ftrain) - print('Train - %s: Found File' % ftrain) - else: - print('Train - %s: File not exists' % ftrain) - sys.exit() - - if os.path.exists(ftrain_labels) is True: - train_labels_read = pd.read_csv(ftrain_labels).values.ravel() - print('Train_labels - %s: Found File' % ftrain_labels) - else: - print('Train_labels - %s: File not exists' % ftrain_labels) - sys.exit() - - test_read = '' - if ftest != '': - if os.path.exists(ftest) is True: - test_read = pd.read_csv(ftest) - print('Test - %s: Found File' % ftest) - else: - print('Test - %s: File not exists' % ftest) - sys.exit() - - test_labels_read = '' - if ftest_labels != '': - if os.path.exists(ftest_labels) is True: - test_labels_read = pd.read_csv(ftest_labels).values.ravel() - print('Test_labels - %s: Found File' % ftest_labels) - else: - print('Test_labels - %s: File not exists' % ftest_labels) - sys.exit() - - test_nameseq_read = '' - if nameseq_test != '': - if os.path.exists(nameseq_test) is True: - test_nameseq_read = pd.read_csv(nameseq_test).values.ravel() - print('Test_nameseq - %s: Found File' % nameseq_test) - else: - print('Test_nameseq - %s: File not exists' % nameseq_test) - sys.exit() - - multiclass_pipeline(test_read, test_labels_read, test_nameseq_read, norm, classifier, tuning, foutput) - cost = (time.time() - start_time)/60 - print('Computation time - Pipeline: %s minutes' % cost) + print('\n') + print('###################################################################################') + print('###################################################################################') + print('##################### BioAutoML - MultiClass #######################') + print('########## Author: Robson Parmezan Bonidia ###########') + print('########## WebPage: https://bonidia.github.io/website/ ###########') + print('###################################################################################') + print('###################################################################################') + print('\n') + parser = argparse.ArgumentParser() + parser.add_argument('-train', '--train', help='csv format file, e.g., train.csv') + parser.add_argument('-train_label', '--train_label', default='', help='csv format file, e.g., 
labels.csv') + parser.add_argument('-test', '--test', help='csv format file, e.g., train.csv') + parser.add_argument('-test_label', '--test_label', default='', help='csv format file, e.g., labels.csv') + parser.add_argument('-test_nameseq', '--test_nameseq', default='', help='csv with sequence names') + parser.add_argument('-nf', '--normalization', type=bool, default=False, + help='Normalization - Features (default = False)') + parser.add_argument('-n_cpu', '--n_cpu', default=1, help='number of cpus - default = 1') + parser.add_argument('-classifier', '--classifier', default=0, + help='Classifier - 0: XGBoost, 1: Random Forest ' + '2: LightGBM') + parser.add_argument('-tuning', '--tuning_classifier', type=bool, default=False, + help='Tuning Classifier - True = Yes, False = No, default = False') + parser.add_argument('-output', '--output', help='results directory, e.g., result/') + parser.add_argument('-n_exp_samples', '--n_exp_samples', default=3, + help='number of samples taken for each class in explanation analysis') + args = parser.parse_args() + ftrain = str(args.train) + ftrain_labels = str(args.train_label) + ftest = str(args.test) + ftest_labels = str(args.test_label) + nameseq_test = str(args.test_nameseq) + norm = args.normalization + n_cpu = int(args.n_cpu) + classifier = int(args.classifier) + tuning = args.tuning_classifier + foutput = str(args.output) + n_exp_samples = int(args.n_exp_samples) + start_time = time.time() + + if os.path.exists(ftrain) is True: + train_read = pd.read_csv(ftrain) + print('Train - %s: Found File' % ftrain) + else: + print('Train - %s: File not exists' % ftrain) + sys.exit() + + if os.path.exists(ftrain_labels) is True: + train_labels_read = pd.read_csv(ftrain_labels).values.ravel() + print('Train_labels - %s: Found File' % ftrain_labels) + else: + print('Train_labels - %s: File not exists' % ftrain_labels) + sys.exit() + + test_read = '' + if ftest != '': + if os.path.exists(ftest) is True: + test_read = pd.read_csv(ftest) + print('Test - %s: Found File' % ftest) + else: + print('Test - %s: File not exists' % ftest) + sys.exit() + + test_labels_read = '' + if ftest_labels != '': + if os.path.exists(ftest_labels) is True: + test_labels_read = pd.read_csv(ftest_labels).values.ravel() + print('Test_labels - %s: Found File' % ftest_labels) + else: + print('Test_labels - %s: File not exists' % ftest_labels) + sys.exit() + + test_nameseq_read = '' + if nameseq_test != '': + if os.path.exists(nameseq_test) is True: + test_nameseq_read = pd.read_csv(nameseq_test).values.ravel() + print('Test_nameseq - %s: Found File' % nameseq_test) + else: + print('Test_nameseq - %s: File not exists' % nameseq_test) + sys.exit() + + multiclass_pipeline( + test_read, test_labels_read, test_nameseq_read, norm, classifier, + tuning, foutput, n_exp_samples + ) + cost = (time.time() - start_time)/60 + print('Computation time - Pipeline: %s minutes' % cost) ########################################################################## ########################################################################## diff --git a/interpretability_report.py b/interpretability_report.py new file mode 100644 index 0000000..3b22df8 --- /dev/null +++ b/interpretability_report.py @@ -0,0 +1,203 @@ +import logging +from reportlab.lib.enums import TA_JUSTIFY, TA_CENTER +from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, Table, TableStyle +from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle +from reportlab.lib.pagesizes import A4 +from reportlab.lib.units 
import inch
+from reportlab.lib.utils import ImageReader
+from itertools import zip_longest
+from os.path import join, basename, exists
+from sys import stdout
+
+report_handler = logging.StreamHandler(stream=stdout)
+report_handler.setLevel(logging.WARNING)
+report_handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s"))
+
+report_logger = logging.getLogger(basename(__file__))
+report_logger.addHandler(report_handler)
+
+REPORT_MAIN_TITLE_MULTICLASS = "Model Interpretability Report (Multiclass)"
+REPORT_SHAP_PREAMBLE = (
+    "This report aims to interpret and explain how and why the chosen model classifies each entry the way it does. "
+    "All of the interpretability analyses are based on the SHAP method, which estimates how much each feature "
+    "contributes to the classification. SHAP uses Shapley values, a game-theory concept, "
+    "as a descriptive metric to rank the features hierarchically."
+)
+REPORT_SUMMARY_TITLE = "Summary Plots"
+REPORT_SHAP_SUMMARY_1 = (
+    "The plot above, called a summary plot, shows, for each class, how low/high values of each feature "
+    "contributed to the classification into that class. The features are ranked from most descriptive to "
+    "least descriptive. The plot summarizes all the entries in the test set."
+)
+REPORT_SHAP_SUMMARY_2 = (
+    "A positive SHAP value means that a high (red dots), medium (purple dots) or low (blue dots) "
+    "feature value pushes an entry towards that class. The opposite holds for "
+    "negative SHAP values."
+)
+REPORT_WATERFALL_TITLE = "Waterfall Plots"
+REPORT_SHAP_WATERFALL_1 = lambda n_samples: (
+    "A waterfall plot shows, for a given entry, how each feature contributed to the class it was assigned. "
+    f"In this case, {n_samples} samples from each class were chosen at random for analysis."
+)
+REPORT_SHAP_WATERFALL_2 = (
+    "Starting from the base expected value, E[f(x)], each feature contributes positively or negatively towards the "
+    "entry's assigned class. When all contributions are added to E[f(x)], we obtain the final value f(x) that led "
+    "to the classification result."
+)
+
+
+REPORT_MAIN_TITLE_BINARY = "Model Interpretability Report (BioAutoML)"
+REPORT_SHAP_PREAMBLE_BINARY = "SHAP: for each sample, SHAP calculates how much each feature contributed to the classification decision."
+
+REPORT_SHAP_BAR_BINARY = """
+This graph shows the average contribution of each feature, highlighting the features that matter most to the model.
+It makes it possible to understand which features are most important for the problem.
+"""
+
+REPORT_SHAP_BEESWARM_BINARY = """
+Each row in this graph represents a feature and each dot a sample from the training set.
+The graph helps relate the value of a feature (high or low) in a sample to its contribution to the prediction.
+"""
+
+REPORT_SHAP_WATERFALL_BINARY = """
+Each graph above refers to a specific sample; the title gives the sample's label.
+Each row shows one feature: on the left is the sample's value for that feature, and the colored bar shows that feature's contribution to the classification into this class.
+The threshold E[f(x)] is also shown: final values below it belong to one class and values above it to the other.
+""" + +make_bold = lambda s: f"{s}" +make_font_size = lambda s, size: f"{s}" + +class Report: + styles = None + story = None + doc = None + text_width = None + + def __init__(self, report_name, directory=".", lr_margin=float(0.5*inch), tb_margin=float(0.25*inch)): + + """Create a new PDF report with filename 'report_name'""" + + self.styles = getSampleStyleSheet() + self.story = [] + self.doc = SimpleDocTemplate( + join(directory, report_name), + leftMargin=lr_margin, + rightMargin=lr_margin, + topMargin=tb_margin, + bottomMargin=tb_margin, + pagesize=A4 + ) + self.styles.add(ParagraphStyle(name='Justify', fontName="Helvetica", + alignment=TA_JUSTIFY, firstLineIndent=0.3*inch)) + self.styles.add(ParagraphStyle(name='Center', fontName="Helvetica", + alignment=TA_CENTER)) + + page_width, _ = A4 + self.text_width = page_width - 2*lr_margin + + + def __get_image_preserving_ratio(self, path, width, **kwargs): + + """Load and resize an image preserving aspect ratio""" + + img = ImageReader(path) + w, h = img.getSize() + return Image(path, width=width, height=(width * (h / float(w))), **kwargs) + + + def insert_doc_header(self, title, font_size=16, logo_fig=None, pre_margin=1, pos_margin=18, bold=True): + + """Insert a header with given title and logo on the file""" + + if not logo_fig: + self.insert_text_on_doc(title, font_size=font_size, style='Center', pos_margin=pos_margin, bold=bold) + return + + if pre_margin > 0: + self.story.append(Spacer(1, pre_margin)) + else: + report_logger.warning(f"'pre_margin' can't be negative. Ignoring it " +\ + f"and using default value (1). [pre_margin={pre_margin}]") + + assert exists(logo_fig), f"Logo figure in path {logo_fig} does not exist." + fmt = make_font_size(make_bold(title) if bold else title, font_size) + self.story.append(Table( + [ + [Paragraph(fmt, self.styles['Center']), + self.__get_image_preserving_ratio(logo_fig, 0.15*self.text_width)] + ], + style=TableStyle([('VALIGN', (0,0), (1,0), 'MIDDLE')]), + colWidths=[0.8*self.text_width, 0.2*self.text_width] + )) + + if pos_margin > 0: + self.story.append(Spacer(1, pos_margin)) + else: + report_logger.warning(f"'pos_margin' can't be negative. Ignoring it " +\ + f"and using default value (18). [pos_margin={pos_margin}]") + + + def insert_text_on_doc(self, text, font_size=12, style='Justify', pre_margin=1, pos_margin=12, bold=False): + + """Insert a new paragraph on report with given text customization""" + + if pre_margin > 0: + self.story.append(Spacer(1, pre_margin)) + else: + report_logger.warning(f"'pre_margin' can't be negative. Ignoring it " +\ + f"and using default value (1). [pre_margin={pre_margin}]") + + assert font_size > 0, f"Error: 'font_size' can't be negative. Aborted. [font_size={font_size}]" + fmt = make_font_size(make_bold(text) if bold else text, font_size) + self.story.append(Paragraph(fmt, self.styles[style])) + + if pos_margin > 0: + self.story.append(Spacer(1, pos_margin)) + else: + report_logger.warning(f"'pos_margin' can't be negative. Ignoring it " +\ + f"and using default value (12). [pos_margin={pos_margin}]") + + + def insert_figure_on_doc(self, fig_paths, pre_margin=1, pos_margin=24): + """ + Insert a list of figures pairwise into the report + If the size of the list is odd, the last one will be centered + """ + pairwise = lambda iterable: list(zip_longest(*[iter(iterable)] * 2, fillvalue=None)) + + assert len(fig_paths) > 0, "List of figures (fig_paths) is empty." 
+        w, h = ImageReader(fig_paths[0]).getSize()
+        ratio = h / float(w)
+
+        for fig, fig2 in pairwise(fig_paths):
+            assert exists(fig), f"Figure in path {fig} does not exist."
+
+            if pre_margin > 0:
+                self.story.append(Spacer(1, pre_margin))
+            else:
+                report_logger.warning(f"'pre_margin' must be positive. Skipping the spacer. [pre_margin={pre_margin}]")
+
+            if not fig2:
+                self.story.append(Image(fig, width=0.5*self.text_width, height=0.5*self.text_width * ratio))
+            else:
+                assert exists(fig2), f"Figure in path {fig2} does not exist."
+                self.story.append(Table(
+                    [[Image(fig, width=0.5*self.text_width, height=0.5*self.text_width * ratio),
+                      Image(fig2, width=0.5*self.text_width, height=0.5*self.text_width * ratio)]]
+                ))
+
+            if pos_margin > 0:
+                self.story.append(Spacer(1, pos_margin))
+            else:
+                report_logger.warning(f"'pos_margin' must be positive. Skipping the spacer. [pos_margin={pos_margin}]")
+
+
+    def build(self):
+
+        """Build the PDF document from the accumulated story elements"""
+
+        self.doc.build(self.story)
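
Editor's note: the snippet below is a minimal usage sketch, not part of the patch, showing how the new Report class is intended to be driven (e.g. from build_interpretability_report). The output directory and the PNG path are illustrative assumptions; the figure file must exist, since insert_figure_on_doc asserts on it.

from interpretability_report import (
    Report, REPORT_MAIN_TITLE_BINARY, REPORT_SHAP_PREAMBLE_BINARY, REPORT_SHAP_BAR_BINARY
)

# Hypothetical SHAP plot saved earlier by the explanation step; adjust to a real path.
plots = ['results/explanations/bar_graph.png']

report = Report('interpretability_report.pdf', directory='results')  # 'results' must already exist
report.insert_doc_header(REPORT_MAIN_TITLE_BINARY)      # centered, bold title (no logo given)
report.insert_text_on_doc(REPORT_SHAP_PREAMBLE_BINARY)  # justified preamble paragraph
report.insert_figure_on_doc(plots)                      # figures are placed pairwise; a lone figure is centered
report.insert_text_on_doc(REPORT_SHAP_BAR_BINARY)       # caption text for the bar plot
report.build()                                          # writes results/interpretability_report.pdf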