Add error handling for read_mltable_in_spark() #3088

Merged
merged 6 commits on Jun 27, 2024

Changes from all commits
@@ -8,6 +8,7 @@
import time
import yaml
from azureml.dataprep.api.errorhandlers import ExecutionError
+from azureml.dataprep.api.mltable._mltable_helper import UserErrorException
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType
from .constants import MAX_RETRY_COUNT
@@ -124,19 +125,6 @@ def process_input_not_found(input_not_found_category: InputNotFoundCategory):
    return df if df and not df.isEmpty() else process_input_not_found(InputNotFoundCategory.NO_INPUT_IN_WINDOW)


-def _verify_mltable_paths(mltable_path: str, ws=None, mltable_dict: dict = None):
-    """Verify paths in mltable is supported."""
-    mltable_dict = mltable_dict or yaml.safe_load(StoreUrl(mltable_path, ws).read_file_content("MLTable"))
-    for path in mltable_dict.get("paths", []):
-        path_val = path.get("file") or path.get("folder") or path.get("pattern")
-        try:
-            path_url = StoreUrl(path_val, ws)  # path_url itself must be valid
-            if not path_url.is_local_path():  # and it must be either local(absolute or relative) path
-                _ = path_url.get_credential()  # or credential azureml path
-        except InvalidInputError as iie:
-            raise InvalidInputError(f"Invalid or unsupported path {path_val} in MLTable {mltable_path}") from iie


def _write_mltable_yaml(mltable_obj, folder_path: str):
    try:
        store_url = StoreUrl(folder_path)
@@ -153,17 +141,45 @@ def _write_mltable_yaml(mltable_obj, folder_path: str):

def read_mltable_in_spark(mltable_path: str):
    """Read mltable in spark."""
-    _verify_mltable_paths(mltable_path)
+    if mltable_path is None:
+        raise InvalidInputError("MLTable path is None.")
    spark = init_spark()
    try:
        return spark.read.mltable(mltable_path)
+    except UserErrorException as ue:
+        ue_str = str(ue)
+        if 'Not able to find MLTable file' in ue_str:
+            raise InvalidInputError(f"Failed to read MLTable {mltable_path}, it is not found or not accessible.")
+        elif 'MLTable yaml is invalid' in ue_str:
+            raise InvalidInputError(f"Invalid MLTable yaml content in {mltable_path}, please make sure all paths "
+                                    "defined in MLTable are in correct format and supported scheme.")
+        else:
+            raise ue
    except ExecutionError as ee:
-        if 'AuthenticationError("RuntimeError: Non-matching ' in str(ee):
+        ee_str = str(ee)
+        if 'AuthenticationError("RuntimeError: Non-matching ' in ee_str:
            raise InvalidInputError(f"Failed to read MLTable {mltable_path}, "
                                    "please make sure only data defined in the same AML workspace is used in MLTable.")
+        elif 'StreamError(NotFound)' in ee_str and 'The requested stream was not found' in ee_str:
+            raise InvalidInputError(f"One or more paths defined in MLTable {mltable_path} is not found.")
        else:
            raise ee
    except ValueError as ve:
-        if 'AuthenticationError("RuntimeError: Non-matching ' in str(ve):
+        ve_str = str(ve)
+        if 'AuthenticationError("RuntimeError: Non-matching ' in ve_str:
            raise InvalidInputError(f"Failed to read MLTable {mltable_path}, it is not from the same AML workspace.")
+        elif 'StreamError(PermissionDenied(' in ve_str:
+            # TODO add link to doc
+            raise InvalidInputError(f"No permission to read MLTable {mltable_path}, please make it credential data,"
+                                    " or run the Model Monitor job with managed identity and grant proper data "
+                                    "access permission to the user managed identity attached to this AML workspace.")
        else:
            raise ve
+    except SystemError as se:
+        if 'Name or service not known' in str(se):
+            raise InvalidInputError(f"Failed to read MLTable {mltable_path}, the storage account is not found.")
+        else:
+            raise se


def save_spark_df_as_mltable(metrics_df, folder_path: str):
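For orientation, here is a minimal caller sketch showing how the normalized errors from this change can be consumed. The import paths below are assumptions for illustration, not code from this PR; only read_mltable_in_spark and InvalidInputError come from the modified module.

# Hypothetical caller sketch; module paths are assumed for illustration only.
from shared_utilities.io_utils import read_mltable_in_spark
from shared_utilities.momo_exceptions import InvalidInputError

def load_window_data(mltable_path: str):
    try:
        return read_mltable_in_spark(mltable_path)
    except InvalidInputError as e:
        # Every user-caused failure (missing MLTable, invalid yaml, cross-workspace
        # data, missing permission, unknown storage account) now surfaces as one
        # exception type with an actionable message instead of a raw stack trace.
        print(f"Model Monitor input error: {e}")
        raise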
82 changes: 0 additions & 82 deletions assets/model_monitoring/components/tests/unit/test_io_utils.py

This file was deleted.

@@ -129,16 +129,15 @@ def test_uri_folder_to_spark_df_with_complex_type(self, mdc_preprocessor_test_setup,
        pdf.show()

    @pytest.mark.parametrize(
-        "window_start_time, window_end_time, root_folder_exists",
+        "window_start_time, window_end_time",
        [
-            ("2023-11-03T15:00:00", "2023-11-03T16:00:00", True),  # no window folder
-            ("2023-11-06T15:00:00", "2023-11-06T16:00:00", True),  # has window folder, no file
-            ("2023-11-06T17:00:00", "2023-11-06T18:00:00", True),  # has window folder and file, but empty file
-            ("2023-11-08T14:00:00", "2023-11-08T16:00:00", False),  # root folder not exists
+            ("2023-11-03T15:00:00", "2023-11-03T16:00:00"),  # no window folder
+            ("2023-11-06T15:00:00", "2023-11-06T16:00:00"),  # has window folder, no file
+            ("2023-11-06T17:00:00", "2023-11-06T18:00:00"),  # has window folder and file, but empty file
        ]
    )
    def test_uri_folder_to_spark_df_no_data(self, mdc_preprocessor_test_setup,
-                                            window_start_time, window_end_time, root_folder_exists):
+                                            window_start_time, window_end_time):
"""Test uri_folder_to_spark_df()."""
def my_add_tags(tags: dict):
print("my_add_tags:", tags)
@@ -149,7 +148,7 @@ def my_add_tags(tags: dict):
        tests_path = os.path.abspath(f"{os.path.dirname(__file__)}/../../tests")
        preprocessed_output = f"{tests_path}/unit/preprocessed_mdc_data"
        shutil.rmtree(f"{preprocessed_output}temp", True)
-        root_folder = f"{tests_path}/unit/raw_mdc_data/" if root_folder_exists else f"{tests_path}/unit/raw_mdc_data1/"
+        root_folder = f"{tests_path}/unit/raw_mdc_data/"

        with pytest.raises(DataNotFoundError):
            df = _raw_mdc_uri_folder_to_preprocessed_spark_df(
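The removed root_folder_exists=False case is presumably handled by the new error mapping in read_mltable_in_spark rather than by this preprocessing test. As a purely illustrative sketch (the import paths are assumptions, not code from this PR), a unit test for the new None-path guard could look like:

import pytest

# Hypothetical test sketch; module paths are assumed for illustration only.
from shared_utilities.io_utils import read_mltable_in_spark
from shared_utilities.momo_exceptions import InvalidInputError

def test_read_mltable_in_spark_none_path():
    # The new guard raises InvalidInputError before any Spark session is used.
    with pytest.raises(InvalidInputError, match="MLTable path is None."):
        read_mltable_in_spark(None)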