Skip to content

Commit

Permalink
Do None check before publishing metric (#2291)
Browse files Browse the repository at this point in the history
* Do None check before publishing metric

* upgrade component

* add another bug fix

* add another try catch

* Don't use spark run job to track the run metrics
  • Loading branch information
yunjie-hub authored Feb 12, 2024
1 parent e424387 commit cdd0653
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type: pipeline
name: data_drift_signal_monitor
display_name: Data Drift - Signal Monitor
description: Computes the data drift between a baseline and a production data asset.
version: 0.3.30
version: 0.3.31
is_deterministic: true

inputs:
Expand Down Expand Up @@ -190,7 +190,7 @@ jobs:
type: aml_token
output_signal_metrics:
type: spark
component: azureml://registries/azureml/components/model_monitor_output_metrics/versions/0.3.9
component: azureml://registries/azureml/components/model_monitor_output_metrics/versions/0.3.11
inputs:
signal_metrics:
type: mltable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type: pipeline
name: data_quality_signal_monitor
display_name: Data Quality - Signal Monitor
description: Computes the data quality of a target dataset with reference to a baseline.
version: 0.3.28
version: 0.3.29
is_deterministic: true

inputs:
Expand Down Expand Up @@ -188,7 +188,7 @@ jobs:
type: aml_token
output_signal_metrics:
type: spark
component: azureml://registries/azureml/components/model_monitor_output_metrics/versions/0.3.9
component: azureml://registries/azureml/components/model_monitor_output_metrics/versions/0.3.11
inputs:
signal_metrics:
type: mltable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type: pipeline
name: feature_attribution_drift_signal_monitor
display_name: Feature Attribution Drift - Signal Monitor
description: Computes the feature attribution between a baseline and production data assets.
version: 0.3.21
version: 0.3.22
is_deterministic: true

inputs:
Expand Down Expand Up @@ -101,7 +101,7 @@ jobs:
type: aml_token
output_signal_metrics:
type: spark
component: azureml://registries/azureml/components/model_monitor_output_metrics/versions/0.3.9
component: azureml://registries/azureml/components/model_monitor_output_metrics/versions/0.3.11
inputs:
signal_metrics:
type: mltable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type: spark
name: model_monitor_output_metrics
display_name: Model Monitor - Output Metrics
description: Output the computed model monitor metrics to the default datastore.
version: 0.3.10
version: 0.3.11
is_deterministic: true

code: ../../src/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type: pipeline
name: prediction_drift_signal_monitor
display_name: Prediction Drift - Signal Monitor
description: Computes the prediction drift between a baseline and a target data asset.
version: 0.4.5
version: 0.4.6
is_deterministic: true

inputs:
Expand Down Expand Up @@ -164,7 +164,7 @@ jobs:
type: aml_token
output_signal_metrics:
type: spark
component: azureml://registries/azureml/components/model_monitor_output_metrics/versions/0.3.9
component: azureml://registries/azureml/components/model_monitor_output_metrics/versions/0.3.11
inputs:
signal_metrics:
type: mltable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ def publish_metrics(self, step: int):
threshold = None
if "threshold" in run_metric:
threshold = run_metric["threshold"]
publish_metric(
run_metric["runId"], float(run_metric["value"]), threshold, step
)
if run_metric.get("value") is not None:
publish_metric(
run_metric["runId"], float(run_metric["value"]), threshold, step
)

def to_dict(self) -> dict:
"""Convert to a dictionary object."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,7 @@
RECALL_METRIC_NAME = "Recall"
MEAN_ABSOLUTE_ERROR_METRIC_NAME = "MeanAbsoluteError"
ROOT_MEAN_SQUARED_ERROR_METRIC_NAME = "RootMeanSquaredError"

# util
MLFLOW_RUN_ID = "MLFLOW_RUN_ID"
MAX_RETRY_COUNT = 3
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
# Licensed under the MIT License.

"""Contains functionality for publishing run metrics."""
from .constants import MAX_RETRY_COUNT, MLFLOW_RUN_ID
import mlflow
import os
import traceback
import time


def _get_experiment_id():
Expand Down Expand Up @@ -100,6 +103,9 @@ def get_or_create_run_id(
with mlflow.start_run(run_id=_get_or_create_parent_run_id(monitor_name)) as current_run:
print(f"Current parent run id: {current_run.info.run_id}")
run_name = f"{signal_name}_{metric_name}"
if MLFLOW_RUN_ID in os.environ:
print(f"environment variable MLFLOW_RUN_ID:{os.environ[MLFLOW_RUN_ID]}")
del os.environ[MLFLOW_RUN_ID]
with mlflow.start_run(
nested=True,
run_name=run_name,
Expand All @@ -122,11 +128,20 @@ def publish_metric(run_id: str, value: float, threshold, step: int):
metrics["value"] = value
if threshold is not None:
metrics["threshold"] = float(threshold)
publish_metrics(run_id=run_id, metrics=metrics, step=step)
try:
publish_metrics(run_id=run_id, metrics=metrics, step=step)
except Exception:
print(f"Exception occurred in publish_metric: {traceback.format_exc()}")


def publish_metrics(run_id: str, metrics: dict, step: int):
"""Publish metrics to the run metrics store."""
print(f"Publishing metrics to run id '{run_id}'.")
with mlflow.start_run(run_id=run_id, nested=True):
mlflow.log_metrics(metrics=metrics, step=step)
for _ in range(MAX_RETRY_COUNT):
try:
mlflow.log_metrics(metrics=metrics, step=step)
return
except Exception:
print(f"Exception occurred in publish_metrics: {traceback.format_exc()}")
time.sleep(1)

0 comments on commit cdd0653

Please sign in to comment.