Skip to content

Commit ae2ee25

Browse files
committed
[REF-6198] XGBoost: Generic training method to generate xgboost model
1 parent 488bb94 commit ae2ee25

File tree

4 files changed

+556
-0
lines changed

4 files changed

+556
-0
lines changed

chapter-XG/pipeline.json

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
{
2+
"name": "DB fetch + XGB Train",
3+
"engineType": "Generic",
4+
"pipe": [
5+
{
6+
"name": "db_to_dataframe",
7+
"id": 1,
8+
"type": "db_to_dataframe",
9+
"parents": [],
10+
"arguments":
11+
{
12+
"host": "localhost",
13+
"user": "pmuser",
14+
"password": "P3M3Admin!",
15+
"db": "dataset",
16+
"table": "dloan"
17+
}
18+
},
19+
{
20+
"name": "XGBoostTrain",
21+
"id": 2,
22+
"type": "XGBoostTrain",
23+
"parents": [{"parent": 1, "output": 0, "input": 0}],
24+
"arguments": {
25+
"validation_split": 0.3,
26+
"auc_threshold": 0.5,
27+
"ks_threshold": 0.3,
28+
"psi_threshold": 1,
29+
"n_estimators": 300,
30+
"max_depth": 7,
31+
"learning_rate": 0.1,
32+
"min_child_weight": 1,
33+
"objective": "binary:logistic",
34+
"gamma": 0,
35+
"max_delta_step": 0,
36+
"subsample": 1,
37+
"reg_alpha": 0,
38+
"reg_lambda": 0,
39+
"scale_pos_weight": 1.2,
40+
"categorical_cols": ["verification_status", "addr_state", "purpose", "home_ownership", "term"],
41+
"input_file": "/tmp/dloan_train.csv",
42+
"output-model": "/tmp/def-loan-model.pkl"
43+
}
44+
}
45+
]
46+
}
Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
import numpy as np
2+
import pandas as pd
3+
import argparse
4+
import pickle
5+
import multiprocessing
6+
import xgboost as xgb
7+
8+
from parallelm.mlops import StatCategory as st
9+
from parallelm.mlops import mlops as mlops
10+
from parallelm.components import ConnectableComponent
11+
from parallelm.ml_engine.python_engine import PythonEngine
12+
from parallelm.mlops.stats.bar_graph import BarGraph
13+
from parallelm.mlops.stats.table import Table
14+
from parallelm.mlops.stats.graph import MultiGraph
15+
16+
from scipy.stats import ks_2samp
17+
from sklearn.metrics import accuracy_score, roc_auc_score, roc_curve, confusion_matrix, classification_report
18+
from sklearn.model_selection import train_test_split
19+
from sklearn.pipeline import Pipeline
20+
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
21+
from sklearn_pandas import DataFrameMapper
22+
23+
24+
25+
class XGBoostTrain(ConnectableComponent):
26+
27+
def __init__(self, engine):
28+
super(self.__class__, self).__init__(engine)
29+
30+
def _materialize(self, parent_data_objs, user_data):
31+
model_file = self._prep_and_train(parent_data_objs[0])
32+
self._logger.info("Model file saved: {}".format(model_file))
33+
return [model_file]
34+
35+
def _prep_and_train(self, df_dataset):
36+
self.min_auc_requirement = self._params["auc_threshold"]
37+
self.max_ks_requirement = self._params["ks_threshold"]
38+
self.min_psi_requirement = self._params["psi_threshold"]
39+
train_on_col = self._params["train_on_column"]
40+
41+
#mlops Init
42+
mlops.init()
43+
44+
mlops.set_data_distribution_stat(df_dataset)
45+
y = df_dataset[train_on_col]
46+
self._logger.info("train_on_col= {}".format(train_on_col))
47+
self._logger.info("df_dataset {}".format(df_dataset.shape[1]))
48+
X = df_dataset.drop(train_on_col, axis=1)
49+
self._logger.info("df_dataset {}".format(X.shape[1]))
50+
51+
# Splitting the data to train and test sets:
52+
X_train, X_test, y_train, y_test = train_test_split(X,
53+
y,
54+
test_size=self._params["validation_split"],
55+
random_state=42)
56+
All_columns = X_train.columns.tolist()
57+
categorical_columns = self._params["categorical_cols"]
58+
mapper_list =[]
59+
for d in All_columns:
60+
if d in categorical_columns:
61+
mapper_list.append(([d], OneHotEncoder(handle_unknown='ignore')))
62+
else:
63+
mapper_list.append(([d], MinMaxScaler()))
64+
65+
mapper = DataFrameMapper(mapper_list)
66+
67+
## Training
68+
# XGBoost Training:
69+
n_cpu = multiprocessing.cpu_count()
70+
71+
xgboost_model = xgb.XGBClassifier(max_depth=int(self._params["max_depth"]),
72+
min_child_weight=int(self._params["min_child_weight"]),
73+
learning_rate=float(self._params["learning_rate"]),
74+
n_estimators=int(self._params["n_estimators"]),
75+
silent=True,
76+
objective=self._params["objective"],
77+
gamma=float(self._params["gamma"]),
78+
max_delta_step=int(self._params["max_delta_step"]),
79+
subsample=float(self._params["subsample"]),
80+
colsample_bytree=1,
81+
colsample_bylevel=1,
82+
reg_alpha=float(self._params["reg_alpha"]),
83+
reg_lambda=float(self._params["reg_lambda"]),
84+
scale_pos_weight=float(self._params["scale_pos_weight"]),
85+
seed=1,
86+
n_jobs=n_cpu,
87+
missing=None)
88+
89+
final_model = Pipeline([("mapper", mapper), ("xgboost", xgboost_model)])
90+
final_model.fit(X_train, y_train)
91+
92+
# Prediction and prediction distribution
93+
pred_labels = final_model.predict(X_test)
94+
pred_probs = final_model.predict_proba(X_test)
95+
96+
# Accuracy calculation
97+
# Accuracy for the xgboost model
98+
accuracy = accuracy_score(y_test, pred_labels)
99+
self._logger.info("XGBoost Accuracy value: {0}".format(accuracy))
100+
# Output accuracy of the chosen model using MCenter
101+
mlops.set_stat("XGBoost Accuracy", accuracy, st.TIME_SERIES)
102+
103+
# Label distribution:
104+
# Label distribution in training
105+
value, counts = np.unique(y_test, return_counts=True)
106+
label_distribution = np.asarray((value, counts)).T
107+
self._logger.info("Validation Actual Label distributions: \n {0}".format(label_distribution))
108+
# Output Label distribution as a BarGraph using MCenter
109+
export_bar_table(label_distribution[:,0], label_distribution[:,1], "Validation - Actual Label Distribution")
110+
111+
# Prediction distribution and prediction confidence distribution
112+
# Pred Label distribution in training
113+
pred_value, pred_counts = np.unique(pred_labels, return_counts=True)
114+
pred_label_distribution = np.asarray((pred_value, pred_counts)).T
115+
self._logger.info("XGBoost Validation Prediction Label Distributions: \n {0}".format(pred_label_distribution))
116+
# Output Pred label distribution as a BarGraph using MCenter
117+
export_bar_table(pred_label_distribution[:,0], pred_label_distribution[:,1], "Validation - XGBoost Prediction Distribution")
118+
119+
# Pred confidence per label
120+
label_number = len(pred_counts)
121+
average_confidence = np.zeros(label_number)
122+
max_pred_probs = pred_probs.max(axis=1)
123+
for i in range(0, label_number):
124+
index_class = np.where(pred_labels == i)[0]
125+
if pred_counts[i] > 0:
126+
average_confidence[i] = np.sum(max_pred_probs[index_class])/(float(pred_counts[i]))
127+
else:
128+
average_confidence[i] = 0
129+
self._logger.info("XGBoost Validation Average Prediction confidence per label: \n {0}".format(average_confidence))
130+
131+
# Output Pred label distribution as a BarGraph using MCenter
132+
export_bar_table(pred_value, average_confidence, "Validation - XGBoost Average confidence per class")
133+
134+
# Confusion Matrix
135+
# XGBoost Confusion Matrix
136+
confmat = confusion_matrix(y_true=y_test, y_pred=pred_labels)
137+
self._logger.info("Confusion Matrix for XGBoost: \n {0}".format(confmat))
138+
# Output Confusion Matrix as a Table using MCenter
139+
export_confusion_table(confmat, "XGBoost")
140+
141+
# Classification Report
142+
# XGBoost Classification Report
143+
class_rep = classification_report(y_true=y_test, y_pred=pred_labels, output_dict=True)
144+
self._logger.info("XGBoost Classification Report: \n {0}".format(class_rep))
145+
146+
# AUC and ROC Curves
147+
# ROC for XGBoost model
148+
roc_auc = roc_auc_score(y_test, pred_probs[:, 1])
149+
self._logger.info("XGBoost ROC AUC value: {}".format(roc_auc))
150+
151+
# Output ROC of the chosen model using MCenter
152+
mlops.set_stat("XGBoost ROC AUC", roc_auc, st.TIME_SERIES)
153+
154+
if roc_auc <= self.min_auc_requirement:
155+
mlops.health_alert("[Training] AUC Violation From Training Node",
156+
"AUC Went Below {}. Current AUC Is {}".format(self.min_auc_requirement, roc_auc))
157+
158+
# ROC curve
159+
fpr, tpr, thr = roc_curve(y_test, pred_probs[:, 1])
160+
161+
cg = MultiGraph().name("Receiver Operating Characteristic ").set_continuous()
162+
cg.add_series(label='Random curve ''', x=fpr.tolist(), y=fpr.tolist())
163+
cg.add_series(label='XGBoost ROC curve (area = {0:0.2f})'''.format(roc_auc), x=fpr.tolist(), y=tpr.tolist())
164+
cg.x_title('False Positive Rate')
165+
cg.y_title('True Positive Rate')
166+
mlops.set_stat(cg)
167+
168+
# Feature importance comparison
169+
# XGBoost Feature importance
170+
export_feature_importance(final_model, list(X_train.columns), 5, "XGBoost")
171+
172+
# KS Analysis
173+
max_pred_probs = pred_probs.max(axis=1)
174+
y_test0=np.where(y_test == 0)[0]
175+
y_test1=np.where(y_test == 1)[0]
176+
177+
# KS for the XGBoost model
178+
ks = ks_2samp(max_pred_probs[y_test0], max_pred_probs[y_test1])
179+
ks_stat = ks.statistic
180+
ks_pvalue = ks.pvalue
181+
self._logger.info("KS values for XGBoost: \n Statistics: {} \n pValue: {}\n".format(ks_stat, ks_pvalue))
182+
183+
# Output KS Stat of the chosen model using MCenter
184+
mlops.set_stat("KS Stats for CGBoost", ks_stat, st.TIME_SERIES)
185+
186+
# raising alert if ks-stat goes above required threshold
187+
if ks_stat >= self.max_ks_requirement:
188+
mlops.health_alert("[Training] KS Violation From Training Node",
189+
"KS Stat Went Above {}. Current KS Stat Is {}".format(self.max_ks_requirement, ks_stat))
190+
191+
ks_table = Table().name("KS Stats for XGBoost").cols(["Statistic", "pValue"])
192+
ks_table.add_row([ks_stat, ks_pvalue])
193+
mlops.set_stat(ks_table)
194+
195+
# PSI Analysis
196+
# Calculating PSI
197+
total_psi, psi_table = get_psi(self, max_pred_probs[y_test0], max_pred_probs[y_test1])
198+
psi_table_stat = Table().name("PSI Stats for XGBoost").cols(
199+
["Base Pop", "Curr Pop", "Lower Bound", "Upper Bound", "Base Percent", "Curr Percent",
200+
"Segment PSI"])
201+
row_num = 1
202+
for each_value in psi_table.values:
203+
str_values = [str(i) for i in each_value]
204+
psi_table_stat.add_row(str(row_num), str_values)
205+
row_num += 1
206+
mlops.set_stat(psi_table_stat)
207+
self._logger.info("Total XGBoost PSI values: \n {}".format(total_psi))
208+
# Output Total PSI of the chosen model using MCenter
209+
mlops.set_stat("Total XGBoost PSI ", total_psi, st.TIME_SERIES)
210+
211+
if total_psi >= self.min_psi_requirement:
212+
mlops.health_alert("[Training] PSI Violation From Training Node",
213+
"PSI Went Below {}. Current PSI Is {}".format(self.min_psi_requirement,
214+
total_psi))
215+
216+
# ## Save the XGBoost Model
217+
model_file = open(self._params["output-model"], 'wb')
218+
pickle.dump(final_model, model_file)
219+
model_file.close()
220+
221+
# ## Finish the program
222+
mlops.done()
223+
224+
return(model_file)
225+
226+
227+
def export_bar_table(bar_names, bar_data, title_name):
228+
"""
229+
This function provides a bar_graph for a bar type data at MCenter data scientist view
230+
:param bar_names: Bar graph names
231+
:param bar_data: Bar graph data.
232+
:param title_name: Title of the bar Graph
233+
:return:
234+
"""
235+
bar_graph_data = BarGraph().name(title_name).cols(
236+
bar_names.astype(str).tolist()).data(
237+
bar_data.tolist())
238+
mlops.set_stat(bar_graph_data)
239+
240+
def export_confusion_table(confmat, algo):
241+
"""
242+
This function provides the confusion matrix as a table in at MCenter data scientist view
243+
:param confmat: Confusion matrix
244+
:param algo: text for the algorithm type
245+
:return:
246+
"""
247+
248+
tbl = Table() \
249+
.name("Confusion Matrix for " + str(algo)) \
250+
.cols(["Predicted label: " + str(i) for i in range(0, confmat.shape[0])])
251+
for i in range(confmat.shape[1]):
252+
tbl.add_row("True Label: " + str(i), [str(confmat[i, j]) for j in range(0, confmat.shape[0])])
253+
mlops.set_stat(tbl)
254+
255+
def export_feature_importance(final_model, column_names, num_features, title_name):
256+
"""
257+
This function provides a feature importance at MCenter data scientist view
258+
:param final_model: Pipeline model (Assume - Feature_Eng + Algo)
259+
:param column_names: Column names of the input dataframe.
260+
:param num_features: Number of fefatures to shpw.
261+
:param title_name: Title of the bar Graph
262+
:return:
263+
"""
264+
model_oh = final_model.steps[0][1].features
265+
trans_feature_names = []
266+
for mod_el in range(0,len(model_oh)):
267+
if("OneHotEncoder" in model_oh[mod_el][1].__class__.__name__):
268+
trans_feature_names += list(model_oh[mod_el][1].get_feature_names([column_names[mod_el]]))
269+
else:
270+
trans_feature_names.append(column_names[mod_el])
271+
trans_feature_names1 = np.asarray(trans_feature_names)
272+
model_FE_index = np.argsort(final_model.steps[-1][1].feature_importances_)[::-1][:num_features]
273+
feat_eng = pd.DataFrame({'Name': trans_feature_names1[model_FE_index],
274+
'Importance': final_model.steps[-1][1].feature_importances_[model_FE_index]})
275+
print("Feature Importance for " + str(title_name) + "\n", feat_eng)
276+
# Output Feature Importance as a BarGraph using MCenter
277+
export_bar_table(trans_feature_names1[model_FE_index],
278+
final_model.steps[-1][1].feature_importances_[model_FE_index],
279+
"Feature Importance for " + str(title_name))
280+
281+
def get_psi(caller_ctx, v1, v2, num1=10):
282+
"""
283+
calculate PSI.
284+
285+
:param v1: vector 1
286+
:param v2: vector 2
287+
:param num1: number of bins
288+
:return: PSI Value
289+
"""
290+
rank1 = pd.qcut(v1, num1, labels=False, duplicates="drop") + 1
291+
num = min(num1, max(rank1))
292+
293+
basepop1 = pd.DataFrame({'v1': v1, 'rank1': rank1})
294+
295+
quantiles = basepop1.groupby('rank1').agg({'min', 'max'})
296+
quantiles.loc[1, 'v1'][0] = 0
297+
298+
currpop = pd.DataFrame({'v2': v2, 'rank1': [1] * v2.shape[0]})
299+
for i in range(2, num + 1):
300+
currpop.loc[currpop['v2'] >= quantiles['v1'].loc[i][0], 'rank1'] = i
301+
quantiles.loc[i - 1, 'v1'][1] = quantiles.loc[i, 'v1'][0]
302+
quantiles.loc[num, 'v1'][1] = 1
303+
304+
basepop2 = basepop1.groupby('rank1').agg({'count'})
305+
basepop2 = basepop2.rename(columns={'count': 'basenum'})
306+
307+
currpop2 = currpop.groupby('rank1').agg({'count'})
308+
currpop2 = currpop2.rename(columns={'count': 'currnum'})
309+
310+
nbase = basepop1.shape[0]
311+
ncurr = currpop.shape[0]
312+
313+
mrged1 = basepop2['v1'].join(currpop2['v2'], how='left')
314+
if mrged1.shape[0] > 1:
315+
mrged1.loc[mrged1.currnum.isna(), "currnum"] = 0
316+
317+
mrged2 = mrged1.join(quantiles['v1'], how='left')
318+
319+
mrged3 = mrged2
320+
mrged3['basepct'] = mrged3.basenum / nbase
321+
mrged3['currpct'] = mrged3.currnum / ncurr
322+
323+
mrged4 = mrged3
324+
mrged4['psi'] = (mrged4.currpct - mrged4.basepct) * np.log((mrged4.currpct / mrged4.basepct))
325+
326+
caller_ctx._logger.info("Merged DF: {}".format(mrged4))
327+
328+
tot_PSI = sum(mrged4.psi[mrged4.psi != float('inf')])
329+
final_table = mrged4
330+
return tot_PSI, final_table
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

0 commit comments

Comments
 (0)