update model performance interface for one input dataset (#2072)
* update model performance interface

* update version

* address comments

* remove unused import

* throw exception when no data generated in data joiner

* fix flaky

* fix long string multiple lines

* fix e2e

* fix e2e
VivienneTang authored Jan 8, 2024
1 parent 523d6aa commit 8426639
Showing 13 changed files with 76 additions and 81 deletions.
@@ -4,7 +4,7 @@ type: spark
 name: model_monitor_data_joiner
 display_name: Model Monitor - Data Joiner
 description: Joins two data assets on the given columns for model monitor.
-version: 0.3.4
+version: 0.3.5
 is_deterministic: true

 code: ../../src/
@@ -4,7 +4,7 @@ type: spark
 name: model_performance_compute_metrics
 display_name: Model Performance - Compute Metrics
 description: Compute model performance metrics leveraged by the model performance monitor.
-version: 0.0.3
+version: 0.0.4
 is_deterministic: true

 code: ../../src
@@ -20,19 +20,15 @@ inputs:
       tabular-regression
     ]
     description: "Task type"
-  baseline_data:
-    type: mltable
-    mode: direct
   baseline_data_target_column:
     type: string
-    optional: true
     description: "Column name which contains ground truths in provided input for baseline_data. (Optional if we have one column name.)"
   production_data:
     type: mltable
     mode: direct
+    description: "The mltable generated by data joiner to join ground truth and prediction data in a table"
   production_data_target_column:
     type: string
-    optional: true
     description: "Column name which contains predictions in provided input for production_data_target_column. (Optional if we have one column name.)"
   regression_rmse_threshold:
     type: number
@@ -79,10 +75,9 @@ conf:
   name: momo-base-spark
 args: >-
   --task ${{inputs.task}}
-  --baseline_data ${{inputs.baseline_data}}
-  $[[--baseline_data_target_column ${{inputs.baseline_data_target_column}}]]
+  --baseline_data_target_column ${{inputs.baseline_data_target_column}}
   --production_data ${{inputs.production_data}}
-  $[[--production_data_target_column ${{inputs.production_data_target_column}}]]
+  --production_data_target_column ${{inputs.production_data_target_column}}
   $[[--regression_rmse_threshold ${{inputs.regression_rmse_threshold}}]]
   $[[--regression_meanabserror_threshold ${{inputs.regression_meanabserror_threshold}}]]
   $[[--classification_precision_threshold ${{inputs.classification_precision_threshold}}]]
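Note on the args change: in AzureML component YAML, `$[[...]]` wraps an argument that is emitted only when its optional input is bound. With `baseline_data` removed, both target-column arguments are now passed unconditionally (and the corresponding `optional: true` flags above are dropped), since ground truths and predictions both come from the single joined `production_data` table.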
@@ -4,7 +4,7 @@ type: pipeline
 name: model_performance_signal_monitor
 display_name: Model Performance - Signal Monitor
 description: Computes the model performance
-version: 0.0.3
+version: 0.0.4
 is_deterministic: true
 inputs:
   task:
@@ -15,8 +15,6 @@ inputs:
       tabular-regression
     ]
     description: "Task type"
-  baseline_data:
-    type: mltable
   baseline_data_target_column:
     type: string
   production_data:
@@ -55,10 +53,9 @@ outputs:
 jobs:
   compute_metrics:
     type: spark
-    component: azureml://registries/azureml/components/model_performance_compute_metrics/versions/0.0.3
+    component: azureml://registries/azureml/components/model_performance_compute_metrics/versions/0.0.4
     inputs:
       task: ${{parent.inputs.task}}
-      baseline_data: ${{parent.inputs.baseline_data}}
       baseline_data_target_column: ${{parent.inputs.baseline_data_target_column}}
       production_data: ${{parent.inputs.production_data}}
       production_data_target_column: ${{parent.inputs.production_data_target_column}}
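For orientation, here is a minimal sketch of calling the updated signal monitor with the single-dataset interface. The workspace and registry handles, data-asset path, and experiment name are placeholders, and any monitor inputs not visible in this diff (for example Spark resource settings) are omitted.

```python
# A minimal sketch, assuming placeholder workspace/registry handles; inputs not
# shown in this diff (Spark resources, monitor metadata, etc.) are omitted.
from azure.ai.ml import Input, MLClient
from azure.ai.ml.dsl import pipeline
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
ml_client = MLClient(credential, "<subscription-id>", "<resource-group>", "<workspace>")
registry = MLClient(credential, registry_name="azureml")

# 0.0.4 no longer takes baseline_data; both target columns refer to the one table.
monitor = registry.components.get("model_performance_signal_monitor", version="0.0.4")


@pipeline
def perf_monitoring():
    monitor(
        task="tabular-regression",
        baseline_data_target_column="regression-targetvalue",  # ground-truth column
        production_data=Input(
            type="mltable",
            path="azureml:my_joined_data:1",  # placeholder data asset
            mode="direct",
        ),
        production_data_target_column="regression",  # prediction column
        regression_rmse_threshold=0.1,
        regression_meanabserror_threshold=0.1,
    )


job = ml_client.jobs.create_or_update(perf_monitoring(), experiment_name="mp-monitor")
print(job.name)
```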
@@ -5,11 +5,11 @@

 import argparse
 import pyspark.sql as pyspark_sql
-from shared_utilities.event_utils import post_warning_event
 from shared_utilities.io_utils import (
     try_read_mltable_in_spark_with_error,
     save_spark_df_as_mltable,
 )
+from shared_utilities.momo_exceptions import InvalidInputError

@@ -18,7 +18,7 @@ def _validate_join_column_in_input_data(
     input_data_name: str
 ):
     if join_column not in input_data_df.columns:
-        raise Exception(f"The join column '{join_column}' is not present in {input_data_name}.")
+        raise InvalidInputError(f"The join column '{join_column}' is not present in {input_data_name}.")

@@ -53,6 +53,11 @@ def join_data(
         'inner'
     )

+    # Throw exception if the result is empty
+    if joined_data_df.count() == 0:
+        raise InvalidInputError("""The data joiner resulted in an empty data asset.
+        Please check the input data to see if this is expected.""")
+
     return joined_data_df

@@ -78,11 +83,6 @@ def run():
         args.right_join_column
     )

-    # Raise warning if the result is empty
-    if joined_data_df.count() == 0:
-        warning_message = 'The data joiner resulted in an empty data asset. Please check the input data to see if this is expected.'  # noqa
-        post_warning_event(warning_message)
-
     # Write the joined data.
     save_spark_df_as_mltable(joined_data_df, args.joined_data)
     print('Successfully executed data joiner component.')
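The substantive change here: an empty join result used to post a warning event and still write out an empty asset; it now fails fast inside join_data with InvalidInputError. A self-contained sketch of that behavior, using a local Spark session and a stand-in for the exception class from `shared_utilities.momo_exceptions`:

```python
# Self-contained sketch of the fail-fast join; InvalidInputError stands in for
# shared_utilities.momo_exceptions.InvalidInputError, which is not shown here.
from pyspark.sql import SparkSession


class InvalidInputError(Exception):
    """Stand-in for the shared-utilities exception."""


def join_data(left_df, left_join_column, right_df, right_join_column):
    joined = left_df.join(
        right_df,
        left_df[left_join_column] == right_df[right_join_column],
        "inner",
    )
    # An empty inner join now raises instead of posting a warning event.
    if joined.count() == 0:
        raise InvalidInputError(
            "The data joiner resulted in an empty data asset. "
            "Please check the input data to see if this is expected."
        )
    return joined


spark = SparkSession.builder.master("local[1]").getOrCreate()
left = spark.createDataFrame([(1, "a")], ["correlationid", "feature"])
right = spark.createDataFrame([(2, 0.5)], ["correlationid", "prediction"])
try:
    join_data(left, "correlationid", right, "correlationid")
except InvalidInputError as err:
    print(err)  # the ids do not overlap, so the join is empty
```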
@@ -3,7 +3,7 @@

 """This file contains the data reader for model performance compute metrics."""

-from shared_utilities.io_utils import read_mltable_in_spark
+from shared_utilities.io_utils import try_read_mltable_in_spark, NoDataApproach

@@ -54,39 +54,22 @@ def __init__(self, task):
         """
         self.task = task

-    def _read_mltable_to_pd_dataframe(self, file_name, columns):
-        """
-        Read mltable to pandas dataframe.
-        Args:
-            file_name: str
-            columns: list
-        Returns: pd.DataFrame
-        """
-        df = read_mltable_in_spark(file_name)
-        if columns is not None:
-            df = df.select(columns)  # We might need to accept multiple columns in code-gen
-        return df.toPandas()
-
-    def read_data(self, ground_truths, ground_truths_column_name, predictions, predictions_column_name):
+    def read_data(self, ground_truths_column_name, file_name, predictions_column_name):
         """
         Read data for the task.
         Args:
-            ground_truths: str
             ground_truths_column_name: str
-            predictions: str
+            file_name: str
             predictions_column_name: str
         Returns: MetricsDTO
         """
-        ground_truth = self._read_mltable_to_pd_dataframe(ground_truths,
-                                                          [ground_truths_column_name]
-                                                          if ground_truths_column_name is not None else None)
-        predictions = self._read_mltable_to_pd_dataframe(predictions,
-                                                         [predictions_column_name]
-                                                         if predictions_column_name is not None else None)
+        df = try_read_mltable_in_spark(file_name, "production_data", NoDataApproach.ERROR)
+
+        ground_truth = df.select(ground_truths_column_name).toPandas()
+
+        predictions = df.select(predictions_column_name).toPandas()
+
         return MetricsDTO(ground_truth, predictions)
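The reader now takes one joined MLTable instead of separate baseline and production assets, and selects the ground-truth and prediction columns out of the same dataframe. A rough equivalent, with `MetricsDTO` reconstructed and the shared MLTable reader replaced by a plain Spark dataframe argument (both are assumptions, not the actual utilities):

```python
# Rough sketch of the single-input read path; MetricsDTO is reconstructed here
# and the shared MLTable reader is replaced by a plain dataframe argument.
from dataclasses import dataclass

import pandas as pd
from pyspark.sql import DataFrame, SparkSession


@dataclass
class MetricsDTO:
    ground_truth: pd.DataFrame
    predictions: pd.DataFrame


def read_data(df: DataFrame, ground_truths_column_name: str,
              predictions_column_name: str) -> MetricsDTO:
    # Both columns come from the same joined table produced by the data joiner.
    ground_truth = df.select(ground_truths_column_name).toPandas()
    predictions = df.select(predictions_column_name).toPandas()
    return MetricsDTO(ground_truth, predictions)


spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [(4.9, 4.9), (1.66, 1.66), (4.66, 9.8)],
    ["regression", "regression-targetvalue"],
)
dto = read_data(df, "regression-targetvalue", "regression")
print(dto.ground_truth.shape, dto.predictions.shape)
```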
@@ -16,12 +16,11 @@ def run():
     # Parse argument
     parser = argparse.ArgumentParser()
     parser.add_argument("--task", type=str, dest="task", choices=constants.ALL_TASKS)
-    parser.add_argument("--baseline_data", type=str, dest="ground_truths", required=True)
     parser.add_argument("--baseline_data_target_column", type=str,
-                        dest="ground_truths_column_name", required=False, default=None)
+                        dest="ground_truths_column_name", default=None)
     parser.add_argument("--production_data", type=str, dest="predictions", required=True)
     parser.add_argument("--production_data_target_column", type=str,
-                        dest="predictions_column_name", required=False, default=None)
+                        dest="predictions_column_name", default=None)
     parser.add_argument("--regression_rmse_threshold", type=str,
                         dest="regression_rmse_threshold", required=False, default=None)
     parser.add_argument("--regression_meanabserror_threshold", type=str,
@@ -35,14 +34,12 @@ def run():
     parser.add_argument("--signal_metrics", type=str)
     args = parser.parse_args()

-    metrics_data = DataReaderFactory().get_reader(args.task).read_data(args.ground_truths,
-                                                                       args.ground_truths_column_name,
+    metrics_data = DataReaderFactory().get_reader(args.task).read_data(args.ground_truths_column_name,
                                                                        args.predictions,
                                                                        args.predictions_column_name)
     metrics = EvaluatorFactory().get_evaluator(task_type=args.task, metrics_config={}).evaluate(metrics_data)
     construct_signal_metrics(metrics,
                              args.signal_metrics,
-                             args.predictions_column_name,
                              args.regression_rmse_threshold,
                              args.regression_meanabserror_threshold,
                              args.classification_precision_threshold,
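One wiring detail worth noting: `--production_data` still lands in `args.predictions`, which `read_data` now consumes as its `file_name` argument, so the single joined table travels under the old destination name.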
@@ -50,7 +50,6 @@ def convert_pandas_to_spark(pandas_data):
 def construct_signal_metrics(
     metrics_artifacts,
     output_data_file_name,
-    predictions_column_name,
     regression_rmse_threshold=None,
     regression_meanabserror_threshold=None,
     classification_precision_threshold=None,
@@ -0,0 +1,9 @@
+type: mltable
+
+paths:
+  - pattern: ./*.csv
+transformations:
+  - read_delimited:
+      delimiter: ','
+      encoding: ascii
+      header: all_files_same_headers
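This new MLTable definition globs the CSV below into one delimited table. For local inspection it can be materialized with the `mltable` SDK; the folder path here is a guess, not taken from this diff:

```python
# Illustrative only: the folder path is hypothetical.
import mltable

tbl = mltable.load("./model_performance_production_data")  # folder containing the MLTable file
df = tbl.to_pandas_dataframe()
print(df.head())
```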
@@ -0,0 +1,11 @@
+classification,correlationid,regression,classification-targetvalue,regression-targetvalue
+setosa,1,4.9,setosa,4.9
+rose,2,1.66,setosa,1.66
+setosa,3,4.66,setosa,9.8
+setosa,4,6.9,setosa,6.9
+rose,5,1.66,rose,1.66
+setosa,6,3.44,setosa,3.44
+setosa,7,3.44,setosa,3.21
+rose,8,3.44,rose,3.44
+setosa,9,3.55,setosa,3.55
+rose,10,3.44,rose,3.44
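The fixture packs both tasks into one table: `classification`/`classification-targetvalue` and `regression`/`regression-targetvalue` are the prediction/ground-truth pairs, keyed by `correlationid`. These are exactly the target-column names the updated e2e tests below pass to the monitor.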
@@ -6,6 +6,7 @@
 import pytest
 from azure.ai.ml import Input, MLClient, Output
 from azure.ai.ml.entities import Spark, AmlTokenConfiguration
+from azure.ai.ml.exceptions import JobException
 from azure.ai.ml.dsl import pipeline
 from tests.e2e.utils.constants import (
     COMPONENT_NAME_DATA_JOINER,
@@ -57,7 +58,12 @@ def _data_joiner_e2e():
     )

     # Wait until the job completes
-    ml_client.jobs.stream(pipeline_job.name)
+    try:
+        ml_client.jobs.stream(pipeline_job.name)
+    except JobException:
+        # ignore JobException to return job final status
+        pass
+
     return ml_client.jobs.get(pipeline_job.name)
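`ml_client.jobs.stream` raises a `JobException` when the streamed job ends in a failed state; swallowing it here lets the helper always return the job's terminal status, which the empty-result test below now asserts is `Failed`.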


@@ -81,7 +87,7 @@ def test_data_joiner_successful(

         assert pipeline_job.status == 'Completed'

-    def test_data_joiner_empty_result_successful(
+    def test_data_joiner_empty_result_failed(
         self, ml_client: MLClient, get_component, test_suite_name
     ):
         """Test data joiner that produces empty result."""
@@ -95,4 +101,4 @@ def test_data_joiner_empty_result_successful(
             DATA_ASSET_MODEL_OUTPUTS_JOIN_COLUMN_NAME
         )

-        assert pipeline_job.status == 'Completed'
+        assert pipeline_job.status == 'Failed'
@@ -8,7 +8,7 @@
 from azure.ai.ml.dsl import pipeline
 from tests.e2e.utils.constants import (
     COMPONENT_NAME_MODEL_PERFORMANCE_SIGNAL_MONITOR,
-    DATA_ASSET_IRIS_BASELINE_DATA,
+    DATA_ASSET_MODEL_PERFORMANCE_PRODUCTION_DATA,
 )

@@ -17,7 +17,6 @@ def _submit_model_performance_signal_monitor_job(
     get_component,
     experiment_name,
     task,
-    baseline_data,
     baseline_data_target_column,
     production_data,
     production_data_target_column,
@@ -34,7 +33,6 @@ def _submit_model_performance_signal_monitor_job(
     def _model_performance_signal_monitor_e2e():
         mp_signal_monitor_output = mp_signal_monitor(
             task=task,
-            baseline_data=baseline_data,
             baseline_data_target_column=baseline_data_target_column,
             production_data=production_data,
             production_data_target_column=production_data_target_column,
@@ -75,10 +73,9 @@ def test_monitoring_regression_successful(
            get_component,
            test_suite_name,
            "tabular-regression",
-           DATA_ASSET_IRIS_BASELINE_DATA,
-           "sepal_length",
-           DATA_ASSET_IRIS_BASELINE_DATA,
-           "sepal_length",
+           "regression-targetvalue",
+           DATA_ASSET_MODEL_PERFORMANCE_PRODUCTION_DATA,
+           "regression",
            0.1,
            0.1
        )
@@ -94,15 +91,14 @@ def test_monitoring_classification_successful(
            get_component,
            test_suite_name,
            "tabular-classification",
-           DATA_ASSET_IRIS_BASELINE_DATA,
-           "target",
-           DATA_ASSET_IRIS_BASELINE_DATA,
-           "target",
+           "classification-targetvalue",
+           DATA_ASSET_MODEL_PERFORMANCE_PRODUCTION_DATA,
+           "classification",
            None,
            None,
-           0.9,
-           0.9,
-           0.9,
+           0.1,
+           0.1,
+           0.1,
        )

        assert pipeline_job.status == "Completed"
@@ -95,3 +95,5 @@

 # For Data Quality with timestamp and boolean type in the MLTable
 DATA_ASSET_VALID_DATATYPE = 'azureml:mltable_validate_datatype_for_data_quality:1'
+
+DATA_ASSET_MODEL_PERFORMANCE_PRODUCTION_DATA = 'azureml:mltable_model_performance_production_data:1'
@@ -7,6 +7,7 @@

 from pyspark.sql.types import StringType, StructField, StructType
 from src.model_monitor_data_joiner.run import join_data
+from shared_utilities.momo_exceptions import InvalidInputError
 from tests.e2e.utils.io_utils import create_pyspark_dataframe

 LEFT_JOIN_COLUMN = 'left_join_column'
@@ -106,14 +107,13 @@ def test_join_data_empty_input_successful(
         else:
             right_data_df = _generate_right_data_df(True)

-        joined_data_df = join_data(
-            left_data_df,
-            LEFT_JOIN_COLUMN,
-            right_data_df,
-            RIGHT_JOIN_COLUMN
-        )
-
-        assert joined_data_df.count() == 0
+        with pytest.raises(InvalidInputError):
+            join_data(
+                left_data_df,
+                LEFT_JOIN_COLUMN,
+                right_data_df,
+                RIGHT_JOIN_COLUMN
+            )

     @pytest.mark.parametrize("is_left_data_empty, is_right_data_empty", test_data)
     def test_join_data_empty_input_without_schema_raises_exception(
@@ -132,7 +132,7 @@ def test_join_data_empty_input_without_schema_raises_exception(
         else:
             right_data_df = _generate_right_data_df(True)

-        with pytest.raises(Exception):
+        with pytest.raises(InvalidInputError):
             join_data(
                 left_data_df,
                 LEFT_JOIN_COLUMN,
