Commit

Create correct histogram buckets when data column has a single distinct value (#2100)

* working on new testcase for histogram failure

* dataset col numerical

* check for buckets with single values. update spec

* style and documentation edits

* syntax

* syntax fix

* fix column to not be inferred as bool

* fix column to not be inferred as bool

* consider edge-case min_value is 0

* handle negative min values for delta

* add histogram UT

* fix linter issues

* linter

* test expected values in single bucket

* fix ut

* revert spec version

* revert compute histogram spec version

* fix UT

* fix UT
alanpo1 authored Jan 12, 2024
1 parent 15daeb8 commit a1f9dfd
Showing 11 changed files with 291 additions and 8 deletions.
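In brief, the bug this commit fixes: bucket edges are computed with fixed-width bins, so a column with a single distinct value has min_value == max_value, the bin width collapses to zero, and every generated edge is the same degenerate value. A minimal sketch of the failure and of the new single-bucket fallback (illustrative Python only, not code from this repository; the variable names mirror the diff below):

# Fixed-width binning degenerates when a column has one distinct value.
min_value, max_value, num_bins = 5, 5, 4   # e.g. a constant column of 5s
bin_width = (max_value - min_value) / num_bins   # 0.0
edges = [min_value + i * bin_width for i in range(num_bins + 1)]
print(edges)   # [5.0, 5.0, 5.0, 5.0, 5.0] -- duplicate, zero-width edges

# The fallback this commit introduces: one narrow bucket around the value.
delta = 0.005 if min_value == 0 else abs(min_value * 0.005)
print([min_value - delta, min_value + delta])   # [4.975, 5.025]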

@@ -4,7 +4,7 @@ type: pipeline
name: data_drift_signal_monitor
display_name: Data Drift - Signal Monitor
description: Computes the data drift between a baseline and production data assets.
-version: 0.3.22
+version: 0.3.23
is_deterministic: true

inputs:
@@ -116,7 +116,7 @@ jobs:
      type: aml_token
  compute_histogram_buckets:
    type: spark
-    component: azureml://registries/azureml/components/model_monitor_compute_histogram_buckets/versions/0.3.3
+    component: azureml://registries/azureml/components/model_monitor_compute_histogram_buckets/versions/0.3.4
    inputs:
      input_data_1:
        type: mltable

@@ -4,7 +4,7 @@ type: spark
name: model_monitor_compute_histogram_buckets
display_name: Model Monitor - Compute Histogram Buckets
description: Compute histogram buckets given up to two datasets.
-version: 0.3.3
+version: 0.3.4
is_deterministic: true

code: ../../src

@@ -11,8 +11,8 @@ def _get_smaller_df(baseline_df, production_df, baseline_count, production_count
    return baseline_df if baseline_count < production_count else production_df


-def _get_bin_width(baseline_df, production_df, baseline_count, production_count):
-    """Calculate bin width using struges alogorithm."""
+def _get_optimal_number_of_bins(baseline_df, production_df, baseline_count, production_count):
+    """Calculate the number of histogram bins using Sturges' algorithm."""
    # TODO: Unnecessary calculation, use count from summary and remove _get_smaller_df()
    smaller_df = _get_smaller_df(
        baseline_df, production_df, baseline_count, production_count
@@ -25,7 +25,7 @@ def get_dual_histogram_bin_edges(
    baseline_df, production_df, baseline_count, production_count, numerical_columns
):
    """Get histogram edges using fixed bin width."""
-    num_bins = _get_bin_width(
+    num_bins = _get_optimal_number_of_bins(
        baseline_df, production_df, baseline_count, production_count
    )
    all_bin_edges = {}
@@ -43,6 +43,13 @@

        bin_width = (max_value - min_value) / num_bins

+        # If the histogram has only one value, then we only need a single bucket
+        # and should skip the for-loop below.
+        if min_value == max_value:
+            delta = 0.005 if min_value == 0 else abs(min_value * 0.005)
+            all_bin_edges[col] = [min_value - delta, min_value + delta]
+            continue

        edges = []
        for bin in range(num_bins):
            bin = bin + 1
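A hand-run of the new branch for the three constant-column cases the new tests exercise (an illustrative sketch, not repository code): the single bucket's half-width is 0.5% of the value, with a fixed 0.005 fallback when the value is zero so the bucket never collapses:

# Expected single-bucket edges for constant columns of 1, 0 and -31.
for value in (1, 0, -31):
    delta = 0.005 if value == 0 else abs(value * 0.005)
    print(value, [value - delta, value + delta])
# 1   -> [0.995, 1.005]
# 0   -> [-0.005, 0.005]
# -31 -> [-31.155, -30.845]

Note that delta scales with the magnitude of the value, and abs() keeps it positive for negative values, so the two edges always stay distinct and ordered.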

@@ -11,4 +11,4 @@ sepal_length,sepal_width,petal_length,petal_width,target
4,3,1,5,virginica
5,3,1,6,versicolor
4,3,1,5,virginica
-5,3,1,6,versicolor
+5,3,1.0,6,versicolor

@@ -0,0 +1,10 @@
type: mltable

paths:
  - pattern: ./*.csv
transformations:
  - read_delimited:
      delimiter: ','
      encoding: ascii
      header: all_files_same_headers

@@ -0,0 +1,14 @@
sepal_length,sepal_width,petal_length,petal_width,target,pickup_month
5,3,1,4,setosa,1
5,3,1,1,setosa,1
2,3,1,2,setosa,1
5,3,1,3,setosa,1
4,3,1,4,setosa,1
4,3,1,5,virginica,1
5,3,1,6,versicolor,1
4,3,1,5,virginica,1
5,3,1,6,versicolor,1
4,3,1,5,virginica,1
5,3,1,6,versicolor,1
4,3,1,5,virginica,1
5,3,1.0,6,versicolor,1.0

@@ -0,0 +1,9 @@
type: mltable

paths:
  - pattern: ./*.csv
transformations:
  - read_delimited:
      delimiter: ','
      encoding: ascii
      header: all_files_same_headers

@@ -0,0 +1,11 @@
sepal_length,sepal_width,petal_length,petal_width,pickup_month
5,3,1.6,0.2,1
5,3.4,1.6,0.4,1
5,3.5,1.5,0.2,1
5,3.4,1.4,0.2,1
4,3.2,1.6,0.2,1
4,3.1,1.6,0.2,1
5,3.4,1.5,0.4,1
5,4.1,1.5,0.1,1
5,4.2,1.4,0.2,1
4,3.1,1.5,0.1,1.0

@@ -14,7 +14,9 @@
    DATA_ASSET_EMPTY,
    DATA_ASSET_IRIS_BASELINE_INT_DATA_TYPE,
    DATA_ASSET_IRIS_PREPROCESSED_MODEL_INPUTS_NO_DRIFT_INT_DATA,
-    DATA_ASSET_IRIS_PREPROCESSED_MODEL_INPUTS_NO_COMMON_COLUMNS
+    DATA_ASSET_IRIS_PREPROCESSED_MODEL_INPUTS_NO_COMMON_COLUMNS,
+    DATA_ASSET_IRIS_BASELINE_INT_SINGLE_VALUE_HISTOGRAM,
+    DATA_ASSET_IRIS_PREPROCESSED_MODEL_INPUTS_INT_SINGLE_VALUE_HISTOGRAM
)


@@ -139,3 +141,18 @@ def test_monitoring_run_empty_production_and_baseline_data(

        # empty production and target data should fail the job
        assert pipeline_job.status == "Failed"

+    def test_monitoring_run_int_single_distinct_value_histogram(
+        self, ml_client: MLClient, get_component, download_job_output,
+        test_suite_name
+    ):
+        """Test the scenario where the production data has a column with only one distinct value."""
+        pipeline_job = _submit_data_drift_model_monitor_job(
+            ml_client,
+            get_component,
+            test_suite_name,
+            DATA_ASSET_IRIS_BASELINE_INT_SINGLE_VALUE_HISTOGRAM,
+            DATA_ASSET_IRIS_PREPROCESSED_MODEL_INPUTS_INT_SINGLE_VALUE_HISTOGRAM,
+        )
+
+        assert pipeline_job.status == "Completed"

@@ -82,6 +82,14 @@
"azureml:mltable_iris_preprocessed_model_inputs_no_common_columns:1"
)

+# Used for checking against histogram regressions where a numerical data column has a single distinct value.
+DATA_ASSET_IRIS_PREPROCESSED_MODEL_INPUTS_INT_SINGLE_VALUE_HISTOGRAM = (
+    "azureml:mltable_iris_preprocessed_model_inputs_int_single_value_histogram:1"
+)
+DATA_ASSET_IRIS_BASELINE_INT_SINGLE_VALUE_HISTOGRAM = (
+    "azureml:mltable_iris_baseline_int_single_value_histogram:1"
+)

# MDC-generated target dataset of an iris model which contains both the input features as well as the inferred results.
# The data contains no drift. Output logs have been generated for 2023/01/01/00 and 2023/02/01/00.
DATA_ASSET_IRIS_MODEL_INPUTS_OUTPUTS_WITH_NO_DRIFT = (

207 changes: 207 additions & 0 deletions assets/model_monitoring/components/tests/unit/test_histogram_utils.py
@@ -0,0 +1,207 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""This file contains unit tests for the histogram utilities."""

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as pyspark_f
from src.shared_utilities.df_utils import get_numerical_cols_with_df
from src.shared_utilities.histogram_utils import (
    get_dual_histogram_bin_edges
)
import math
import pandas as pd
import pytest


@pytest.mark.unit
class TestHistogramUtils:
    """Test class for histogram utilities."""

    def _num_bins_by_sturges_algorithm(self, df: DataFrame) -> int:
        """Calculate the expected number of bins for a dataset using Sturges' algorithm."""
        num_bins = math.log2(df.count()) + 1
        return math.ceil(num_bins)

    def test_get_dual_histogram_bin_edges(self):
        """Test with mixed columns, expect success."""
        column_dtype_map = {
            'col1': 'int',
            'col2': 'float',
            'col3': 'double',
            'col4': 'decimal',
            'col5': 'string'
        }
        baseline_df = pd.DataFrame({
            'col1': [1, 2, 3, 4, 5],
            'col2': [1.1, 2.2, 3.3, 4.4, 5.5],
            'col3': [1.11, 2.22, 3.33, 4.44, 5.55],
            'col4': [1.111, 2.222, 3.333, 4.444, 5.555]
        })
        production_df = pd.DataFrame({
            'col1': [1, 2, 3, 4, 5, 6, 7],
            'col2': [1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7],
            'col3': [1.11, 2.22, 3.33, 4.44, 5.55, 6.66, 7.77],
            'col4': [1.111, 2.222, 3.333, 4.444, 5.555, 6.666, 7.777]
        })
        baseline_df = self.init_spark().createDataFrame(baseline_df)
        production_df = self.init_spark().createDataFrame(production_df)
        numerical_columns = get_numerical_cols_with_df(column_dtype_map, baseline_df)

        all_edges = get_dual_histogram_bin_edges(
            baseline_df, production_df, baseline_df.count(), production_df.count(), numerical_columns
        )

        assert all_edges is not None
        for col in numerical_columns:
            assert all_edges.get(col, None) is not None
            assert len(all_edges[col]) == self._num_bins_by_sturges_algorithm(baseline_df) + 1

            calculate_distinct_values_df = pd.DataFrame({col: all_edges[col]})
            distinct_df = self.init_spark().createDataFrame(calculate_distinct_values_df)
            assert distinct_df.distinct().count() == len(all_edges[col])

    def test_get_dual_histogram_bin_edges_single_distinct_value_bucket(self):
        """Test scenario where we have a single bucket."""
        column_dtype_map = {
            'col1': 'int',
            'col2': 'float',
            'col3': 'double',
            'col4': 'decimal',
            'col5': 'string'
        }
        baseline_df = pd.DataFrame({
            'col1': [1, 1, 1, 1, 1],
            'col2': [1.1, 2.2, 3.3, 4.4, 5.5],
            'col3': [1.11, 2.22, 3.33, 4.44, 5.55],
            'col4': [1.111, 2.222, 3.333, 4.444, 5.555]
        })
        production_df = pd.DataFrame({
            'col1': [1, 1, 1, 1, 1],
            'col2': [1.1, 2.2, 3.3, 4.4, 5.5],
            'col3': [1.11, 2.22, 3.33, 4.44, 5.55],
            'col4': [1.111, 2.222, 3.333, 4.444, 5.555]
        })
        baseline_df = self.init_spark().createDataFrame(baseline_df)
        production_df = self.init_spark().createDataFrame(production_df)
        numerical_columns = get_numerical_cols_with_df(column_dtype_map, baseline_df)

        all_edges = get_dual_histogram_bin_edges(
            baseline_df, production_df, baseline_df.count(), production_df.count(), numerical_columns
        )

        assert all_edges is not None
        for col in numerical_columns:
            assert all_edges.get(col, None) is not None

            if col == 'col1':
                assert len(all_edges[col]) == 2
                min_value = min(baseline_df.agg(pyspark_f.min(col)).collect()[0])
                expected_delta = min_value * 0.005
                assert all_edges[col][0] == (min_value - expected_delta)
                assert all_edges[col][1] == (min_value + expected_delta)
            else:
                assert len(all_edges[col]) == self._num_bins_by_sturges_algorithm(baseline_df) + 1

            calculate_distinct_values_df = pd.DataFrame({col: all_edges[col]})
            distinct_df = self.init_spark().createDataFrame(calculate_distinct_values_df)
            assert distinct_df.distinct().count() == len(all_edges[col])

    def test_get_dual_histogram_bin_edges_single_distinct_value_bucket_negative(self):
        """Test scenario where we have a single bucket with a negative value."""
        column_dtype_map = {
            'col1': 'int',
            'col2': 'float',
            'col3': 'double',
            'col4': 'decimal',
            'col5': 'string'
        }
        baseline_df = pd.DataFrame({
            'col1': [-31, -31, -31, -31, -31],
            'col2': [1.1, 2.2, 3.3, 4.4, 5.5],
            'col3': [1.11, 2.22, 3.33, 4.44, 5.55],
            'col4': [1.111, 2.222, 3.333, 4.444, 5.555]
        })
        production_df = pd.DataFrame({
            'col1': [-31, -31, -31, -31, -31],
            'col2': [1.1, 2.2, 3.3, 4.4, 5.5],
            'col3': [1.11, 2.22, 3.33, 4.44, 5.55],
            'col4': [1.111, 2.222, 3.333, 4.444, 5.555]
        })
        baseline_df = self.init_spark().createDataFrame(baseline_df)
        production_df = self.init_spark().createDataFrame(production_df)
        numerical_columns = get_numerical_cols_with_df(column_dtype_map, baseline_df)

        all_edges = get_dual_histogram_bin_edges(
            baseline_df, production_df, baseline_df.count(), production_df.count(), numerical_columns
        )

        assert all_edges is not None
        for col in numerical_columns:
            assert all_edges.get(col, None) is not None

            if col == 'col1':
                assert len(all_edges[col]) == 2
                min_value = min(baseline_df.agg(pyspark_f.min(col)).collect()[0])
                expected_delta = abs(min_value * 0.005)
                assert all_edges[col][0] == (min_value - expected_delta)
                assert all_edges[col][1] == (min_value + expected_delta)
            else:
                assert len(all_edges[col]) == self._num_bins_by_sturges_algorithm(baseline_df) + 1

            calculate_distinct_values_df = pd.DataFrame({col: all_edges[col]})
            distinct_df = self.init_spark().createDataFrame(calculate_distinct_values_df)
            assert distinct_df.distinct().count() == len(all_edges[col])

    def test_get_dual_histogram_bin_edges_single_distinct_value_bucket_zero(self):
        """Test scenario where we have a single bucket with the value as zero."""
        column_dtype_map = {
            'col1': 'int',
            'col2': 'float',
            'col3': 'double',
            'col4': 'decimal',
            'col5': 'string'
        }
        baseline_df = pd.DataFrame({
            'col1': [0, 0, 0, 0, 0],
            'col2': [1.1, 2.2, 3.3, 4.4, 5.5],
            'col3': [1.11, 2.22, 3.33, 4.44, 5.55],
            'col4': [1.111, 2.222, 3.333, 4.444, 5.555]
        })
        production_df = pd.DataFrame({
            'col1': [0, 0, 0, 0, 0],
            'col2': [1.1, 2.2, 3.3, 4.4, 5.5],
            'col3': [1.11, 2.22, 3.33, 4.44, 5.55],
            'col4': [1.111, 2.222, 3.333, 4.444, 5.555]
        })
        baseline_df = self.init_spark().createDataFrame(baseline_df)
        production_df = self.init_spark().createDataFrame(production_df)
        numerical_columns = get_numerical_cols_with_df(column_dtype_map, baseline_df)

        all_edges = get_dual_histogram_bin_edges(
            baseline_df, production_df, baseline_df.count(), production_df.count(), numerical_columns
        )

        assert all_edges is not None
        for col in numerical_columns:
            assert all_edges.get(col, None) is not None

            if col == 'col1':
                assert len(all_edges[col]) == 2
                min_value = min(baseline_df.agg(pyspark_f.min(col)).collect()[0])
                expected_delta = 0.005
                assert all_edges[col][0] == (min_value - expected_delta)
                assert all_edges[col][1] == (min_value + expected_delta)
            else:
                assert len(all_edges[col]) == self._num_bins_by_sturges_algorithm(baseline_df) + 1

            calculate_distinct_values_df = pd.DataFrame({col: all_edges[col]})
            distinct_df = self.init_spark().createDataFrame(calculate_distinct_values_df)
            assert distinct_df.distinct().count() == len(all_edges[col])

    def init_spark(self):
        """Get or create spark session."""
        spark = SparkSession.builder.appName("test").getOrCreate()
        return spark

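To run the new unit tests locally, one option is to select the `unit` marker they declare (a sketch assuming pytest and a local PySpark environment; the working directory and the repo's own test tooling may differ):

# Run only the unit-marked tests in the new file
# (path taken from the file listing above; assumes the components directory is the CWD).
import pytest

pytest.main(["-m", "unit", "tests/unit/test_histogram_utils.py"])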