# Databricks notebook source
# MAGIC %md
# MAGIC ## Model Monitoring
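# MAGIC
# MAGIC Loads the current Staging and Production flight-delay models from the MLflow registry, forecasts delays at a selected airport with both, compares their residuals over the training period, checks for schema changes and data drift with TensorFlow Data Validation, and optionally promotes the Staging model to Production.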
# COMMAND ----------
# MAGIC %run ./includes/utilities
# COMMAND ----------
# MAGIC %run ./includes/configuration
# COMMAND ----------
from datetime import datetime as dt
from datetime import timedelta
dbutils.widgets.removeAll()
dbutils.widgets.dropdown("00.Airport_Code", "JFK", ["JFK","SEA","BOS","ATL","LAX","SFO","DEN","DFW","ORD","CVG","CLT","DCA","IAH"])
dbutils.widgets.text('01.training_start_date', "2018-01-01")
dbutils.widgets.text('02.training_end_date', "2019-03-15")
dbutils.widgets.text('03.inference_date', (dt.strptime(str(dbutils.widgets.get('02.training_end_date')), "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d"))
dbutils.widgets.text('04.promote_model', "No")
airport_code = str(dbutils.widgets.get('00.Airport_Code'))
training_start_date = str(dbutils.widgets.get('01.training_start_date'))
training_end_date = str(dbutils.widgets.get('02.training_end_date'))
inference_date = str(dbutils.widgets.get('03.inference_date'))
promote_model = dbutils.widgets.get("04.promote_model") == 'Yes'
print(airport_code,training_start_date,training_end_date,inference_date,promote_model)
# COMMAND ----------
def abbrToID(code):
    """
    Convert an airport code abbreviation to its airport ID.
    For use with non-Spark-DataFrame datatypes.
    Input: a string representing the airport code
    Output: an integer representing the airport ID (99999 if unrecognized)
    """
    airport_ids = {
        "ATL": 10397, "BOS": 10721, "CLT": 11057, "ORD": 13930,
        "CVG": 11193, "DFW": 11298, "DEN": 11292, "IAH": 12266,
        "LAX": 12892, "JFK": 12478, "SFO": 14771, "SEA": 14747,
        "DCA": 11278,
    }
    return airport_ids.get(code, 99999)
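# Example: abbrToID("JFK") returns 12478; unrecognized codes map to the
# sentinel value 99999.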
# COMMAND ----------
# MAGIC %md
# MAGIC ## Forecast flight delay at selected airport
# COMMAND ----------
import mlflow
from pprint import pprint
from mlflow.tracking import MlflowClient
import plotly.express as px
from datetime import timedelta, datetime
import numpy as np
import pandas as pd
client = MlflowClient()
# COMMAND ----------
# Select whether to monitor arrival or departure models using "arr" or "dep":
model_type = "arr"
# Select model names for staging and production:
# group05_arr_sig_newfeatures
# group05_dep_sig_newfeatures
# g05_Arr_LinearModel
# g05_Dep_LinearModel
model_name_staging = "group05_arr_sig_newfeatures"
model_name_production = "g05_Arr_LinearModel"
# COMMAND ----------
mlflow.set_experiment('/Users/[email protected]/flight_delay/dscc202_group05_experiment')
# COMMAND ----------
stage_version = None
# get the respective versions
for mv in client.search_model_versions(f"name='{model_name_staging}'"):
    if dict(mv)['current_stage'] == 'Staging':
        stage_version = dict(mv)['version']
if stage_version is not None:
    stage_model = mlflow.pyfunc.load_model(f"models:/{model_name_staging}/Staging")
    print("Staging Model: ", stage_model)
# COMMAND ----------
prod_version = None
# get the respective versions
for mv in client.search_model_versions(f"name='{model_name_production}'"):
    if dict(mv)['current_stage'] == 'Production':
        prod_version = dict(mv)['version']
if prod_version is not None:
    prod_model = mlflow.pyfunc.load_model(f"models:/{model_name_production}/Production")
    print("Production Model: ", prod_model)
# COMMAND ----------
# assemble dataset for forecasting
databaseName = GROUP_DBNAME
airport_id = abbrToID(airport_code)
fdf = spark.sql(f"""
    SELECT *
    FROM {databaseName}.silver{model_type}_delta
    WHERE ORIGIN_AIRPORT_ID = {airport_id}
      AND FL_DATE BETWEEN '{training_end_date}' AND '{inference_date}'
""")
# COMMAND ----------
# Forecast using the staging and production models.
# Feature-column lists: the order must match the column order that remains
# after dropping the label and date columns, because the values are
# re-labelled positionally below.
ARR_FEATURES = ['ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID', 'DEP_DELAY', 'DAY_OF_WEEK', 'MONTH', 'YEAR', 'hour', 'QUARTER', 'DAY_OF_MONTH', 'avg_temp_f', 'tot_precip_mm', 'avg_wnd_mps', 'avg_vis_m', 'avg_slp_hpa', 'avg_dewpt_f']
DEP_FEATURES = ['ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID', 'ARR_DELAY', 'DAY_OF_WEEK', 'MONTH', 'YEAR', 'hour', 'QUARTER', 'DAY_OF_MONTH', 'avg_temp_f', 'tot_precip_mm', 'avg_wnd_mps', 'avg_vis_m', 'avg_slp_hpa', 'avg_dewpt_f']

def make_features(df, label_col, feature_cols):
    # Drop the label and date columns, then re-label the remaining values with
    # the feature names the model expects (int32 cast kept from the original code).
    return pd.DataFrame(df.drop(["model", "FL_DATE", label_col], axis=1).values,
                        columns=feature_cols, dtype=np.int32)

df_forecast_staging = fdf.toPandas().ffill().bfill()
df_forecast_staging['model'] = 'Staging'
if model_type == "arr":
    df_forecast_staging['yhat'] = stage_model.predict(make_features(df_forecast_staging, "ARR_DELAY", ARR_FEATURES))
if model_type == "dep":
    df_forecast_staging['yhat'] = stage_model.predict(make_features(df_forecast_staging, "DEP_DELAY", DEP_FEATURES))

df_forecast_production = fdf.toPandas().ffill().bfill()
df_forecast_production['model'] = 'Production'
if model_type == "arr":
    df_forecast_production['yhat'] = prod_model.predict(make_features(df_forecast_production, "ARR_DELAY", ARR_FEATURES))
if model_type == "dep":
    df_forecast_production['yhat'] = prod_model.predict(make_features(df_forecast_production, "DEP_DELAY", DEP_FEATURES))
# COMMAND ----------
df = pd.concat([df_forecast_staging, df_forecast_production]).reset_index()
df = df.sort_values(['hour'])
labels = {
    "hour": "Forecast Time",
    "yhat": "Forecasted Delay",
    "model": "Model Stage"
}
fig = px.line(df, x="hour", y="yhat", color='model', title=f"{airport_code} Delay Forecast by Model Stage", labels=labels)
fig.show()
# COMMAND ----------
# MAGIC %md
# MAGIC ## Monitoring the model performance in training period
# COMMAND ----------
stage_version = None
# get the respective versions
for mv in client.search_model_versions(f"name='{model_name_staging}'"):
    if dict(mv)['current_stage'] == 'Staging':
        stage_version = dict(mv)['version']
if stage_version is not None:
    # load the training data associated with the staging model
    stage_model = mlflow.pyfunc.load_model(f"models:/{model_name_staging}/Staging")
    print("Staging Model: ", stage_model)
    sdf = spark.sql(f"""SELECT * FROM {databaseName}.silver{model_type}_delta
                        WHERE ORIGIN_AIRPORT_ID = {airport_id}
                        AND FL_DATE BETWEEN '{training_start_date}' AND '{training_end_date}'""").toPandas()
# COMMAND ----------
prod_version = None
# get the respective versions
for mv in client.search_model_versions(f"name='{model_name_production}'"):
    if dict(mv)['current_stage'] == 'Production':
        prod_version = dict(mv)['version']
if prod_version is not None:
    # load the training data associated with the production model
    prod_model = mlflow.pyfunc.load_model(f"models:/{model_name_production}/Production")
    print("Production Model: ", prod_model)
    pdf = spark.sql(f"""SELECT * FROM {databaseName}.silver{model_type}_delta
                        WHERE ORIGIN_AIRPORT_ID = {airport_id}
                        AND FL_DATE BETWEEN '{training_start_date}' AND '{training_end_date}'""").toPandas()
# COMMAND ----------
# assemble dataset for training
airport_id = abbrToID(airport_code)
train_df = spark.sql(f"""
    SELECT *
    FROM {databaseName}.silver{model_type}_delta
    WHERE ORIGIN_AIRPORT_ID = {airport_id}
      AND FL_DATE BETWEEN '{training_start_date}' AND '{training_end_date}'
""")
# COMMAND ----------
# Score the training period with the staging and production models
df_training_staging = train_df.toPandas().ffill().bfill()
df_training_staging['model'] = 'Staging'
if model_type == "arr":
    df_training_staging['yhat'] = stage_model.predict(make_features(df_training_staging, "ARR_DELAY", ARR_FEATURES))
if model_type == "dep":
    df_training_staging['yhat'] = stage_model.predict(make_features(df_training_staging, "DEP_DELAY", DEP_FEATURES))

df_training_production = train_df.toPandas().ffill().bfill()
df_training_production['model'] = 'Production'
if model_type == "arr":
    df_training_production['yhat'] = prod_model.predict(make_features(df_training_production, "ARR_DELAY", ARR_FEATURES))
if model_type == "dep":
    df_training_production['yhat'] = prod_model.predict(make_features(df_training_production, "DEP_DELAY", DEP_FEATURES))
# COMMAND ----------
sdf['stage'] = "staging"
if model_type == "arr":
    sdf['residual'] = sdf['ARR_DELAY'] - df_training_staging['yhat']
if model_type == "dep":
    sdf['residual'] = sdf['DEP_DELAY'] - df_training_staging['yhat']
sdf['yhat'] = df_training_staging['yhat']
pdf['stage'] = "prod"
if model_type == "arr":
    pdf['residual'] = pdf['ARR_DELAY'] - df_training_production['yhat']
if model_type == "dep":
    pdf['residual'] = pdf['DEP_DELAY'] - df_training_production['yhat']
pdf['yhat'] = df_training_production['yhat']
df = pd.concat([sdf, pdf])
# COMMAND ----------
fig = px.scatter(
    df, x='yhat', y='residual',
    marginal_y='violin',
    color='stage', trendline='ols',
    title=f"{airport_code} Delay Forecast Model Performance Comparison for Training Period"
)
fig.show()
# COMMAND ----------
# MAGIC %md
# MAGIC ## Use Tensorflow Validation Library
# MAGIC - check schema between the training and serving periods of time
# MAGIC - check for data drift and skew between training and serving
# COMMAND ----------
from sklearn.model_selection import train_test_split
import tensorflow_data_validation as tfdv
from tensorflow_data_validation.utils.display_util import get_statistics_html
import warnings
warnings.filterwarnings("ignore", message=r"Passing", category=FutureWarning)
stats_train=tfdv.generate_statistics_from_dataframe(dataframe=train_df.toPandas())
stats_serve=tfdv.generate_statistics_from_dataframe(dataframe=fdf.toPandas())
schema = tfdv.infer_schema(statistics=stats_train)
tfdv.display_schema(schema=schema)
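# The schema inferred from the training statistics is the baseline used to
# validate the serving statistics in the cells below.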
# COMMAND ----------
# Compare evaluation data with training data
displayHTML(get_statistics_html(lhs_statistics=stats_serve, rhs_statistics=stats_train,
                                lhs_name='SERVE_DATASET', rhs_name='TRAIN_DATASET'))
# COMMAND ----------
anomalies = tfdv.validate_statistics(statistics=stats_serve, schema=schema)
tfdv.display_anomalies(anomalies)
# COMMAND ----------
# Add skew and drift comparators
temp_f = tfdv.get_feature(schema, 'avg_temp_f')
temp_f.skew_comparator.jensen_shannon_divergence.threshold = 0
temp_f.drift_comparator.jensen_shannon_divergence.threshold = 0
precip_mm = tfdv.get_feature(schema, 'tot_precip_mm')
precip_mm.skew_comparator.jensen_shannon_divergence.threshold = 0
precip_mm.drift_comparator.jensen_shannon_divergence.threshold = 0
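# Note: a threshold of 0 flags any nonzero Jensen-Shannon divergence between
# the training and serving distributions as an anomaly; a small positive
# threshold (e.g., 0.01) would be less sensitive. Zero thresholds surface
# every drift/skew signal for inspection.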
_anomalies = tfdv.validate_statistics(stats_train, schema, serving_statistics=stats_serve)
tfdv.display_anomalies(_anomalies)
# COMMAND ----------
hour = tfdv.get_feature(schema, 'hour')
hour.skew_comparator.jensen_shannon_divergence.threshold = 0
hour.drift_comparator.jensen_shannon_divergence.threshold = 0
dayofweek = tfdv.get_feature(schema, 'DAY_OF_WEEK')
dayofweek.skew_comparator.jensen_shannon_divergence.threshold = 0
dayofweek.drift_comparator.jensen_shannon_divergence.threshold = 0
_anomalies = tfdv.validate_statistics(stats_train, schema, serving_statistics=stats_serve)
tfdv.display_anomalies(_anomalies)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Promote model if selected
# COMMAND ----------
# Criteria for retraining and promotion to production:
# 1.) View the "Delay Forecast Model Performance Comparison for Training Period" graph above.
# 2.) Determine which model stage shows a higher density of residuals around the horizontal line at residual=0. Note: the residuals are the difference between the ground-truth delays and the model-predicted values.
# 3a.) If the staging model shows a higher density of residuals around residual=0 than the production model, promote the current staging model to production and archive the old production model.
# OR
# 3b.) If the staging model does NOT show a higher density of residuals around residual=0 than the production model, retrain the staging model and repeat the above process.
# A quantitative sketch of this comparison follows below.
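# A quantitative complement to the visual check (a sketch, assuming `df` still
# holds the combined training-period frame built above): the mean absolute
# residual per stage, where a lower value is better.
print(df.groupby('stage')['residual'].apply(lambda r: r.abs().mean()))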
# COMMAND ----------
# promote staging to production
if promote_model and stage_version is not None and prod_version is not None:
    # Archive the current production model
    client.transition_model_version_stage(
        name=model_name_production,
        version=prod_version,
        stage="Archived"
    )
    # Staging --> Production
    client.transition_model_version_stage(
        name=model_name_staging,
        version=stage_version,
        stage="Production"
    )
# COMMAND ----------
dbutils.notebook.exit("Success")