From 6d12c702faa3d5cac7f30f6710b59dc0f9ba7fc9 Mon Sep 17 00:00:00 2001
From: jimdale
Date: Mon, 30 Sep 2024 15:01:48 +0200
Subject: [PATCH] integrated new views_tensor utils and implemented drift self-test

---
 pyproject.toml                                |   4 +-
 viewser/commands/queryset/config_drift.py     |  79 ++--
 viewser/commands/queryset/drift_detection.py  |  90 ++++-
 viewser/commands/queryset/integrity_checks.py |  42 +-
 viewser/commands/queryset/operations.py       |  14 +-
 viewser/commands/queryset/self_test.py        | 375 ++++++++++++++++++
 6 files changed, 542 insertions(+), 62 deletions(-)
 create mode 100644 viewser/commands/queryset/self_test.py

diff --git a/pyproject.toml b/pyproject.toml
index 29592b1..e17d8c3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "viewser"
-version = "6.5.3"
+version = "6.6.0"
 description = "The Views 3 CLI tool"
 authors = ["peder2911 "]
 readme = "README.md"
@@ -28,7 +28,7 @@ strconv = "^0.4.2"
 pyarrow = ">9.0.0"
 views-storage = "^1.1.0"
 tqdm = "^4.66.0"
-views_tensor_utilities = "<1.0.0"
+views_tensor_utilities = ">=1.0.0"
 pyod = "<1.1.0"
 
 [tool.poetry.scripts]

diff --git a/viewser/commands/queryset/config_drift.py b/viewser/commands/queryset/config_drift.py
index 18afb93..a888faa 100644
--- a/viewser/commands/queryset/config_drift.py
+++ b/viewser/commands/queryset/config_drift.py
@@ -1,65 +1,76 @@
 import numpy as np
-from . import integrity_checks as ic
 
 default_dne = -np.inf
 default_missing = np.nan
-
 default_config_dict = {
     'global_missingness': {'threshold': 0.05,
-                           'test_function': ic.get_global_nan_fracs,
-                           'message': 'dataset missingness'},
+                           'test_function': 'global_nan_fracs',
+                           'message': 'dataset missingness',
+                           'self_test': 0.10},
 
-    'global_zeros': {'threshold': 0.95,
-                     'test_function': ic.get_global_zero_fracs,
-                     'message': 'dataset zero'},
+    'global_zeros': {'threshold': 0.75,
+                     'test_function': 'global_zero_fracs',
+                     'message': 'dataset zero',
+                     'self_test': 0.999},
 
     'time_missingness': {'threshold': 0.01,
-                         'test_function': ic.get_time_nan_fracs,
-                         'message': 'time-unit missingness'},
+                         'test_function': 'time_nan_fracs',
+                         'message': 'time-unit missingness',
+                         'self_test': 0.02},
 
     'space_missingness': {'threshold': 0.03,
-                          'test_function': ic.get_space_nan_fracs,
-                          'message': 'space-unit missingness'},
+                          'test_function': 'space_nan_fracs',
+                          'message': 'space-unit missingness',
+                          'self_test': 0.06},
 
     'feature_missingness': {'threshold': 0.01,
-                            'test_function': ic.get_feature_nan_fracs,
-                            'message': 'feature missingness'},
+                            'test_function': 'feature_nan_fracs',
+                            'message': 'feature missingness',
+                            'self_test': 0.02},
 
-    'time_zeros': {'threshold': 0.95,
-                   'test_function': ic.get_time_zero_fracs,
-                   'message': 'time-unit zero'},
+    'time_zeros': {'threshold': 0.75,
+                   'test_function': 'time_zero_fracs',
+                   'message': 'time-unit zero',
+                   'self_test': 0.9999},
 
     'space_zeros': {'threshold': 0.95,
-                    'test_function': ic.get_space_zero_fracs,
-                    'message': 'space-unit zero'},
+                    'test_function': 'space_zero_fracs',
+                    'message': 'space-unit zero',
+                    'self_test': 0.99},
 
-    'feature_zeros': {'threshold': 0.95,
-                      'test_function': ic.get_feature_zero_fracs,
-                      'message': 'feature zero'},
+    'feature_zeros': {'threshold': 0.75,
+                      'test_function': 'feature_zero_fracs',
+                      'message': 'feature zero',
+                      'self_test': 0.9999},
 
-    'delta_completeness': {'threshold': 1.25,
-                           'test_function': ic.get_delta_completeness,
-                           'message': 'feature delta_completeness'},
+    'delta_completeness': {'threshold': 1.01,
+                           'test_function': 'delta_completeness',
+                           'message': 'feature delta_completeness',
+                           'self_test': 0.99},
 
-    'delta_zeroes': {'threshold': 1.25,
-                     'test_function': ic.get_delta_zeroes,
-                     'message': 'feature delta_zeroes'},
+    'delta_zeroes': {'threshold': 1.01,
+                     'test_function': 'delta_zeroes',
+                     'message': 'feature delta_zeroes',
+                     'self_test': 0.99},
 
     'extreme_values': {'threshold': 4.0,
-                       'test_function': ic.get_extreme_values,
-                       'message': 'feature extreme values'},
+                       'test_function': 'extreme_values',
+                       'message': 'feature extreme values',
+                       'self_test': 8.0},
 
     'ks_drift': {'threshold': 100.,
-                 'test_function': ic.get_ks_drift,
-                 'message': 'feature KS drift'},
+                 'test_function': 'ks_drift',
+                 'message': 'feature KS drift',
+                 'self_test': None},
 
     'ecod_drift': {'threshold': 0.05,
-                   'test_function': ic.get_ecod_drift,
-                   'message': 'dataset ECOD drift'},
+                   'test_function': 'ecod_drift',
+                   'message': 'dataset ECOD drift',
+                   'self_test': None},
 
     'standard_partition_length': 10,
-    'test_partition_length': 1
+    'test_partition_length': 1,
 }
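An illustrative sketch of how the new fields are consumed (names as defined in this patch; `data` is assumed to be a time x space x feature array, and delta/extreme checks additionally need 'index' and the partition lengths in the entry before they will run):

    entry = dict(default_config_dict['time_missingness'])
    perturb = getattr(self_test, 'perturb_' + entry['test_function'])
    perturbed = perturb(data, **entry)   # corrupts ~2% of one timestep, breaching the 0.01 threshold

'test_function' now names a function in integrity_checks.py, resolved with getattr, and 'self_test' is the perturbation size handed to the matching perturb_* function in self_test.py. Checks whose 'self_test' is None (ks_drift, ecod_drift) use perturbations that take no size parameter.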
diff --git a/viewser/commands/queryset/drift_detection.py b/viewser/commands/queryset/drift_detection.py
index 8967dae..5358fce 100644
--- a/viewser/commands/queryset/drift_detection.py
+++ b/viewser/commands/queryset/drift_detection.py
@@ -1,7 +1,9 @@
 import numpy as np
-import scipy
 from views_tensor_utilities import objects, mappings
 from . import config_drift as config
+from . import integrity_checks as ic
+from . import self_test as st
 import datetime
@@ -60,7 +62,10 @@ def generate_alarms(self):
 
         """
 
-        results, translation_dict = self.test_function(
+        results, translation_dict = getattr(ic, self.test_function)(
             tensor=self.data,
             index=self.index,
             features=self.features,
@@ -107,17 +112,90 @@ class InputGate:
 
     """
 
-    def __init__(self, df, drift_config_dict=None):
+    def __init__(self, df, drift_config_dict=None, self_test=False, self_test_data=None):
         self.config_dict = drift_config_dict
-        self.tensor_container = objects.ViewsDataframe(df, cast_to_dtype=np.float64).to_numpy_time_space()
-        self.numeric_part = self.tensor_container.get_numeric_part()
+        self.default_config_dict = config.default_config_dict
+
+        if self_test:
+            self.__self_test(self_test_data)
+
+        self.tensor_container = (objects.ViewsDataframe(df, split_strategy='float_string', cast_strategy='to_64').
+                                 to_numpy_time_space())
+        self.numeric_part = self.tensor_container.get_numeric_views_tensors()[0]
         self.tensor = self.numeric_part.tensor
         self.index = self.tensor_container.index
         self.columns = self.numeric_part.columns
-        self.default_config_dict = config.default_config_dict
         self.testers = []
 
+    def __self_test(self, self_test_data):
+
+        """
+        __self_test
+
+        Method driving the self-test machinery for the drift-detection system.
+
+        A standard dataframe is fetched and custom perturbation functions, one per integrity-checking function,
+        are called upon to perturb the standard data in ways designed to trigger alerts from the drift detector.
+        Perturbed data is passed to the Tester class as normal and the resulting alerts are collected.
+        By design, every integrity check should be failed. If this is not the case, it implies a problem with
+        one or more of the integrity-checking routines, or with the input data, which must be investigated.
+
+        """
+
+        # fall back to the default config if no custom config was supplied
+        config_dict = self.config_dict if self.config_dict is not None else self.default_config_dict
+
+        self_test_container = (objects.ViewsDataframe(self_test_data,
+                                                      split_strategy='float_string',
+                                                      cast_strategy='to_64').to_numpy_time_space())
+
+        self_test_index = self_test_container.index
+        self_test_features = self_test_container.get_numeric_views_tensors()[0].columns
+
+        self_test_data = self_test_container.get_numeric_numpy_tensors()[0]
+
+        testers = []
+
+        for key in config_dict.keys():
+            try:
+                # copy the entry so the shared default config is not mutated
+                self_test_dict = dict(self.default_config_dict[key])
+
+                self_test_dict['index'] = self_test_index
+                self_test_dict['test_partition_length'] = config_dict['test_partition_length']
+                self_test_dict['standard_partition_length'] = config_dict['standard_partition_length']
+
+                perturbed_self_test_data = getattr(st, 'perturb_' + self_test_dict['test_function'])(
+                                                   self_test_data, **self_test_dict)
+
+                testers.append(Tester(test_function=self_test_dict['test_function'],
+                                      test_partition_length=config_dict['test_partition_length'],
+                                      standard_partition_length=config_dict['standard_partition_length'],
+                                      threshold=self_test_dict['threshold'],
+                                      message=self_test_dict['message'],
+                                      data=perturbed_self_test_data,
+                                      index=self_test_index,
+                                      features=self_test_features,
+                                      ))
+
+            except TypeError:
+                # entries which are not test definitions (the partition lengths) are skipped
+                continue
+
+        alerts = [tester.generate_alarms() for tester in testers]
+
+        nfailures = 0
+        for alert in alerts:
+            if 'alarm' in str(alert[0]):
+                nfailures += 1
+
+        print(f'Drift-detection self-test: {nfailures}/{len(testers)} checks were failed (all should be)')
+
     def assemble_alerts(self):
         """
         assemble_alerts
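An illustrative driver for the class above (a sketch only; `df` and `std_df` stand in for the queryset fetches that viewser performs in operations.py):

    from viewser.commands.queryset import drift_detection, config_drift

    ig = drift_detection.InputGate(df,
                                   drift_config_dict=config_drift.default_config_dict,
                                   self_test=True,
                                   self_test_data=std_df)
    alerts = ig.assemble_alerts()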
diff --git a/viewser/commands/queryset/integrity_checks.py b/viewser/commands/queryset/integrity_checks.py
index ebd2d8f..9fc522a 100644
--- a/viewser/commands/queryset/integrity_checks.py
+++ b/viewser/commands/queryset/integrity_checks.py
@@ -1,6 +1,6 @@
 import numpy as np
 from . import config_drift as config
-from views_tensor_utilities import mappings, objects
+from views_tensor_utilities import mappings
 import scipy
 from pyod.models.ecod import ECOD
@@ -18,6 +18,7 @@ def get_valid_uoa_mask(tensor):
 
 
 def partitioner(tensor, index, test_partition_length, standard_partition_length):
+
     """
     partitioner
@@ -43,7 +44,7 @@ def partitioner(tensor, index, test_partition_length, standard_partition_length)
     return standard_data, test_data
 
 
-def get_global_nan_fracs(**kwargs):
+def global_nan_fracs(**kwargs):
 
     tensor = kwargs['tensor']
@@ -54,7 +55,7 @@ def get_global_nan_fracs(**kwargs):
 
     return np.array([results, 0.0]), None
 
 
-def get_global_zero_fracs(**kwargs):
+def global_zero_fracs(**kwargs):
 
     tensor = kwargs['tensor']
@@ -65,9 +66,9 @@ def get_global_zero_fracs(**kwargs):
 
     return np.array([results, 0.0]), None
 
 
-def get_time_nan_fracs(**kwargs):
+def time_nan_fracs(**kwargs):
     """
-    get_time_nan_fracs
+    time_nan_fracs
 
     Compute missing fractions for all time units
@@ -88,9 +89,9 @@
     return np.array(time_nan_fracs), times.index_to_time
 
 
-def get_space_nan_fracs(**kwargs):
+def space_nan_fracs(**kwargs):
     """
-    get_space_nan_fracs
+    space_nan_fracs
 
     Compute missing fractions for all space units
@@ -111,9 +112,9 @@
     return np.array(space_nan_fracs), spaces.index_to_space
 
 
-def get_feature_nan_fracs(**kwargs):
+def feature_nan_fracs(**kwargs):
     """
-    get_feature_nan_fracs
+    feature_nan_fracs
 
     Compute missing fractions for all features
@@ -134,9 +135,9 @@
     return np.array(feature_nan_fracs), index_to_feature
 
 
-def get_time_zero_fracs(**kwargs):
+def time_zero_fracs(**kwargs):
     """
-    get_time_nan_fracs
+    time_zero_fracs
 
-    Compute missing fractions for all time units
+    Compute zero fractions for all time units
@@ -157,7 +158,7 @@
     return np.array(time_zero_fracs), times.index_to_time
 
 
-def get_space_zero_fracs(**kwargs):
+def space_zero_fracs(**kwargs):
     """
-    get_space_nan_fracs
+    space_zero_fracs
@@ -180,7 +181,7 @@
     return np.array(space_zero_fracs), spaces.index_to_space
 
 
-def get_feature_zero_fracs(**kwargs):
+def feature_zero_fracs(**kwargs):
     """
-    get_feature_nan_fracs
+    feature_zero_fracs
@@ -204,7 +205,7 @@
     return np.array(feature_zero_fracs), index_to_feature
 
 
-def get_delta_completeness(**kwargs):
+def delta_completeness(**kwargs):
     """
-    get_delta_completeness
+    delta_completeness
@@ -240,7 +241,7 @@
     return np.array(delta_completenesses), index_to_feature
 
 
-def get_delta_zeroes(**kwargs):
+def delta_zeroes(**kwargs):
     """
-    get_delta_zeroes
+    delta_zeroes
@@ -276,7 +277,7 @@
     return np.array(delta_zeroes), index_to_feature
 
 
-def get_extreme_values(**kwargs):
+def extreme_values(**kwargs):
 
     tensor = kwargs['tensor']
     index = kwargs['index']
@@ -303,7 +309,7 @@
     return np.array(extreme_values), index_to_feature
 
 
-def get_ks_drift(**kwargs):
+def ks_drift(**kwargs):
     """
-    get_ks_drift
+    ks_drift
@@ -348,7 +354,7 @@
     return np.array(ks_pvalues), index_to_feature
 
 
-def get_ecod_drift(**kwargs):
+def ecod_drift(**kwargs):
     """
-    get_ecod_drift
+    ecod_drift

diff --git a/viewser/commands/queryset/operations.py b/viewser/commands/queryset/operations.py
index e25d1f0..98655a1 100644
--- a/viewser/commands/queryset/operations.py
+++ b/viewser/commands/queryset/operations.py
@@ -76,7 +76,7 @@ def fetch(self, queryset_name: str, start_date: str = None, end_date: str = None
         return f
 
     def fetch_with_drift_detection(self, queryset_name: str, start_date: str, end_date: str, drift_config_dict:
-                                   Optional[Dict]):
+                                   Optional[Dict] = None, self_test: bool = False):
@@ -92,9 +92,19 @@ def fetch_with_drift_detection(self, queryset_name: str, start_date: str, end_da
 
         """
 
+        self_test_data = None
+
+        if self_test:
+            try:
+                self_test_data = self.fetch("drift_detection_self_test", start_date, end_date)
+            except Exception as exc:
+                raise RuntimeError('Attempt to fetch the self-test queryset failed. The '
+                                   '"drift_detection_self_test" queryset MUST be defined and published '
+                                   'outside viewser') from exc
+
         f = self.fetch(queryset_name, start_date, end_date)
 
-        input_gate = drift_detection.InputGate(f, drift_config_dict=drift_config_dict)
+        input_gate = drift_detection.InputGate(f, drift_config_dict=drift_config_dict, self_test=self_test,
+                                               self_test_data=self_test_data)
 
         alerts = input_gate.assemble_alerts()
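An illustrative call of the method above (a sketch; the operations object, queryset name and dates are placeholders, and the return value is unchanged by this patch):

    operations.fetch_with_drift_detection('my_queryset',
                                          start_date='2010-01-01',
                                          end_date='2020-12-31',
                                          self_test=True)

With self_test=True, the 'drift_detection_self_test' queryset is fetched first and the perturbation checks run before the normal drift detection on the requested queryset.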
diff --git a/viewser/commands/queryset/self_test.py b/viewser/commands/queryset/self_test.py
new file mode 100644
index 0000000..9f35045
--- /dev/null
+++ b/viewser/commands/queryset/self_test.py
@@ -0,0 +1,375 @@
+"""
+self_test
+
+Perturbation functions for the drift-detection self-test. Each integrity check in
+integrity_checks.py has a matching perturb_<name> function here, which corrupts a
+standard dataset in a way designed to trip that check.
+
+The standard dataset comes from the 'drift_detection_self_test' queryset, which
+must be defined and published outside viewser. The version assumed here is a
+country_month queryset with columns ln_ged_ns, ln_ged_os and ln_ged_sb
+(log-transformed, missing-filled GED fatality counts) and wdi_sp_pop_totl
+(log-transformed population, NAs replaced, from country_year).
+"""
+import numpy as np
+from views_tensor_utilities import mappings
+
+
+def perturbation_partitioner(tensor, index, test_partition_length, standard_partition_length):
+    """
+    perturbation_partitioner
+
+    Partition the input data tensor according to the partitions in drift_config. For the purposes of
+    perturbation, the standard partition starts at the beginning of the dataset - otherwise
+    datasets would be partitioned twice.
+
+    """
+
+    times = mappings.TimeUnits.from_pandas(index)
+
+    tend = times.times[-1]
+    tboundary = tend - test_partition_length
+    tstart = times.times[0]
+
+    tstart_index = times.time_to_index[tstart]
+    tboundary_index = times.time_to_index[tboundary]
+    tend_index = times.time_to_index[tend]
+
+    standard_partition, test_partition = (tstart_index, tboundary_index), (tboundary_index, tend_index)
+
+    standard_data = tensor[standard_partition[0]:standard_partition[1], :, :]
+    test_data = tensor[test_partition[0]:test_partition[1], :, :]
+
+    return standard_data, test_data
+
+
+def get_random_indices(array, fraction=0.01):
+
+    """
+    Draw a random selection of indices pointing to locations in the input array
+
+    """
+
+    nsample = int(fraction*array.size)
+    ncoord = len(array.shape)
+
+    coords = []
+    for icoord in range(ncoord):
+        coord = []
+        for sample in range(nsample):
+            co = np.random.choice(array.shape[icoord])
+            coord.append(co)
+        coords.append(coord)
+
+    return tuple(coords)
+
+
+def perturb_global(data, what, **kwargs):
+
+    """
+    Perturb the whole input dataset so that it contains approximately (fraction) of (what)
+    """
+
+    fraction = kwargs['self_test']
+
+    data_copy = data.copy()
+
+    indices = get_random_indices(data_copy, fraction=fraction)
+
+    data_copy[indices] = what
+
+    return data_copy
+
+
+def perturb_global_zero_fracs(data, **kwargs):
+
+    what = 0.0
+
+    perturbed_data = perturb_global(data, what, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_global_nan_fracs(data, **kwargs):
+
+    what = np.nan
+
+    perturbed_data = perturb_global(data, what, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_time(data, what, **kwargs):
+
+    """
+    Perturb a timestep so that it contains approximately (fraction) of (what)
+
+    """
+
+    fraction = kwargs['self_test']
+
+    data_copy = data.copy()
+
+    itime = int(data.shape[0] / 2)
+
+    indices = get_random_indices(data_copy[itime, :, :], fraction=fraction)
+
+    data_copy[itime, :, :][indices] = what
+
+    return data_copy
+
+
+def perturb_time_zero_fracs(data, **kwargs):
+
+    what = 0.0
+
+    perturbed_data = perturb_time(data, what, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_time_nan_fracs(data, **kwargs):
+
+    what = np.nan
+
+    perturbed_data = perturb_time(data, what, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_space(data, what, **kwargs):
+
+    """
+    Perturb a spatial unit so that it contains approximately (fraction) of (what)
+    """
+
+    fraction = kwargs['self_test']
+
+    data_copy = data.copy()
+
+    ispace = int(data.shape[1]/2)
+
+    indices = get_random_indices(data_copy[:, ispace, :], fraction=fraction)
+
+    data_copy[:, ispace, :][indices] = what
+
+    return data_copy
+
+
+def perturb_space_zero_fracs(data, **kwargs):
+
+    what = 0.0
+
+    perturbed_data = perturb_space(data, what, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_space_nan_fracs(data, **kwargs):
+
+    what = np.nan
+
+    perturbed_data = perturb_space(data, what, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_feature(data, what, **kwargs):
+
+    """
+    Perturb a feature so that it contains approximately (fraction) of (what)
+    """
+
+    fraction = kwargs['self_test']
+
+    data_copy = data.copy()
+
+    ifeature = int(data.shape[2]/2)
+
+    indices = get_random_indices(data_copy[:, :, ifeature], fraction=fraction)
+
+    data_copy[:, :, ifeature][indices] = what
+
+    return data_copy
+
+
+def perturb_feature_zero_fracs(data, **kwargs):
+
+    what = 0.0
+
+    perturbed_data = perturb_feature(data, what, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_feature_nan_fracs(data, **kwargs):
+
+    what = np.nan
+
+    perturbed_data = perturb_feature(data, what, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_delta(data, what, whatever, **kwargs):
+
+    """
+    Perturb standard and test partitions so that the test partition has a substantially higher fraction of
+    (what), filling in instances of (what) in the standard partition with (whatever)
+
+    """
+
+    index = kwargs['index']
+    test_partition_length = kwargs['test_partition_length']
+    standard_partition_length = kwargs['standard_partition_length']
+    fraction = kwargs['self_test']
+
+    data_copy = data.copy()
+
+    standard, test = perturbation_partitioner(data_copy, index, test_partition_length, standard_partition_length)
+
+    indices = get_random_indices(test, fraction=fraction)
+
+    test[indices] = what
+
+    indices = get_random_indices(standard, fraction=fraction)
+
+    standard[indices] = whatever
+
+    data_copy = np.concatenate((standard, test), axis=0)
+
+    return data_copy
+
+
+def perturb_delta_zeroes(data, **kwargs):
+
+    what = 0.0
+
+    whatever = 1.0
+
+    perturbed_data = perturb_delta(data, what, whatever, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_delta_completeness(data, **kwargs):
+
+    what = np.nan
+
+    whatever = 1.0
+
+    perturbed_data = perturb_delta(data, what, whatever, **kwargs)
+
+    return perturbed_data
+
+
+def perturb_extreme_values(data, **kwargs):
+
+    """
+    Perturb the most extreme value in the test partition
+    """
+
+    index = kwargs['index']
+    test_partition_length = kwargs['test_partition_length']
+    standard_partition_length = kwargs['standard_partition_length']
+    fraction = kwargs['self_test']
+
+    data_copy = data.copy()
+
+    standard, test = perturbation_partitioner(data_copy, index, test_partition_length, standard_partition_length)
+
+    itime = int(test.shape[0]/2)
+
+    ispace = int(test.shape[1]/2)
+
+    ifeature = 0
+
+    standard = np.nan_to_num(standard, nan=0.0, posinf=0.0, neginf=0.0)
+    test = np.nan_to_num(test, nan=0.0, posinf=0.0, neginf=0.0)
+
+    test_mean = np.nanmean(test[:, :, ifeature])
+
+    test_std = np.nanstd(test[:, :, ifeature])
+
+    test[itime, ispace, ifeature] = test_mean + (1 + fraction)*test_std
+
+    data_copy = np.concatenate((standard, test), axis=0)
+
+    return data_copy
+
+
+def perturb_ks_drift(data, **kwargs):
+
+    """
+    Perturb the distribution of data in the test partition to a Normal, which is expected to be strongly
+    different from the standard partition
+    """
+
+    index = kwargs['index']
+    test_partition_length = kwargs['test_partition_length']
+    standard_partition_length = kwargs['standard_partition_length']
+
+    data_copy = data.copy()
+
+    standard, test = perturbation_partitioner(data_copy, index, test_partition_length, standard_partition_length)
+
+    ifeature = 0
+
+    test = np.nan_to_num(test, nan=0.0, posinf=0.0, neginf=0.0)
+
+    test_mean = np.mean(test[:, :, ifeature])
+
+    test_std = np.std(test[:, :, ifeature])
+
+    test[:, :, ifeature] = np.random.normal(test_mean, test_std, (test.shape[0], test.shape[1]))
+
+    data_copy = np.concatenate((standard, test), axis=0)
+
+    return data_copy
+
+
+def perturb_ecod_drift(data, **kwargs):
+
+    """
+    Perturb the test partition by switching two very unlike features
+    """
+
+    index = kwargs['index']
+    test_partition_length = kwargs['test_partition_length']
+    standard_partition_length = kwargs['standard_partition_length']
+
+    data_copy = data.copy()
+
+    standard, test = perturbation_partitioner(data_copy, index, test_partition_length, standard_partition_length)
+
+    f0copy = test[:, :, 0].copy()
+
+    test[:, :, 0] = test[:, :, -1]
+    test[:, :, -1] = f0copy
+
+    data_copy = np.concatenate((standard, test), axis=0)
+
+    return data_copy
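+
+
+if __name__ == '__main__':
+
+    # Minimal illustrative smoke test: check that a perturbation function
+    # injects roughly the requested fraction of NaNs. The shapes and the 0.02
+    # fraction (the 'self_test' value for time_missingness in config_drift.py)
+    # are arbitrary demonstration choices; only numpy is required to run it.
+    demo_data = np.random.rand(24, 200, 4)              # (time, space, feature)
+    perturbed = perturb_time_nan_fracs(demo_data, self_test=0.02)
+
+    itime = demo_data.shape[0] // 2                     # the timestep perturb_time targets
+    nan_frac = np.isnan(perturbed[itime]).mean()
+
+    print(f'NaN fraction at perturbed timestep: {nan_frac:.4f}')
+    assert nan_frac > 0.01, 'perturbation failed to breach the time_missingness threshold'
+    assert not np.isnan(demo_data).any()                # the input data is left untouched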