diff --git a/assets/model_monitoring/components/model_monitor/model_monitor_data_joiner/spec.yaml b/assets/model_monitoring/components/model_monitor/model_monitor_data_joiner/spec.yaml
index 5f3854caa9..48cc7bf245 100644
--- a/assets/model_monitoring/components/model_monitor/model_monitor_data_joiner/spec.yaml
+++ b/assets/model_monitoring/components/model_monitor/model_monitor_data_joiner/spec.yaml
@@ -4,7 +4,7 @@ type: spark
 name: model_monitor_data_joiner
 display_name: Model Monitor - Data Joiner
 description: Joins two data assets on the given columns for model monitor.
-version: 0.3.4
+version: 0.3.5
 is_deterministic: true

 code: ../../src/
diff --git a/assets/model_monitoring/components/model_performance/model_performance_compute_metrics/spec.yaml b/assets/model_monitoring/components/model_performance/model_performance_compute_metrics/spec.yaml
index f6c84b4669..475347918f 100644
--- a/assets/model_monitoring/components/model_performance/model_performance_compute_metrics/spec.yaml
+++ b/assets/model_monitoring/components/model_performance/model_performance_compute_metrics/spec.yaml
@@ -4,7 +4,7 @@ type: spark
 name: model_performance_compute_metrics
 display_name: Model Performance - Compute Metrics
 description: Compute model performance metrics leveraged by the model performance monitor.
-version: 0.0.3
+version: 0.0.4
 is_deterministic: true

 code: ../../src
@@ -20,19 +20,15 @@ inputs:
       tabular-regression
     ]
     description: "Task type"
-  baseline_data:
-    type: mltable
-    mode: direct
   baseline_data_target_column:
     type: string
-    optional: true
     description: "Column name which contains ground truths in provided input for baseline_data. (Optional if we have one column name.)"
   production_data:
     type: mltable
     mode: direct
+    description: "The mltable generated by data joiner to join ground truth and prediction data in a table"
   production_data_target_column:
     type: string
-    optional: true
     description: "Column name which contains predictions in provided input for production_data_target_column. (Optional if we have one column name.)"
   regression_rmse_threshold:
     type: number
@@ -79,10 +75,9 @@ conf:
     name: momo-base-spark
 args: >-
   --task ${{inputs.task}}
-  --baseline_data ${{inputs.baseline_data}}
-  $[[--baseline_data_target_column ${{inputs.baseline_data_target_column}}]]
+  --baseline_data_target_column ${{inputs.baseline_data_target_column}}
   --production_data ${{inputs.production_data}}
-  $[[--production_data_target_column ${{inputs.production_data_target_column}}]]
+  --production_data_target_column ${{inputs.production_data_target_column}}
   $[[--regression_rmse_threshold ${{inputs.regression_rmse_threshold}}]]
   $[[--regression_meanabserror_threshold ${{inputs.regression_meanabserror_threshold}}]]
   $[[--classification_precision_threshold ${{inputs.classification_precision_threshold}}]]
diff --git a/assets/model_monitoring/components/model_performance/model_performance_signal_monitor/spec.yaml b/assets/model_monitoring/components/model_performance/model_performance_signal_monitor/spec.yaml
index 24d8aef297..0beea631ee 100644
--- a/assets/model_monitoring/components/model_performance/model_performance_signal_monitor/spec.yaml
+++ b/assets/model_monitoring/components/model_performance/model_performance_signal_monitor/spec.yaml
@@ -4,7 +4,7 @@ type: pipeline
 name: model_performance_signal_monitor
 display_name: Model Performance - Signal Monitor
 description: Computes the model performance
-version: 0.0.3
+version: 0.0.4
 is_deterministic: true
 inputs:
   task:
@@ -15,8 +15,6 @@ inputs:
       tabular-regression
     ]
     description: "Task type"
-  baseline_data:
-    type: mltable
   baseline_data_target_column:
     type: string
   production_data:
@@ -55,10 +53,9 @@ outputs:
 jobs:
   compute_metrics:
     type: spark
-    component: azureml://registries/azureml/components/model_performance_compute_metrics/versions/0.0.3
+    component: azureml://registries/azureml/components/model_performance_compute_metrics/versions/0.0.4
     inputs:
       task: ${{parent.inputs.task}}
-      baseline_data: ${{parent.inputs.baseline_data}}
       baseline_data_target_column: ${{parent.inputs.baseline_data_target_column}}
       production_data: ${{parent.inputs.production_data}}
       production_data_target_column: ${{parent.inputs.production_data_target_column}}
diff --git a/assets/model_monitoring/components/src/model_monitor_data_joiner/run.py b/assets/model_monitoring/components/src/model_monitor_data_joiner/run.py
index af85246ec1..0e2a3f162d 100644
--- a/assets/model_monitoring/components/src/model_monitor_data_joiner/run.py
+++ b/assets/model_monitoring/components/src/model_monitor_data_joiner/run.py
@@ -5,11 +5,11 @@

 import argparse
 import pyspark.sql as pyspark_sql
-from shared_utilities.event_utils import post_warning_event
 from shared_utilities.io_utils import (
     try_read_mltable_in_spark_with_error,
     save_spark_df_as_mltable,
 )
+from shared_utilities.momo_exceptions import InvalidInputError


 def _validate_join_column_in_input_data(
@@ -18,7 +18,7 @@ def _validate_join_column_in_input_data(
     input_data_name: str
 ):
     if join_column not in input_data_df.columns:
-        raise Exception(f"The join column '{join_column}' is not present in {input_data_name}.")
+        raise InvalidInputError(f"The join column '{join_column}' is not present in {input_data_name}.")


 def join_data(
@@ -53,6 +53,11 @@ def join_data(
         'inner'
     )

+    # Throw exception if the result is empty
+    if joined_data_df.count() == 0:
+        raise InvalidInputError("""The data joiner resulted in an empty data asset.
+                                Please check the input data to see if this is expected.""")
+
     return joined_data_df


@@ -78,11 +83,6 @@ def run():
         args.right_join_column
     )

-    # Raise warning if the result is empty
-    if joined_data_df.count() == 0:
-        warning_message = 'The data joiner resulted in an empty data asset. Please check the input data to see if this is expected.'  # noqa
-        post_warning_event(warning_message)
-
     # Write the joined data.
     save_spark_df_as_mltable(joined_data_df, args.joined_data)
     print('Successfully executed data joiner component.')
diff --git a/assets/model_monitoring/components/src/model_performance_metrics/data_reader.py b/assets/model_monitoring/components/src/model_performance_metrics/data_reader.py
index 2d46ed6e7e..dc74f1dad4 100644
--- a/assets/model_monitoring/components/src/model_performance_metrics/data_reader.py
+++ b/assets/model_monitoring/components/src/model_performance_metrics/data_reader.py
@@ -3,7 +3,7 @@

 """This file contains the data reader for model performance compute metrics."""

-from shared_utilities.io_utils import read_mltable_in_spark
+from shared_utilities.io_utils import try_read_mltable_in_spark, NoDataApproach


 class DataReaderFactory:
@@ -54,39 +54,22 @@ def __init__(self, task):
         """
         self.task = task

-    def _read_mltable_to_pd_dataframe(self, file_name, columns):
-        """
-        Read mltable to pandas dataframe.
-
-        Args:
-            file_name: str
-            columns: list
-
-        Returns: pd.DataFrame
-
-        """
-        df = read_mltable_in_spark(file_name)
-        if columns is not None:
-            df = df.select(columns)  # We might need to accept multiple columns in code-gen
-        return df.toPandas()
-
-    def read_data(self, ground_truths, ground_truths_column_name, predictions, predictions_column_name):
+    def read_data(self, ground_truths_column_name, file_name, predictions_column_name):
         """
         Read data for the task.

         Args:
-            ground_truths: str
             ground_truths_column_name: str
-            predictions: str
+            file_name: str
             predictions_column_name: str

         Returns: MetricsDTO

         """
-        ground_truth = self._read_mltable_to_pd_dataframe(ground_truths,
-                                                          [ground_truths_column_name]
-                                                          if ground_truths_column_name is not None else None)
-        predictions = self._read_mltable_to_pd_dataframe(predictions,
-                                                         [predictions_column_name]
-                                                         if predictions_column_name is not None else None)
+        df = try_read_mltable_in_spark(file_name, "production_data", NoDataApproach.ERROR)
+
+        ground_truth = df.select(ground_truths_column_name).toPandas()
+
+        predictions = df.select(predictions_column_name).toPandas()
+
         return MetricsDTO(ground_truth, predictions)
diff --git a/assets/model_monitoring/components/src/model_performance_metrics/run.py b/assets/model_monitoring/components/src/model_performance_metrics/run.py
index fd9693dfd8..0f009986d7 100644
--- a/assets/model_monitoring/components/src/model_performance_metrics/run.py
+++ b/assets/model_monitoring/components/src/model_performance_metrics/run.py
@@ -16,12 +16,11 @@ def run():
     # Parse argument
     parser = argparse.ArgumentParser()
     parser.add_argument("--task", type=str, dest="task", choices=constants.ALL_TASKS)
-    parser.add_argument("--baseline_data", type=str, dest="ground_truths", required=True)
     parser.add_argument("--baseline_data_target_column", type=str,
-                        dest="ground_truths_column_name", required=False, default=None)
+                        dest="ground_truths_column_name", default=None)
     parser.add_argument("--production_data", type=str, dest="predictions", required=True)
     parser.add_argument("--production_data_target_column", type=str,
-                        dest="predictions_column_name", required=False, default=None)
+                        dest="predictions_column_name", default=None)
     parser.add_argument("--regression_rmse_threshold", type=str,
                         dest="regression_rmse_threshold", required=False, default=None)
     parser.add_argument("--regression_meanabserror_threshold", type=str,
@@ -35,14 +34,12 @@ def run():
     parser.add_argument("--signal_metrics", type=str)
     args = parser.parse_args()

-    metrics_data = DataReaderFactory().get_reader(args.task).read_data(args.ground_truths,
-                                                                       args.ground_truths_column_name,
+    metrics_data = DataReaderFactory().get_reader(args.task).read_data(args.ground_truths_column_name,
                                                                        args.predictions,
                                                                        args.predictions_column_name)
     metrics = EvaluatorFactory().get_evaluator(task_type=args.task, metrics_config={}).evaluate(metrics_data)
     construct_signal_metrics(metrics,
                              args.signal_metrics,
-                             args.predictions_column_name,
                              args.regression_rmse_threshold,
                              args.regression_meanabserror_threshold,
                              args.classification_precision_threshold,
diff --git a/assets/model_monitoring/components/src/model_performance_metrics/utils.py b/assets/model_monitoring/components/src/model_performance_metrics/utils.py
index 82beba70ff..312c5e853a 100644
--- a/assets/model_monitoring/components/src/model_performance_metrics/utils.py
+++ b/assets/model_monitoring/components/src/model_performance_metrics/utils.py
@@ -50,7 +50,6 @@ def convert_pandas_to_spark(pandas_data):
 def construct_signal_metrics(
         metrics_artifacts,
         output_data_file_name,
-        predictions_column_name,
         regression_rmse_threshold=None,
         regression_meanabserror_threshold=None,
         classification_precision_threshold=None,
diff --git a/assets/model_monitoring/components/tests/e2e/resources/mltable_model_performance_production_data/MLTable b/assets/model_monitoring/components/tests/e2e/resources/mltable_model_performance_production_data/MLTable
new file mode 100644
index 0000000000..6c2bd8c423
--- /dev/null
+++ b/assets/model_monitoring/components/tests/e2e/resources/mltable_model_performance_production_data/MLTable
@@ -0,0 +1,9 @@
+type: mltable
+
+paths:
+  - pattern: ./*.csv
+transformations:
+  - read_delimited:
+      delimiter: ','
+      encoding: ascii
+      header: all_files_same_headers
\ No newline at end of file
diff --git a/assets/model_monitoring/components/tests/e2e/resources/mltable_model_performance_production_data/production_data.csv b/assets/model_monitoring/components/tests/e2e/resources/mltable_model_performance_production_data/production_data.csv
new file mode 100644
index 0000000000..66041ebee3
--- /dev/null
+++ b/assets/model_monitoring/components/tests/e2e/resources/mltable_model_performance_production_data/production_data.csv
@@ -0,0 +1,11 @@
+classification,correlationid,regression,classification-targetvalue,regression-targetvalue
+setosa,1,4.9,setosa,4.9
+rose,2,1.66,setosa,1.66
+setosa,3,4.66,setosa,9.8
+setosa,4,6.9,setosa,6.9
+rose,5,1.66,rose,1.66
+setosa,6,3.44,setosa,3.44
+setosa,7,3.44,setosa,3.21
+rose,8,3.44,rose,3.44
+setosa,9,3.55,setosa,3.55
+rose,10,3.44,rose,3.44
diff --git a/assets/model_monitoring/components/tests/e2e/test_data_joiner_e2e.py b/assets/model_monitoring/components/tests/e2e/test_data_joiner_e2e.py
index d981039116..8265cf3c9f 100644
--- a/assets/model_monitoring/components/tests/e2e/test_data_joiner_e2e.py
+++ b/assets/model_monitoring/components/tests/e2e/test_data_joiner_e2e.py
@@ -6,6 +6,7 @@
 import pytest
 from azure.ai.ml import Input, MLClient, Output
 from azure.ai.ml.entities import Spark, AmlTokenConfiguration
+from azure.ai.ml.exceptions import JobException
 from azure.ai.ml.dsl import pipeline
 from tests.e2e.utils.constants import (
     COMPONENT_NAME_DATA_JOINER,
@@ -57,7 +58,12 @@ def _data_joiner_e2e():
     )

     # Wait until the job completes
-    ml_client.jobs.stream(pipeline_job.name)
+    try:
+        ml_client.jobs.stream(pipeline_job.name)
+    except JobException:
+        # ignore JobException to return job final status
+        pass
+
     return ml_client.jobs.get(pipeline_job.name)

@@ -81,7 +87,7 @@ def test_data_joiner_successful(

         assert pipeline_job.status == 'Completed'

-    def test_data_joiner_empty_result_successful(
+    def test_data_joiner_empty_result_failed(
         self, ml_client: MLClient, get_component, test_suite_name
     ):
         """Test data joiner that produces empty result."""
@@ -95,4 +101,4 @@ def test_data_joiner_empty_result_failed(
             DATA_ASSET_MODEL_OUTPUTS_JOIN_COLUMN_NAME
         )

-        assert pipeline_job.status == 'Completed'
+        assert pipeline_job.status == 'Failed'
diff --git a/assets/model_monitoring/components/tests/e2e/test_model_performance_e2e.py b/assets/model_monitoring/components/tests/e2e/test_model_performance_e2e.py
index 21ff1b4350..2d63d7df64 100644
--- a/assets/model_monitoring/components/tests/e2e/test_model_performance_e2e.py
+++ b/assets/model_monitoring/components/tests/e2e/test_model_performance_e2e.py
@@ -8,7 +8,7 @@
 from azure.ai.ml.dsl import pipeline
 from tests.e2e.utils.constants import (
     COMPONENT_NAME_MODEL_PERFORMANCE_SIGNAL_MONITOR,
-    DATA_ASSET_IRIS_BASELINE_DATA,
+    DATA_ASSET_MODEL_PERFORMANCE_PRODUCTION_DATA,
 )

@@ -17,7 +17,6 @@ def _submit_model_performance_signal_monitor_job(
     get_component,
     experiment_name,
     task,
-    baseline_data,
     baseline_data_target_column,
     production_data,
     production_data_target_column,
@@ -34,7 +33,6 @@
     def _model_performance_signal_monitor_e2e():
         mp_signal_monitor_output = mp_signal_monitor(
             task=task,
-            baseline_data=baseline_data,
             baseline_data_target_column=baseline_data_target_column,
             production_data=production_data,
             production_data_target_column=production_data_target_column,
@@ -75,10 +73,9 @@ def test_monitoring_regression_successful(
             get_component,
             test_suite_name,
             "tabular-regression",
-            DATA_ASSET_IRIS_BASELINE_DATA,
-            "sepal_length",
-            DATA_ASSET_IRIS_BASELINE_DATA,
-            "sepal_length",
+            "regression-targetvalue",
+            DATA_ASSET_MODEL_PERFORMANCE_PRODUCTION_DATA,
+            "regression",
             0.1,
             0.1
         )
@@ -94,15 +91,14 @@ def test_monitoring_classification_successful(
             get_component,
             test_suite_name,
             "tabular-classification",
-            DATA_ASSET_IRIS_BASELINE_DATA,
-            "target",
-            DATA_ASSET_IRIS_BASELINE_DATA,
-            "target",
+            "classification-targetvalue",
+            DATA_ASSET_MODEL_PERFORMANCE_PRODUCTION_DATA,
+            "classification",
             None,
             None,
-            0.9,
-            0.9,
-            0.9,
+            0.1,
+            0.1,
+            0.1,
         )

         assert pipeline_job.status == "Completed"
diff --git a/assets/model_monitoring/components/tests/e2e/utils/constants.py b/assets/model_monitoring/components/tests/e2e/utils/constants.py
index f611199957..867f942ce3 100644
--- a/assets/model_monitoring/components/tests/e2e/utils/constants.py
+++ b/assets/model_monitoring/components/tests/e2e/utils/constants.py
@@ -95,3 +95,5 @@

 # For Data Quality with timestamp and boolean type in the MLTable
 DATA_ASSET_VALID_DATATYPE = 'azureml:mltable_validate_datatype_for_data_quality:1'
+
+DATA_ASSET_MODEL_PERFORMANCE_PRODUCTION_DATA = 'azureml:mltable_model_performance_production_data:1'
diff --git a/assets/model_monitoring/components/tests/unit/test_model_monitor_data_joiner.py b/assets/model_monitoring/components/tests/unit/test_model_monitor_data_joiner.py
index a57e638aac..037e32b231 100644
--- a/assets/model_monitoring/components/tests/unit/test_model_monitor_data_joiner.py
+++ b/assets/model_monitoring/components/tests/unit/test_model_monitor_data_joiner.py
@@ -7,6 +7,7 @@
 from pyspark.sql.types import StringType, StructField, StructType

 from src.model_monitor_data_joiner.run import join_data
+from shared_utilities.momo_exceptions import InvalidInputError
 from tests.e2e.utils.io_utils import create_pyspark_dataframe

 LEFT_JOIN_COLUMN = 'left_join_column'
@@ -106,14 +107,13 @@ def test_join_data_empty_input_successful(
         else:
             right_data_df = _generate_right_data_df(True)

-        joined_data_df = join_data(
-            left_data_df,
-            LEFT_JOIN_COLUMN,
-            right_data_df,
-            RIGHT_JOIN_COLUMN
-        )
-
-        assert joined_data_df.count() == 0
+        with pytest.raises(InvalidInputError):
+            join_data(
+                left_data_df,
+                LEFT_JOIN_COLUMN,
+                right_data_df,
+                RIGHT_JOIN_COLUMN
+            )

     @pytest.mark.parametrize("is_left_data_empty, is_right_data_empty", test_data)
     def test_join_data_empty_input_without_schema_raises_exception(
@@ -132,7 +132,7 @@ def test_join_data_empty_input_without_schema_raises_exception(
         else:
             right_data_df = _generate_right_data_df(True)

-        with pytest.raises(Exception):
+        with pytest.raises(InvalidInputError):
             join_data(
                 left_data_df,
                 LEFT_JOIN_COLUMN,