From bc031547ed601446a01f5851a62e06e2dcff59c7 Mon Sep 17 00:00:00 2001
From: Richard Li
Date: Tue, 25 Jun 2024 23:35:16 -0700
Subject: [PATCH 1/4] handle MLTable not found error

---
 .../src/shared_utilities/io_utils.py          | 32 ++++++++++---------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/assets/model_monitoring/components/src/shared_utilities/io_utils.py b/assets/model_monitoring/components/src/shared_utilities/io_utils.py
index 07a03c13cb..e946c26375 100644
--- a/assets/model_monitoring/components/src/shared_utilities/io_utils.py
+++ b/assets/model_monitoring/components/src/shared_utilities/io_utils.py
@@ -8,6 +8,7 @@
 import time
 import yaml
 from azureml.dataprep.api.errorhandlers import ExecutionError
+from azureml.dataprep.api.mltable._mltable_helper import UserErrorException
 from pyspark.sql import SparkSession, DataFrame
 from pyspark.sql.types import StructType
 from .constants import MAX_RETRY_COUNT
@@ -124,19 +125,6 @@ def process_input_not_found(input_not_found_category: InputNotFoundCategory):
     return df if df and not df.isEmpty() else process_input_not_found(InputNotFoundCategory.NO_INPUT_IN_WINDOW)
 
 
-def _verify_mltable_paths(mltable_path: str, ws=None, mltable_dict: dict = None):
-    """Verify paths in mltable is supported."""
-    mltable_dict = mltable_dict or yaml.safe_load(StoreUrl(mltable_path, ws).read_file_content("MLTable"))
-    for path in mltable_dict.get("paths", []):
-        path_val = path.get("file") or path.get("folder") or path.get("pattern")
-        try:
-            path_url = StoreUrl(path_val, ws)  # path_url itself must be valid
-            if not path_url.is_local_path():  # and it must be either local(absolute or relative) path
-                _ = path_url.get_credential()  # or credential azureml path
-        except InvalidInputError as iie:
-            raise InvalidInputError(f"Invalid or unsupported path {path_val} in MLTable {mltable_path}") from iie
-
-
 def _write_mltable_yaml(mltable_obj, folder_path: str):
     try:
         store_url = StoreUrl(folder_path)
@@ -153,17 +141,31 @@ def _write_mltable_yaml(mltable_obj, folder_path: str):
 
 def read_mltable_in_spark(mltable_path: str):
     """Read mltable in spark."""
-    _verify_mltable_paths(mltable_path)
+    if mltable_path is None:
+        raise InvalidInputError("MLTable path is None.")
     spark = init_spark()
     try:
         return spark.read.mltable(mltable_path)
+    except UserErrorException as ue:
+        ue_str = str(ue)
+        if 'Not able to find MLTable file' in ue_str:
+            raise InvalidInputError(f"Failed to read MLTable {mltable_path}, it is not found or not accessible.")
+        elif 'MLTable yaml is invalid' in ue_str:
+            raise InvalidInputError(f"Invalid MLTable yaml content in {mltable_path}, please make sure all paths "
+                                    "defined in MLTable are in the correct format and use a supported scheme.")
    except ExecutionError as ee:
-        if 'AuthenticationError("RuntimeError: Non-matching ' in str(ee):
+        ee_str = str(ee)
+        if 'AuthenticationError("RuntimeError: Non-matching ' in ee_str:
             raise InvalidInputError(f"Failed to read MLTable {mltable_path}, "
                                     "please make sure only data defined in the same AML workspace is used in MLTable.")
+        elif 'StreamError(NotFound)' in ee_str and 'The requested stream was not found' in ee_str:
+            raise InvalidInputError(f"One or more paths defined in MLTable {mltable_path} are not found.")
     except ValueError as ve:
         if 'AuthenticationError("RuntimeError: Non-matching ' in str(ve):
             raise InvalidInputError(f"Failed to read MLTable {mltable_path}, it is not from the same AML workspace.")
+    except SystemError as se:
+        if 'Name or service not known' in str(se):
+            raise InvalidInputError(f"Failed to read MLTable {mltable_path}, the storage account is not found.")
 
 
 def save_spark_df_as_mltable(metrics_df, folder_path: str):
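The pattern patch 1 introduces is substring matching on the text of low-level exceptions, mapping known failure signatures to the user-facing InvalidInputError. Below is a minimal sketch of that classification technique, detached from Spark; USER_ERROR_MARKERS and classify_user_error are illustrative names, not code from this PR:

    # Sketch of the substring-matching classification used in patch 1.
    # USER_ERROR_MARKERS and classify_user_error are hypothetical names.
    USER_ERROR_MARKERS = {
        'Not able to find MLTable file': "MLTable is not found or not accessible.",
        'MLTable yaml is invalid': "MLTable yaml content is invalid.",
        'The requested stream was not found': "One or more paths in MLTable are not found.",
        'Name or service not known': "The storage account is not found.",
    }

    def classify_user_error(exc):
        """Return a user-facing message for a known failure signature, else None."""
        exc_str = str(exc)
        for marker, message in USER_ERROR_MARKERS.items():
            if marker in exc_str:
                return message
        return None  # unknown failure: the caller should re-raise (see patch 2)

The trade-off is that error handling becomes coupled to the exact message text emitted by the dataprep runtime; if those messages change upstream, the mapping silently stops matching.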
From 22d2f76b22669c713de5f0f2e4da9923b462563f Mon Sep 17 00:00:00 2001
From: Richard Li
Date: Wed, 26 Jun 2024 00:22:43 -0700
Subject: [PATCH 2/4] handle permission denied error

---
 .../components/src/shared_utilities/io_utils.py | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/assets/model_monitoring/components/src/shared_utilities/io_utils.py b/assets/model_monitoring/components/src/shared_utilities/io_utils.py
index e946c26375..24599a8f4d 100644
--- a/assets/model_monitoring/components/src/shared_utilities/io_utils.py
+++ b/assets/model_monitoring/components/src/shared_utilities/io_utils.py
@@ -153,6 +153,8 @@ def read_mltable_in_spark(mltable_path: str):
         elif 'MLTable yaml is invalid' in ue_str:
             raise InvalidInputError(f"Invalid MLTable yaml content in {mltable_path}, please make sure all paths "
                                     "defined in MLTable are in the correct format and use a supported scheme.")
+        else:
+            raise ue
     except ExecutionError as ee:
         ee_str = str(ee)
         if 'AuthenticationError("RuntimeError: Non-matching ' in ee_str:
@@ -160,12 +162,24 @@ def read_mltable_in_spark(mltable_path: str):
             raise InvalidInputError(f"Failed to read MLTable {mltable_path}, "
                                     "please make sure only data defined in the same AML workspace is used in MLTable.")
         elif 'StreamError(NotFound)' in ee_str and 'The requested stream was not found' in ee_str:
             raise InvalidInputError(f"One or more paths defined in MLTable {mltable_path} are not found.")
+        else:
+            raise ee
     except ValueError as ve:
-        if 'AuthenticationError("RuntimeError: Non-matching ' in str(ve):
+        ve_str = str(ve)
+        if 'AuthenticationError("RuntimeError: Non-matching ' in ve_str:
             raise InvalidInputError(f"Failed to read MLTable {mltable_path}, it is not from the same AML workspace.")
+        elif 'StreamError(PermissionDenied(' in ve_str:
+            # TODO add link to doc
+            raise InvalidInputError(f"No permission to read MLTable {mltable_path}, please configure it as credential"
+                                    " data, or run the Model Monitor job with managed identity and grant proper data"
+                                    " access permission to the user managed identity attached to this AML workspace.")
+        else:
+            raise ve
     except SystemError as se:
         if 'Name or service not known' in str(se):
             raise InvalidInputError(f"Failed to read MLTable {mltable_path}, the storage account is not found.")
+        else:
+            raise se
 
 
 def save_spark_df_as_mltable(metrics_df, folder_path: str):
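The `else: raise` branches added here close a gap left by patch 1: a handler whose substring checks all missed would fall off the end of the except block, swallow the exception, and let read_mltable_in_spark return None. A self-contained sketch of the fixed shape follows; load and the exception class are stand-ins, not the PR's code:

    class InvalidInputError(Exception):
        """Stand-in for shared_utilities.momo_exceptions.InvalidInputError."""


    def load(path):
        """Hypothetical loader standing in for spark.read.mltable."""
        raise ValueError("some unrecognized failure")


    def read_data(path):
        try:
            return load(path)
        except ValueError as ve:
            if 'known marker' in str(ve):
                raise InvalidInputError("friendly message for a known user error")
            else:
                raise  # patch 2's fix: unknown errors propagate unchanged

Without the else branch, read_data("x") would return None and the real failure would only surface later as a confusing NoneType error; with it, the original ValueError keeps its traceback.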
From 819b5019ddd85c468e908db950098a11639050dc Mon Sep 17 00:00:00 2001
From: Richard Li
Date: Wed, 26 Jun 2024 11:22:33 -0700
Subject: [PATCH 3/4] remove verify_mltable UT

---
 .../components/tests/unit/test_io_utils.py | 82 -------------------
 1 file changed, 82 deletions(-)
 delete mode 100644 assets/model_monitoring/components/tests/unit/test_io_utils.py

diff --git a/assets/model_monitoring/components/tests/unit/test_io_utils.py b/assets/model_monitoring/components/tests/unit/test_io_utils.py
deleted file mode 100644
index 4e4f342a62..0000000000
--- a/assets/model_monitoring/components/tests/unit/test_io_utils.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Copyright (c) Microsoft Corporation.
-# Licensed under the MIT License.
-
-"""Test file for io_utils."""
-
-import pytest
-from shared_utilities.io_utils import _verify_mltable_paths
-from shared_utilities.momo_exceptions import InvalidInputError
-from unittest.mock import Mock, patch
-from azureml.core import Datastore
-
-
-@pytest.mark.unit
-class TestIOUtils:
-    """Test class for io_utils."""
-
-    @pytest.mark.parametrize(
-        "mltable_path",
-        [
-            ({"file": "azureml:my_data:1"})
-        ]
-    )
-    def test_verify_mltable_paths_error(self, mltable_path):
-        """Test _verify_mltable_paths, negative cases."""
-        mltable_dict = {
-            "type": "mltable",
-            "paths": [mltable_path]
-        }
-        with pytest.raises(InvalidInputError, match=r"Invalid or unsupported path .*"):
-            _verify_mltable_paths("foo_path", mltable_dict=mltable_dict)
-
-    def test_verify_mltable_paths_pass(self):
-        """Test _verify_mltable_paths, for azureml paths, positive cases."""
-        mltable_dict = {
-            "type": "mltable",
-            "paths": [
-                {"file": "azureml://datastores/my_datastore/paths/path/to/data.parquet"},
-                {"folder": "azureml://subscriptions/sub_id/resourceGroups/my_rg/workspaces/my_ws/datastores/my_datastore/paths/path/to/folder"},  # noqa: E501
-                {"pattern": "azureml://datastores/my_datastore/paths/path/to/folder/**/*.jsonl"},
-                {"pattern": "./path/to/folder/*.csv"},
-                {"file": "baseline_data.csv"},
-                {"file": "https://my_account.blob.core.windows.net/my_container/path/to/data.parquet"},
-                {"folder": "wasbs://my_container@my_account.blob.core.windows.net/path/to/folder"},
-                {"pattern": "abfss://my_container@my_account.dfs.core.windows.net/path/to/folder/*/*.jsonl"},
-                {"file": "http://my_account.blob.core.windows.net/my_container/path/to/data.parquet"},
-                {"folder": "wasb://my_container@my_account.blob.core.windows.net/path/to/folder"},
-                {"pattern": "abfs://my_container@my_account.dfs.core.windows.net/path/to/folder/*/*.jsonl"}
-            ]
-        }
-        mock_datastore = Mock(datastore_type="AzureBlob", protocol="https", endpoint="core.windows.net",
-                              account_name="my_account", container_name="my_container")
-        mock_datastore.name = "my_datastore"
-        mock_datastore.credential_type = "Sas"
-        mock_datastore.sas_token = "my_sas_token"
-        mock_ws = Mock()
-
-        with patch.object(Datastore, "get", return_value=mock_datastore):
-            _verify_mltable_paths("foo_path", mock_ws, mltable_dict)
-
-    @pytest.mark.parametrize(
-        "mltable_path, datastore_type",
-        [
-            ({"file": "azureml://datastores/my_datastore/paths/path/to/data.parquet"}, "AzureDataLakeGen2"),
-            ({"folder": "azureml://subscriptions/sub_id/resourceGroups/my_rg/workspaces/my_ws/datastores/my_datastore/paths/path/to/folder"}, "AzureBlob"),  # noqa: E501
-            ({"pattern": "azureml://datastores/my_datastore/paths/path/to/folder/**/*.jsonl"}, "AzureDataLakeGen2"),
-        ]
-    )
-    def test_verify_mltable_paths_azureml_path_credentialless_datastore(self, mltable_path, datastore_type):
-        """Test _verify_mltable_paths, for azureml paths, negative cases."""
-        mock_datastore = Mock(datastore_type=datastore_type, protocol="https", endpoint="core.windows.net",
-                              account_name="my_account", container_name="my_container")
-        mock_datastore.name = "my_datastore"
-        mock_datastore.tenant_id = None
-        mock_datastore.credential_type = "None"
-        mock_ws = Mock()
-        mltable_dict = {
-            "type": "mltable",
-            "paths": [mltable_path]
-        }
-
-        with patch.object(Datastore, "get", return_value=mock_datastore):
-            _verify_mltable_paths("foo_path", mock_ws, mltable_dict)
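This deletion removes the unit coverage for io_utils along with _verify_mltable_paths itself, and the substring mapping added in patches 1 and 2 gains no tests in its place. One possible shape for a replacement test, mocking the Spark session so the handler sees a crafted exception; the mocking approach, the message-only UserErrorException constructor, and the test name are assumptions, not part of this PR:

    import pytest
    from unittest.mock import MagicMock, patch

    from shared_utilities import io_utils
    from shared_utilities.momo_exceptions import InvalidInputError


    @pytest.mark.unit
    def test_read_mltable_in_spark_not_found():
        """Sketch: 'Not able to find MLTable file' should map to InvalidInputError."""
        mock_spark = MagicMock()
        # Assumes UserErrorException can be constructed from a plain message string.
        mock_spark.read.mltable.side_effect = io_utils.UserErrorException(
            "Not able to find MLTable file at the given path")
        with patch.object(io_utils, "init_spark", return_value=mock_spark):
            with pytest.raises(InvalidInputError, match="not found or not accessible"):
                io_utils.read_mltable_in_spark("azureml://fake/path")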
From 8af73bad979730e31b7fd32e2249a5949bff72a5 Mon Sep 17 00:00:00 2001
From: Richard Li
Date: Wed, 26 Jun 2024 16:51:58 -0700
Subject: [PATCH 4/4] remove one useless UT for old mdc preprocessor

---
 .../components/tests/unit/test_mdc_preprocessor.py | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/assets/model_monitoring/components/tests/unit/test_mdc_preprocessor.py b/assets/model_monitoring/components/tests/unit/test_mdc_preprocessor.py
index 1e27833119..45ecb78aeb 100644
--- a/assets/model_monitoring/components/tests/unit/test_mdc_preprocessor.py
+++ b/assets/model_monitoring/components/tests/unit/test_mdc_preprocessor.py
@@ -129,16 +129,15 @@ def test_uri_folder_to_spark_df_with_complex_type(self, mdc_preprocessor_test_se
         pdf.show()
 
     @pytest.mark.parametrize(
-        "window_start_time, window_end_time, root_folder_exists",
+        "window_start_time, window_end_time",
         [
-            ("2023-11-03T15:00:00", "2023-11-03T16:00:00", True),   # no window folder
-            ("2023-11-06T15:00:00", "2023-11-06T16:00:00", True),   # has window folder, no file
-            ("2023-11-06T17:00:00", "2023-11-06T18:00:00", True),   # has window folder and file, but empty file
-            ("2023-11-08T14:00:00", "2023-11-08T16:00:00", False),  # root folder not exists
+            ("2023-11-03T15:00:00", "2023-11-03T16:00:00"),  # no window folder
+            ("2023-11-06T15:00:00", "2023-11-06T16:00:00"),  # has window folder, no file
+            ("2023-11-06T17:00:00", "2023-11-06T18:00:00"),  # has window folder and file, but empty file
         ]
     )
     def test_uri_folder_to_spark_df_no_data(self, mdc_preprocessor_test_setup,
-                                            window_start_time, window_end_time, root_folder_exists):
+                                            window_start_time, window_end_time):
         """Test uri_folder_to_spark_df()."""
         def my_add_tags(tags: dict):
             print("my_add_tags:", tags)
@@ -149,7 +148,7 @@ def my_add_tags(tags: dict):
         tests_path = os.path.abspath(f"{os.path.dirname(__file__)}/../../tests")
         preprocessed_output = f"{tests_path}/unit/preprocessed_mdc_data"
         shutil.rmtree(f"{preprocessed_output}temp", True)
-        root_folder = f"{tests_path}/unit/raw_mdc_data/" if root_folder_exists else f"{tests_path}/unit/raw_mdc_data1/"
+        root_folder = f"{tests_path}/unit/raw_mdc_data/"
 
         with pytest.raises(DataNotFoundError):
             df = _raw_mdc_uri_folder_to_preprocessed_spark_df(