Throw user error when no common columns in compute data drift component (#2173)

* create common function to throw error on get_common_columns()

* syntax fix

* fix unit test

* syntax

* unused import

* Update compute_data_drift.py

* fix ut

* uncomment ut
alanpo1 authored Jan 25, 2024
1 parent 1b10a4f commit 2203f7e
Showing 6 changed files with 162 additions and 27 deletions.

@@ -103,7 +103,7 @@ jobs:
      type: aml_token
  compute_drift_metrics:
    type: spark
-    component: azureml://registries/azureml/components/data_drift_compute_metrics/versions/0.3.10
+    component: azureml://registries/azureml/components/data_drift_compute_metrics/versions/0.3.11
    inputs:
      production_dataset:
        type: mltable
@@ -130,7 +130,7 @@ jobs:
      type: aml_token
  compute_histogram_buckets:
    type: spark
-    component: azureml://registries/azureml/components/model_monitor_compute_histogram_buckets/versions/0.3.5
+    component: azureml://registries/azureml/components/model_monitor_compute_histogram_buckets/versions/0.3.6
    inputs:
      input_data_1:
        type: mltable

@@ -77,7 +77,7 @@ jobs:
      type: aml_token
  compute_drift_metrics:
    type: spark
-    component: azureml://registries/azureml/components/data_drift_compute_metrics/versions/0.3.10
+    component: azureml://registries/azureml/components/data_drift_compute_metrics/versions/0.3.11
    inputs:
      production_dataset:
        type: mltable
@@ -104,7 +104,7 @@ jobs:
      type: aml_token
  compute_histogram_buckets:
    type: spark
-    component: azureml://registries/azureml/components/model_monitor_compute_histogram_buckets/versions/0.3.5
+    component: azureml://registries/azureml/components/model_monitor_compute_histogram_buckets/versions/0.3.6
    inputs:
      input_data_1:
        type: mltable

@@ -10,7 +10,7 @@
from categorical_data_drift_metrics import compute_categorical_data_drift_measures_tests
from io_utils import get_output_spark_df
from shared_utilities.df_utils import (
-    get_common_columns,
+    try_get_common_columns_with_error,
    get_numerical_and_categorical_cols
)

@@ -26,7 +26,7 @@ def compute_data_drift_measures_tests(
    override_categorical_features: str
):
    """Compute Data drift metrics and tests."""
-    common_columns_dict = get_common_columns(baseline_df, production_df)
+    common_columns_dict = try_get_common_columns_with_error(baseline_df, production_df)

    numerical_columns_names, categorical_columns_names = get_numerical_and_categorical_cols(
        baseline_df,
@@ -48,8 +48,8 @@

    if len(numerical_columns_names) == 0 and \
            len(categorical_columns_names) == 0:
-        raise ValueError("No common columns found between production data and baseline"
-                         "data. We dont support this scenario.")
+        raise ValueError("No numerical or categorical columns detected in common between production data and baseline"
+                         " data. We don't support this scenario.")

    if len(numerical_columns_names) != 0:
        numerical_df = compute_numerical_data_drift_measures_tests(
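A minimal sketch (not part of this diff) of what the change means at the call site: when the two datasets share no column names, the caller now gets an immediate, actionable InvalidInputError from the shared helper instead of a failure deeper in metric computation. The dataframe contents here are hypothetical; the module paths mirror the imports in this commit.

from pyspark.sql import SparkSession

from shared_utilities.df_utils import try_get_common_columns_with_error
from shared_utilities.momo_exceptions import InvalidInputError

spark = SparkSession.builder.appName("no-common-columns-demo").getOrCreate()
baseline_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "name"])
production_df = spark.createDataFrame([(30, "f"), (40, "m")], ["age", "gender"])

try:
    try_get_common_columns_with_error(baseline_df, production_df)
except InvalidInputError as err:
    # Fails fast with "Found no common columns between input datasets. ..."
    print(err)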

@@ -7,15 +7,14 @@
import pyspark.sql as pyspark_sql
from shared_utilities.df_utils import get_numerical_cols_with_df_with_override
from shared_utilities.histogram_utils import get_dual_histogram_bin_edges
-from shared_utilities.df_utils import get_common_columns
+from shared_utilities.df_utils import try_get_common_columns_with_error
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
)
from shared_utilities.io_utils import init_spark
-from shared_utilities.momo_exceptions import InvalidInputError


def compute_numerical_bins(
@@ -26,13 +25,7 @@
) -> tuple:
    """Compute numerical bins given two data frames."""
    # Generate histograms only for columns in both baseline and target dataset
-    common_columns_dict = get_common_columns(df1, df2)
-    if not common_columns_dict:
-        raise InvalidInputError(
-            "Found no common columns between input datasets. Try double-checking" +
-            " if there are common columns between the input datasets." +
-            " Common columns must have the same names (case-sensitive) and similar data types."
-        )
+    common_columns_dict = try_get_common_columns_with_error(df1, df2)
    numerical_columns = get_numerical_cols_with_df_with_override(df1,
                                                                 override_numerical_features,
                                                                 override_categorical_features,

@@ -3,6 +3,18 @@

"""This file contains additional utilities that are applicable to dataframe."""
import pyspark.sql as pyspark_sql
+from enum import Enum
+from shared_utilities.momo_exceptions import InvalidInputError
+from shared_utilities.event_utils import post_warning_event
+
+
+class NoCommonColumnsApproach(Enum):
+    """Enum for no common columns approach."""
+
+    IGNORE = 0
+    WARNING = 1
+    ERROR = 2
+
+
data_type_long_group = ["long", "int", "bigint", "short", "tinyint", "smallint"]
data_type_numerical_group = ["float", "double", "decimal"]
@@ -170,3 +182,51 @@ def add_value_if_present(
    if row_has_value(row, row_name):
        dict[target_property_name] = row[row_name]
    return dict


+def try_get_common_columns_with_warning(
+    baseline_df: pyspark_sql.DataFrame, production_df: pyspark_sql.DataFrame
+) -> dict:
+    """Get common columns. Post a warning to the job and return an empty dict if none are found."""
+    return try_get_common_columns(baseline_df, production_df, NoCommonColumnsApproach.WARNING)
+
+
+def try_get_common_columns_with_error(
+    baseline_df: pyspark_sql.DataFrame, production_df: pyspark_sql.DataFrame
+) -> dict:
+    """Get common columns. Raise an error if none are found."""
+    return try_get_common_columns(baseline_df, production_df, NoCommonColumnsApproach.ERROR)
+
+
+def try_get_common_columns(
+    baseline_df: pyspark_sql.DataFrame,
+    production_df: pyspark_sql.DataFrame,
+    no_common_columns_approach=NoCommonColumnsApproach.IGNORE
+) -> dict:
+    """
+    Compute the common columns between the baseline and production dataframes.
+
+    If no common columns are found, handle the situation according to no_common_columns_approach.
+    """
+    common_columns_dict = get_common_columns(baseline_df, production_df)
+    if not common_columns_dict:
+        error_message = (
+            "Found no common columns between input datasets. Try double-checking"
+            " if there are common columns between the input datasets."
+            " Common columns must have the same names (case-sensitive) and similar data types."
+        )
+        if no_common_columns_approach == NoCommonColumnsApproach.ERROR:
+            raise InvalidInputError(
+                error_message
+            )
+        elif no_common_columns_approach == NoCommonColumnsApproach.WARNING:
+            post_warning_event(
+                error_message
+                + " Please visit aka.ms/mlmonitoringhelp for more information."
+            )
+            return {}
+        # no_common_columns_approach == NoCommonColumnsApproach.IGNORE:
+        else:
+            return {}
+    # Return the common columns that were found.
+    return common_columns_dict
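
Taken together, the three entry points give each component a choice of strictness while keeping the user-facing message in one place. A short usage sketch (hypothetical dataframes; the function and enum names are exactly those added above):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

from shared_utilities.df_utils import (
    NoCommonColumnsApproach,
    try_get_common_columns,
)

spark = SparkSession.builder.appName("df-utils-demo").getOrCreate()
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), StructType([]))

# IGNORE is the default: silently return {} and let the caller decide.
assert try_get_common_columns(empty_df, empty_df) == {}
assert try_get_common_columns(empty_df, empty_df, NoCommonColumnsApproach.IGNORE) == {}

# WARNING posts a warning event to the job and then returns {};
# ERROR raises InvalidInputError, as shown in the earlier caller sketch.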
102 changes: 92 additions & 10 deletions assets/model_monitoring/components/tests/unit/test_df_utils.py
@@ -8,16 +8,18 @@
    StructField,
    StructType)
from pyspark.sql import SparkSession
-from shared_utilities.df_utils import (
-    get_common_columns,
-    get_feature_type_override_map,
-    is_numerical,
-    is_categorical,
-    get_numerical_cols_with_df_with_override,
-    get_categorical_cols_with_df_with_override,
-    get_numerical_and_categorical_cols,
-    modify_categorical_columns
-)
+from src.shared_utilities.df_utils import (
+    get_common_columns,
+    try_get_common_columns_with_error,
+    try_get_common_columns,
+    get_feature_type_override_map,
+    is_numerical,
+    is_categorical,
+    get_numerical_cols_with_df_with_override,
+    get_categorical_cols_with_df_with_override,
+    get_numerical_and_categorical_cols,
+    modify_categorical_columns
+)
from tests.e2e.utils.io_utils import create_pyspark_dataframe
from tests.unit.test_compute_data_quality_statistics import df_with_timestamp
import pandas as pd
@@ -366,3 +368,83 @@ def init_spark(self):
"""Get or create spark session."""
spark = SparkSession.builder.appName("test").getOrCreate()
return spark

+    def test_try_get_common_columns_error(self):
+        """Test scenarios for common columns with error."""
+        # Test with two empty dataframes
+        spark = SparkSession.builder.appName("test").getOrCreate()
+        emp_RDD = spark.sparkContext.emptyRDD()
+        # Create empty schema
+        columns = StructType([])
+
+        # Create an empty RDD with empty schema
+        baseline_df = create_pyspark_dataframe(emp_RDD, columns)
+        production_df = create_pyspark_dataframe(emp_RDD, columns)
+        with pytest.raises(Exception) as ex:
+            try_get_common_columns_with_error(baseline_df, production_df)
+        assert "Found no common columns between input datasets." in str(ex.value)
+
+        # Test with two dataframes that share a column whose data types differ
+        float_data = [(3.55,), (6.88,), (7.99,)]
+        schema = StructType([
+            StructField("target", FloatType(), True)])
+        baseline_df = create_pyspark_dataframe(float_data, schema)
+        double_data = [(3.55,), (6.88,), (7.99,)]
+        schema = StructType([
+            StructField("target", DoubleType(), True)])
+        production_df = create_pyspark_dataframe(double_data, schema)
+        assert try_get_common_columns_with_error(baseline_df, production_df) == {'target': 'double'}
+
+        # Test with two dataframes that have no common columns
+        baseline_df = create_pyspark_dataframe([(1, "a"), (2, "b")],
+                                               ["id", "name"])
+        production_df = create_pyspark_dataframe([(3, "c"), (4, "d")],
+                                                 ["age", "gender"])
+        with pytest.raises(Exception) as ex:
+            try_get_common_columns_with_error(baseline_df, production_df)
+        assert "Found no common columns between input datasets." in str(ex.value)
+
+        # Test with two dataframes whose common columns differ in type
+        baseline_df = create_pyspark_dataframe([(1.0, "a", 10), (2.0, "b", 20)],
+                                               ["id", "name", "age"])
+        production_df = create_pyspark_dataframe([(1, "c", 30), (2, "d", 40)],
+                                                 ["id", "name", "age"])
+        assert try_get_common_columns_with_error(baseline_df, production_df) == {"name": "string", "age": "bigint"}

+    def test_try_get_common_columns_ignore(self):
+        """Test scenarios for common columns with ignore."""
+        # Test with two empty dataframes
+        spark = SparkSession.builder.appName("test").getOrCreate()
+        emp_RDD = spark.sparkContext.emptyRDD()
+        # Create empty schema
+        columns = StructType([])
+
+        # Create an empty RDD with empty schema
+        baseline_df = create_pyspark_dataframe(emp_RDD, columns)
+        production_df = create_pyspark_dataframe(emp_RDD, columns)
+        assert try_get_common_columns(baseline_df, production_df) == {}
+
+        # Test with two dataframes that share a column whose data types differ
+        float_data = [(3.55,), (6.88,), (7.99,)]
+        schema = StructType([
+            StructField("target", FloatType(), True)])
+        baseline_df = create_pyspark_dataframe(float_data, schema)
+        double_data = [(3.55,), (6.88,), (7.99,)]
+        schema = StructType([
+            StructField("target", DoubleType(), True)])
+        production_df = create_pyspark_dataframe(double_data, schema)
+        assert try_get_common_columns(baseline_df, production_df) == {'target': 'double'}
+
+        # Test with two dataframes that have no common columns
+        baseline_df = create_pyspark_dataframe([(1, "a"), (2, "b")],
+                                               ["id", "name"])
+        production_df = create_pyspark_dataframe([(3, "c"), (4, "d")],
+                                                 ["age", "gender"])
+        assert try_get_common_columns(baseline_df, production_df) == {}
+
+        # Test with two dataframes whose common columns differ in type
+        baseline_df = create_pyspark_dataframe([(1.0, "a", 10), (2.0, "b", 20)],
+                                               ["id", "name", "age"])
+        production_df = create_pyspark_dataframe([(1, "c", 30), (2, "d", 40)],
+                                                 ["id", "name", "age"])
+        assert try_get_common_columns(baseline_df, production_df) == {"name": "string", "age": "bigint"}
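
The WARNING path (try_get_common_columns_with_warning) is not covered above. A possible companion test for the same class, assuming post_warning_event can be stubbed where df_utils imported it (the monkeypatch target may need adjusting to match the CI import path):

    def test_try_get_common_columns_warning(self, monkeypatch):
        """Sketch: WARNING approach posts one event and returns an empty dict."""
        from src.shared_utilities import df_utils

        # Capture warning events instead of posting to the AzureML job.
        events = []
        monkeypatch.setattr(df_utils, "post_warning_event", events.append)

        baseline_df = create_pyspark_dataframe([(1, "a"), (2, "b")], ["id", "name"])
        production_df = create_pyspark_dataframe([(3, "c"), (4, "d")], ["age", "gender"])

        assert df_utils.try_get_common_columns_with_warning(baseline_df, production_df) == {}
        assert len(events) == 1
        assert "Found no common columns" in events[0]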
