From fafec96c0eb7a7177d96dc1ec32e1f17899e8a31 Mon Sep 17 00:00:00 2001 From: jimdale Date: Thu, 15 Feb 2024 13:57:30 +0100 Subject: [PATCH 1/8] first iteration of input drift detection --- viewser/commands/queryset/config_drift.py | 8 + viewser/commands/queryset/drift_alert.py | 5 + viewser/commands/queryset/drift_detection.py | 204 +++++++++++++++++++ viewser/commands/queryset/operations.py | 27 ++- viewser/tui/__init__.py | 0 viewser/tui/animations.py | 33 --- viewser/tui/ascii_art.py | 14 -- viewser/tui/formatting/__init__.py | 0 viewser/tui/formatting/abc.py | 112 ---------- viewser/tui/formatting/conventions.py | 5 - viewser/tui/formatting/errors.py | 26 --- viewser/tui/formatting/formatters.py | 12 -- viewser/tui/formatting/generic_models.py | 9 - viewser/tui/formatting/json_formatter.py | 7 - viewser/tui/formatting/sections.py | 10 - viewser/tui/formatting/styles.py | 4 - viewser/tui/models.py | 2 - viewser/tui/utils.py | 3 - 18 files changed, 234 insertions(+), 247 deletions(-) create mode 100644 viewser/commands/queryset/config_drift.py create mode 100644 viewser/commands/queryset/drift_alert.py create mode 100644 viewser/commands/queryset/drift_detection.py delete mode 100644 viewser/tui/__init__.py delete mode 100644 viewser/tui/animations.py delete mode 100644 viewser/tui/ascii_art.py delete mode 100644 viewser/tui/formatting/__init__.py delete mode 100644 viewser/tui/formatting/abc.py delete mode 100644 viewser/tui/formatting/conventions.py delete mode 100644 viewser/tui/formatting/errors.py delete mode 100644 viewser/tui/formatting/formatters.py delete mode 100644 viewser/tui/formatting/generic_models.py delete mode 100644 viewser/tui/formatting/json_formatter.py delete mode 100644 viewser/tui/formatting/sections.py delete mode 100644 viewser/tui/formatting/styles.py delete mode 100644 viewser/tui/models.py delete mode 100644 viewser/tui/utils.py diff --git a/viewser/commands/queryset/config_drift.py b/viewser/commands/queryset/config_drift.py new file mode 100644 index 0000000..591f3f7 --- /dev/null +++ b/viewser/commands/queryset/config_drift.py @@ -0,0 +1,8 @@ +threshold_global_nan_frac = 0.05 +threshold_feature_nan_frac = 0.15 +threshold_time_unit_nan_frac = 0.15 +threshold_space_unit_nan_frac = 0.15 +threshold_delta = 1.5 + +standard_partition_length = 10 +test_partition_length = 1 \ No newline at end of file diff --git a/viewser/commands/queryset/drift_alert.py b/viewser/commands/queryset/drift_alert.py new file mode 100644 index 0000000..2518929 --- /dev/null +++ b/viewser/commands/queryset/drift_alert.py @@ -0,0 +1,5 @@ +class DriftAlert: + + def __init__(self,offender,uoas): + self.offender = offender + self.uoas = uoas \ No newline at end of file diff --git a/viewser/commands/queryset/drift_detection.py b/viewser/commands/queryset/drift_detection.py new file mode 100644 index 0000000..6270ade --- /dev/null +++ b/viewser/commands/queryset/drift_detection.py @@ -0,0 +1,204 @@ +import numpy as np +import pandas as pd +from views_transform_library import utilities +import config_drift as config +from ForecastDrift import alarm +class InputGate: + + def __init__(self, df): + self.tensor = df_to_numpy(df) + self.index = df.index + self.columns = df.columns + self.times, self.time_to_index, self.index_to_time = utilities.map_times(self.index) + self.spaces, self.space_to_index, self.index_to_space = utilities.map_times(self.index) + self.uoa_mask = self.get_valid_uoa_mask() + + def get_valid_uoa_mask(self): + return ~np.where(self.tensor==-np.inf,True,False) + + def 
test_global_nan_frac(self): + if severity:=(np.count_nonzero(np.isnan(self.tensor[self.uoa_mask]))/np.count_nonzero(self.uoa_mask))/config.threshold_global_nan_frac>1: + return alarm.Alarm(f"Global missingness exceeded threshold {config.threshold_global_nan_frac}",int(severity)) + else: + return None + + def get_time_nan_fracs(self): + time_nan_fracs = [] + nans = np.where(np.isnan(self.tensor),1,0) + for itime in range(self.tensor.shape[0]): + time_nan_fracs.append( + np.count_nonzero(nans[itime,:,:])/np.count_nonzero(self.uoa_mask[itime,:,:]) + ) + + return np.array(time_nan_fracs) + + def test_time_nan_fracs(self): + + results = self.get_time_nan_fracs()/config.threshold_time_unit_nan_frac + offenders = np.where(results>1) + + if len(offenders) > 0: + alarms = [] + for offender,severity in zip(offenders,results): + al = alarm.Alarm( + f"Missingness for time unit {self.index_to_time[offender]} exceeded threshold {config.threshold_time_unit_nan_frac}", + int(severity)) + alarms.append(al) + + return alarms + else: + return None + + def get_space_nan_fracs(self): + space_nan_fracs = [] + nans = np.where(np.isnan(self.tensor), 1, 0) + for ispace in range(self.tensor.shape[1]): + space_nan_fracs.append( + np.count_nonzero(nans[:, ispace, :]) / np.count_nonzero(self.uoa_mask[:, ispace, :]) + ) + + return np.array(space_nan_fracs) + + def test_space_nan_fracs(self): + + results = self.get_space_nan_fracs() / config.threshold_space_unit_nan_frac + offenders = np.where(results > 1) + + if len(offenders) > 0: + alarms = [] + for offender, severity in zip(offenders, results): + al = alarm.Alarm( + f"Missingness for space unit {self.index_to_space[offender]} exceeded threshold {config.threshold_space_unit_nan_frac}", + int(severity)) + alarms.append(al) + + return alarms + else: + return None + + def get_feature_nan_fracs(self): + feature_nan_fracs = [] + nans = np.where(np.isnan(self.tensor), 1, 0) + for ifeature in range(self.tensor.shape[2]): + feature_nan_fracs.append( + np.count_nonzero(nans[:, :, ifeature]) / np.count_nonzero(self.uoa_mask[:, :, ifeature]) + ) + + return np.array(feature_nan_fracs) + + def test_feature_nan_fracs(self): + + results = self.get_feature_nan_fracs() / config.threshold_feature_nan_frac + offenders = np.where(results > 1) + + if len(offenders) > 0: + alarms = [] + for offender, severity in zip(offenders, results): + al = alarm.Alarm( + f"Missingness for feature {self.columns[offender]} exceeded threshold {config.threshold_feature_nan_frac}", + int(severity)) + alarms.append(al) + + return alarms + else: + return None + + def get_delta_completeness(self): + delta_completenesses = [] + standard_partition, test_partition = self.partitioner() + standard, test = self.partition(standard_partition, test_partition) + + for ifeature in range(self.tensor.shape[2]): + standard_nans = np.where(np.isnan(standard[:,:,ifeature]), 1, 0) + test_nans = np.where(np.isnan(test[:,:,ifeature]), 1, 0) + standard_uoa_mask = ~np.where(standard==-np.inf,True,False) + test_uoa_mask = ~np.where(test==-np.inf,True,False) + + standard_nan_frac = np.count_nonzero(standard_nans)/np.count_nonzero(standard_uoa_mask) + test_nan_frac = np.count_nonzero(test_nans)/np.count_nonzero(test_uoa_mask) + + delta_completenesses.append(np.abs(test_nan_frac-standard_nan_frac)/standard_nan_frac+1e-20) + + return np.array(delta_completenesses) + + def test_delta_completeness(self): + + results = self.get_delta_completeness()/config.threshold_delta + offenders = np.where(results > 1) + + if len(offenders) > 0: + 
alarms = [] + for offender, severity in zip(offenders, results): + al = alarm.Alarm( + f"Delta-completeness for feature {self.columns[offender]} exceeded threshold {config.threshold_delta}", + int(severity)) + alarms.append(al) + + return alarms + else: + return None + + + def partitioner(self): + tend = self.times[-1] + tboundary = tend - config.test_partition_length + tstart = tboundary - config.standard_partition_length + + return (tstart,tboundary), (tboundary, tend) + + def partition(self, standard_partition, test_partition): + + standard_data = self.tensor[standard_partition[0]:standard_partition[1], :, :] + test_data = self.tensor[test_partition[0]:test_partition[1], :, :] + + return standard_data, test_data + + + def assemble_alerts(self): + alerts = [] + alerts.append(self.test_global_nan_frac()) + alerts.append(self.test_time_nan_fracs()) + alerts.append(self.test_space_nan_fracs()) + alerts.append(self.test_feature_nan_fracs()) + alerts.append(self.test_delta_completeness()) + + return alerts + +def df_to_numpy(df): + """ + df: dataframe to be tensorised + + this_hash: fileneme corresponding to hash of column's path + + """ + + index = df.index + time_indices = index.levels[0].to_list() + space_indices = index.levels[1].to_list() + + original_index_tuples = index.to_list() + original_values = df.values + + nrow = len(original_index_tuples) + + ntime = len(time_indices) + nspace = len(space_indices) + nfeature = len(df.columns) + + if nrow != ntime * nspace: + + if df[df == -np.inf].count() > 0: + raise RuntimeError(f'Default does-not-exist token {-np.inf} found in input data') + + tensor = np.full((ntime, nspace, nfeature), -np.inf) + + for irow in range(len(original_index_tuples)): + idx = original_index_tuples[irow] + itime = time_indices.index(idx[0]) + ispace = space_indices.index(idx[1]) + tensor[itime, ispace, :] = original_values[irow] + + else: + tensor = utilities.df_to_tensor_strides(df) + + return tensor diff --git a/viewser/commands/queryset/operations.py b/viewser/commands/queryset/operations.py index 94e3bd1..714d758 100644 --- a/viewser/commands/queryset/operations.py +++ b/viewser/commands/queryset/operations.py @@ -13,15 +13,15 @@ import pandas as pd import io import requests -from toolz.functoolz import do, curry from pymonad.either import Either, Left, Right from pymonad.maybe import Just, Nothing, Maybe from views_schema import viewser as viewser_schema from views_schema import queryset_manager as queryset_schema from viewser import remotes from viewser.error_handling import errors, error_handling -from viewser.tui import animations -from IPython.display import display, clear_output +from . import drift_detection + +from IPython.display import clear_output from . 
import queryset_list @@ -39,7 +39,7 @@ def __init__(self, self._max_retries = max_retries self._error_handler = error_handler if error_handler else error_handling.ErrorDumper([]) - def fetch(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, start_date: Optional[date] = None, end_date: Optional[date] = None) -> Maybe[pd.DataFrame]: + def fetch(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, start_date: Optional[date] = None, end_date: Optional[date] = None) -> pd.DataFrame: """ fetch ===== @@ -58,10 +58,16 @@ def fetch(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, st self._remote_url, queryset_name, start_date, end_date) -# if out_file is not None: -# f.then(curry(do, lambda data: data.to_parquet(out_file))) + return f -# return f.either(self._error_handler.dump, Just) + + def fetch_with_drift_detection(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, start_date: Optional[date] = None, end_date: Optional[date] = None): + + df = self.fetch(queryset_name, out_file = out_file, start_date=start_date, end_date=end_date) + + input_alerts = drift_detection.InputGate(df).assemble_alerts() + + return df, input_alerts def list(self) -> Maybe[queryset_list.QuerysetList]: """ @@ -128,7 +134,7 @@ def _fetch( max_retries : int, base_url: str, name: str, start_date: Optional[date] = None, end_date: Optional[date] = None - ) -> Either[viewser_schema.Dump, pd.DataFrame]: + ) -> pd.DataFrame: """ _fetch ====== @@ -156,8 +162,9 @@ def _fetch( path = f"data/{name}" retries = 0 - anim = animations.LineAnimation() -# data = Right(None) + + data = None + message = None failed = False succeeded = False diff --git a/viewser/tui/__init__.py b/viewser/tui/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/viewser/tui/animations.py b/viewser/tui/animations.py deleted file mode 100644 index 212055b..0000000 --- a/viewser/tui/animations.py +++ /dev/null @@ -1,33 +0,0 @@ -from functools import partial -import itertools - -print_no_newline = partial(print, end = "", flush = True) - -class WaitingAnimation(): - CYCLE = ["-","\\", "|", "/"] - def __init__(self, message: str = ""): - self._animation_cycle = itertools.cycle(self.CYCLE) - self._previous_output = "" - self._message = message - - def _erase_prev(self): - print_no_newline("\b" * len(self._previous_output)) - - def print_next(self): - to_print = self._message + " " + next(self._animation_cycle) - self._erase_prev() - self._previous_output = to_print - print_no_newline(to_print) - - def end(self): - self._erase_prev() - -class LineAnimation(WaitingAnimation): - CYCLE = [ - ". 
", - " o ", - " O ", - " O ", - " o ", - " .", - ] diff --git a/viewser/tui/ascii_art.py b/viewser/tui/ascii_art.py deleted file mode 100644 index 52b32a0..0000000 --- a/viewser/tui/ascii_art.py +++ /dev/null @@ -1,14 +0,0 @@ -""" -ASCII art assets -""" - -# Viewserspace logo -# Generated by https://patorjk.com/software/taag -# Font: Cybermedium - -VIEWSERSPACE_LOGO = """ -_ _ _ ____ _ _ _ ____ ____ ____ ____ ___ ____ ____ ____ -| | | |___ | | | [__ |___ |__/ [__ |__] |__| | |___ - \/ | |___ |_|_| ___] |___ | \ ___] | | | |___ |___ - -""" diff --git a/viewser/tui/formatting/__init__.py b/viewser/tui/formatting/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/viewser/tui/formatting/abc.py b/viewser/tui/formatting/abc.py deleted file mode 100644 index e2ea4c2..0000000 --- a/viewser/tui/formatting/abc.py +++ /dev/null @@ -1,112 +0,0 @@ -from contextlib import contextmanager -import re -from typing import Optional, List, Generic, TypeVar, Callable -from abc import ABC, abstractmethod -import click -import colorama - -T = TypeVar("T") - -class Section(ABC, Generic[T]): - """ - Section - ======= - - An output section. Subclass this to create sections that can be written as - output via a formatter. Subclass, and override the compile_output method, - which should turn a model T into a string. Override TITLE to add a section - title. - - example: - - class MyModel(pydantic.BaseModel): - name: str - value: int - - class MySection(abc.Section[MyModel]): - TITLE = "My Model" - def compile_output(self, model: MyModel): - return f"{model.name}: {model.value}" - """ - - TITLE: str = "" - - def add_section(self, model: T, formatter: "Formatter")-> None: - compiled = self.compile_output(model) - if compiled: - with formatter.text_in_section(self.TITLE): - formatter.write(formatter.indented(compiled)) - - @abstractmethod - def compile_output(self, model: T) -> Optional[str]: - pass - -class Formatter(click.HelpFormatter, ABC, Generic[T]): - """ - Formatter - ========= - - An ABC that can be used to create Formatter classes, which can be used to - print nice, consistent output across various commands. - - To create a subclass, overwrite the SECTIONS class property with a list of - Section subclasses. Sections are written as output sequentially. 
- - example: - - class MyFormatter(abc.Formatter): - SECTIONS = [ - HeaderSection, - SectionA, - SectionB, - FooterSection - ] - """ - - _max_data_len = 150 - _divider_length = 32 - _divider_char = "-" - SEPARATOR = "{{SEP}}" - - @property - @abstractmethod - def SECTIONS(self)-> List[Callable[[], Section[T]]]: - pass - - def __init__(self): - self.SECTIONS = [s() for s in self.SECTIONS] - colorama.init() - super().__init__() - - def indented(self, str) -> str: - return "\n".join([(" "*self.current_indent)+ ln for ln in str.split("\n")]) - - def divider(self) -> str: - return self.indented(self.SEPARATOR) - - def write_heading(self, heading: str): - self.write(self.indented( - colorama.Style.BRIGHT + heading.capitalize() + colorama.Style.RESET_ALL + "\n" - )) - self.write("\n") - - @contextmanager - def text_in_section(self, name: Optional[str] = None): - if name: - self.write(self.indented(">> "+ name)+"\n\n") - try: - yield - finally: - self.write("\n"+self.indented(self.SEPARATOR)+"\n") - - def _format(self, model: T): - for s in self.SECTIONS: - s.add_section(model, self) - - def formatted(self, model: T) -> str: - self.write("\n") - self.indent() - self._format(model) - rendered = self.getvalue() - longest_line = max([len(l) for l in rendered.split("\n")]) - return re.sub(self.SEPARATOR,"-"*longest_line,rendered) diff --git a/viewser/tui/formatting/conventions.py b/viewser/tui/formatting/conventions.py deleted file mode 100644 index a5660e3..0000000 --- a/viewser/tui/formatting/conventions.py +++ /dev/null @@ -1,5 +0,0 @@ - -from toolz.functoolz import partial -from tabulate import tabulate as vanilla_tabulate - -tabulate = partial(vanilla_tabulate, tablefmt = "pipe") diff --git a/viewser/tui/formatting/errors.py b/viewser/tui/formatting/errors.py deleted file mode 100644 index 6253234..0000000 --- a/viewser/tui/formatting/errors.py +++ /dev/null @@ -1,26 +0,0 @@ - -from typing import List -from views_schema import viewser as schema -from . import abc - -class ErrorSection(abc.Section[schema.Dump]): - def _messages_of_kind(self, model: schema.Dump, kind: schema.MessageType) -> List[schema.Message]: - return [m.content for m in model.messages if m.message_type is kind] - -class ErrorMessages(ErrorSection): - TITLE = "Error" - - def compile_output(self, model: schema.Dump) -> str: - return "\n".join(self._messages_of_kind(model, schema.MessageType.MESSAGE)) - -class RecoveryHints(ErrorSection): - TITLE = "Hints" - - def compile_output(self, model: schema.Dump) -> str: - return "\n".join(self._messages_of_kind(model, schema.MessageType.HINT)) - -class ErrorFormatter(abc.Formatter[schema.Dump]): - SECTIONS = [ - ErrorMessages, - RecoveryHints, - ] diff --git a/viewser/tui/formatting/formatters.py b/viewser/tui/formatting/formatters.py deleted file mode 100644 index 4a39779..0000000 --- a/viewser/tui/formatting/formatters.py +++ /dev/null @@ -1,12 +0,0 @@ - -from . 
import sections, generic_models, abc - -class ListFormatter(abc.Formatter[generic_models.ListModel]): - SECTIONS = [ - sections.ListSection - ] - -class DictFormatter(abc.Formatter[generic_models.DictModel]): - SECTIONS = [ - sections.DictSection - ] diff --git a/viewser/tui/formatting/generic_models.py b/viewser/tui/formatting/generic_models.py deleted file mode 100644 index 96b749f..0000000 --- a/viewser/tui/formatting/generic_models.py +++ /dev/null @@ -1,9 +0,0 @@ - -from typing import List, Dict, Any -import pydantic - -class ListModel(pydantic.BaseModel): - values: List[str] - -class DictModel(pydantic.BaseModel): - values: Dict[str, Any] diff --git a/viewser/tui/formatting/json_formatter.py b/viewser/tui/formatting/json_formatter.py deleted file mode 100644 index 63784f7..0000000 --- a/viewser/tui/formatting/json_formatter.py +++ /dev/null @@ -1,7 +0,0 @@ -import pydantic -from . import abc - -class JsonFormatter(abc.Formatter[pydantic.BaseModel]): - SECTIONS = [] - def formatted(self, model: pydantic.BaseModel) -> str: - return model.json() diff --git a/viewser/tui/formatting/sections.py b/viewser/tui/formatting/sections.py deleted file mode 100644 index 69f9948..0000000 --- a/viewser/tui/formatting/sections.py +++ /dev/null @@ -1,10 +0,0 @@ - -from viewser.tui.formatting import generic_models, abc, conventions - -class ListSection(abc.Section[generic_models.ListModel]): - def compile_output(self, model: generic_models.ListModel): - return conventions.tabulate([(v,) for v in model.values]) - -class DictSection(abc.Section[generic_models.DictModel]): - def compile_output(self, model: generic_models.DictModel): - return conventions.tabulate(list(model.values.items())) diff --git a/viewser/tui/formatting/styles.py b/viewser/tui/formatting/styles.py deleted file mode 100644 index 2676432..0000000 --- a/viewser/tui/formatting/styles.py +++ /dev/null @@ -1,4 +0,0 @@ -import colorama - -def bold(str)->str: - return colorama.Style.BRIGHT + str + colorama.Style.RESET_ALL diff --git a/viewser/tui/models.py b/viewser/tui/models.py deleted file mode 100644 index f881d6c..0000000 --- a/viewser/tui/models.py +++ /dev/null @@ -1,2 +0,0 @@ - -from pydantic import BaseModel diff --git a/viewser/tui/utils.py b/viewser/tui/utils.py deleted file mode 100644 index d91c2a3..0000000 --- a/viewser/tui/utils.py +++ /dev/null @@ -1,3 +0,0 @@ -import json - -pprint_json = lambda string: json.dumps(json.loads(string),indent=4) From d8ad4ab4a070368c7a48dea439ad5abcbfaa054b Mon Sep 17 00:00:00 2001 From: jimdale Date: Thu, 15 Feb 2024 14:31:53 +0100 Subject: [PATCH 2/8] restore tui --- viewser/commands/queryset/drift_detection.py | 6 +- viewser/tui/__init__.py | 0 viewser/tui/animations.py | 33 ++++++ viewser/tui/ascii_art.py | 14 +++ viewser/tui/formatting/__init__.py | 0 viewser/tui/formatting/abc.py | 112 +++++++++++++++++++ viewser/tui/formatting/conventions.py | 5 + viewser/tui/formatting/errors.py | 26 +++++ viewser/tui/formatting/formatters.py | 12 ++ viewser/tui/formatting/generic_models.py | 9 ++ viewser/tui/formatting/json_formatter.py | 7 ++ viewser/tui/formatting/sections.py | 10 ++ viewser/tui/formatting/styles.py | 4 + viewser/tui/models.py | 2 + viewser/tui/utils.py | 3 + 15 files changed, 240 insertions(+), 3 deletions(-) create mode 100644 viewser/tui/__init__.py create mode 100644 viewser/tui/animations.py create mode 100644 viewser/tui/ascii_art.py create mode 100644 viewser/tui/formatting/__init__.py create mode 100644 viewser/tui/formatting/abc.py create mode 100644 
viewser/tui/formatting/conventions.py create mode 100644 viewser/tui/formatting/errors.py create mode 100644 viewser/tui/formatting/formatters.py create mode 100644 viewser/tui/formatting/generic_models.py create mode 100644 viewser/tui/formatting/json_formatter.py create mode 100644 viewser/tui/formatting/sections.py create mode 100644 viewser/tui/formatting/styles.py create mode 100644 viewser/tui/models.py create mode 100644 viewser/tui/utils.py diff --git a/viewser/commands/queryset/drift_detection.py b/viewser/commands/queryset/drift_detection.py index 6270ade..0ece379 100644 --- a/viewser/commands/queryset/drift_detection.py +++ b/viewser/commands/queryset/drift_detection.py @@ -1,8 +1,8 @@ import numpy as np import pandas as pd -from views_transform_library import utilities -import config_drift as config -from ForecastDrift import alarm +#from views_transform_library import utilities +from . import config_drift as config +from forecastdrift import alarm class InputGate: def __init__(self, df): diff --git a/viewser/tui/__init__.py b/viewser/tui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/viewser/tui/animations.py b/viewser/tui/animations.py new file mode 100644 index 0000000..212055b --- /dev/null +++ b/viewser/tui/animations.py @@ -0,0 +1,33 @@ +from functools import partial +import itertools + +print_no_newline = partial(print, end = "", flush = True) + +class WaitingAnimation(): + CYCLE = ["-","\\", "|", "/"] + def __init__(self, message: str = ""): + self._animation_cycle = itertools.cycle(self.CYCLE) + self._previous_output = "" + self._message = message + + def _erase_prev(self): + print_no_newline("\b" * len(self._previous_output)) + + def print_next(self): + to_print = self._message + " " + next(self._animation_cycle) + self._erase_prev() + self._previous_output = to_print + print_no_newline(to_print) + + def end(self): + self._erase_prev() + +class LineAnimation(WaitingAnimation): + CYCLE = [ + ". ", + " o ", + " O ", + " O ", + " o ", + " .", + ] diff --git a/viewser/tui/ascii_art.py b/viewser/tui/ascii_art.py new file mode 100644 index 0000000..52b32a0 --- /dev/null +++ b/viewser/tui/ascii_art.py @@ -0,0 +1,14 @@ +""" +ASCII art assets +""" + +# Viewserspace logo +# Generated by https://patorjk.com/software/taag +# Font: Cybermedium + +VIEWSERSPACE_LOGO = """ +_ _ _ ____ _ _ _ ____ ____ ____ ____ ___ ____ ____ ____ +| | | |___ | | | [__ |___ |__/ [__ |__] |__| | |___ + \/ | |___ |_|_| ___] |___ | \ ___] | | | |___ |___ + +""" diff --git a/viewser/tui/formatting/__init__.py b/viewser/tui/formatting/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/viewser/tui/formatting/abc.py b/viewser/tui/formatting/abc.py new file mode 100644 index 0000000..e2ea4c2 --- /dev/null +++ b/viewser/tui/formatting/abc.py @@ -0,0 +1,112 @@ +from contextlib import contextmanager +import re +from typing import Optional, List, Generic, TypeVar, Callable +from abc import ABC, abstractmethod +import click +import colorama + +T = TypeVar("T") + +class Section(ABC, Generic[T]): + """ + Section + ======= + + An output section. Subclass this to create sections that can be written as + output via a formatter. Subclass, and override the compile_output method, + which should turn a model T into a string. Override TITLE to add a section + title. 
+ + example: + + class MyModel(pydantic.BaseModel): + name: str + value: int + + class MySection(abc.Section[MyModel]): + TITLE = "My Model" + def compile_output(self, model: MyModel): + return f"{model.name}: {model.value}" + """ + + TITLE: str = "" + + def add_section(self, model: T, formatter: "Formatter")-> None: + compiled = self.compile_output(model) + if compiled: + with formatter.text_in_section(self.TITLE): + formatter.write(formatter.indented(compiled)) + + @abstractmethod + def compile_output(self, model: T) -> Optional[str]: + pass + +class Formatter(click.HelpFormatter, ABC, Generic[T]): + """ + Formatter + ========= + + An ABC that can be used to create Formatter classes, which can be used to + print nice, consistent output across various commands. + + To create a subclass, overwrite the SECTIONS class property with a list of + Section subclasses. Sections are written as output sequentially. + + example: + + class MyFormatter(abc.Formatter): + SECTIONS = [ + HeaderSection, + SectionA, + SectionB, + FooterSection + ] + """ + + _max_data_len = 150 + _divider_length = 32 + _divider_char = "-" + SEPARATOR = "{{SEP}}" + + @property + @abstractmethod + def SECTIONS(self)-> List[Callable[[], Section[T]]]: + pass + + def __init__(self): + self.SECTIONS = [s() for s in self.SECTIONS] + colorama.init() + super().__init__() + + def indented(self, str) -> str: + return "\n".join([(" "*self.current_indent)+ ln for ln in str.split("\n")]) + + def divider(self) -> str: + return self.indented(self.SEPARATOR) + + def write_heading(self, heading: str): + self.write(self.indented( + colorama.Style.BRIGHT + heading.capitalize() + colorama.Style.RESET_ALL + "\n" + )) + self.write("\n") + + @contextmanager + def text_in_section(self, name: Optional[str] = None): + if name: + self.write(self.indented(">> "+ name)+"\n\n") + try: + yield + finally: + self.write("\n"+self.indented(self.SEPARATOR)+"\n") + + def _format(self, model: T): + for s in self.SECTIONS: + s.add_section(model, self) + + def formatted(self, model: T) -> str: + self.write("\n") + self.indent() + self._format(model) + rendered = self.getvalue() + longest_line = max([len(l) for l in rendered.split("\n")]) + return re.sub(self.SEPARATOR,"-"*longest_line,rendered) diff --git a/viewser/tui/formatting/conventions.py b/viewser/tui/formatting/conventions.py new file mode 100644 index 0000000..a5660e3 --- /dev/null +++ b/viewser/tui/formatting/conventions.py @@ -0,0 +1,5 @@ + +from toolz.functoolz import partial +from tabulate import tabulate as vanilla_tabulate + +tabulate = partial(vanilla_tabulate, tablefmt = "pipe") diff --git a/viewser/tui/formatting/errors.py b/viewser/tui/formatting/errors.py new file mode 100644 index 0000000..6253234 --- /dev/null +++ b/viewser/tui/formatting/errors.py @@ -0,0 +1,26 @@ + +from typing import List +from views_schema import viewser as schema +from . 
import abc + +class ErrorSection(abc.Section[schema.Dump]): + def _messages_of_kind(self, model: schema.Dump, kind: schema.MessageType) -> List[schema.Message]: + return [m.content for m in model.messages if m.message_type is kind] + +class ErrorMessages(ErrorSection): + TITLE = "Error" + + def compile_output(self, model: schema.Dump) -> str: + return "\n".join(self._messages_of_kind(model, schema.MessageType.MESSAGE)) + +class RecoveryHints(ErrorSection): + TITLE = "Hints" + + def compile_output(self, model: schema.Dump) -> str: + return "\n".join(self._messages_of_kind(model, schema.MessageType.HINT)) + +class ErrorFormatter(abc.Formatter[schema.Dump]): + SECTIONS = [ + ErrorMessages, + RecoveryHints, + ] diff --git a/viewser/tui/formatting/formatters.py b/viewser/tui/formatting/formatters.py new file mode 100644 index 0000000..4a39779 --- /dev/null +++ b/viewser/tui/formatting/formatters.py @@ -0,0 +1,12 @@ + +from . import sections, generic_models, abc + +class ListFormatter(abc.Formatter[generic_models.ListModel]): + SECTIONS = [ + sections.ListSection + ] + +class DictFormatter(abc.Formatter[generic_models.DictModel]): + SECTIONS = [ + sections.DictSection + ] diff --git a/viewser/tui/formatting/generic_models.py b/viewser/tui/formatting/generic_models.py new file mode 100644 index 0000000..96b749f --- /dev/null +++ b/viewser/tui/formatting/generic_models.py @@ -0,0 +1,9 @@ + +from typing import List, Dict, Any +import pydantic + +class ListModel(pydantic.BaseModel): + values: List[str] + +class DictModel(pydantic.BaseModel): + values: Dict[str, Any] diff --git a/viewser/tui/formatting/json_formatter.py b/viewser/tui/formatting/json_formatter.py new file mode 100644 index 0000000..63784f7 --- /dev/null +++ b/viewser/tui/formatting/json_formatter.py @@ -0,0 +1,7 @@ +import pydantic +from . 
import abc + +class JsonFormatter(abc.Formatter[pydantic.BaseModel]): + SECTIONS = [] + def formatted(self, model: pydantic.BaseModel) -> str: + return model.json() diff --git a/viewser/tui/formatting/sections.py b/viewser/tui/formatting/sections.py new file mode 100644 index 0000000..69f9948 --- /dev/null +++ b/viewser/tui/formatting/sections.py @@ -0,0 +1,10 @@ + +from viewser.tui.formatting import generic_models, abc, conventions + +class ListSection(abc.Section[generic_models.ListModel]): + def compile_output(self, model: generic_models.ListModel): + return conventions.tabulate([(v,) for v in model.values]) + +class DictSection(abc.Section[generic_models.DictModel]): + def compile_output(self, model: generic_models.DictModel): + return conventions.tabulate(list(model.values.items())) diff --git a/viewser/tui/formatting/styles.py b/viewser/tui/formatting/styles.py new file mode 100644 index 0000000..2676432 --- /dev/null +++ b/viewser/tui/formatting/styles.py @@ -0,0 +1,4 @@ +import colorama + +def bold(str)->str: + return colorama.Style.BRIGHT + str + colorama.Style.RESET_ALL diff --git a/viewser/tui/models.py b/viewser/tui/models.py new file mode 100644 index 0000000..f881d6c --- /dev/null +++ b/viewser/tui/models.py @@ -0,0 +1,2 @@ + +from pydantic import BaseModel diff --git a/viewser/tui/utils.py b/viewser/tui/utils.py new file mode 100644 index 0000000..d91c2a3 --- /dev/null +++ b/viewser/tui/utils.py @@ -0,0 +1,3 @@ +import json + +pprint_json = lambda string: json.dumps(json.loads(string),indent=4) From cc409f36da61854b1f44db94d114e714af1a3907 Mon Sep 17 00:00:00 2001 From: jimdale Date: Fri, 15 Mar 2024 09:52:35 +0100 Subject: [PATCH 3/8] integrate views-tensor-utilities into drift detection and implement rudimentary user-configuration system --- viewser/commands/queryset/config_drift.py | 11 +- viewser/commands/queryset/drift_alert.py | 5 - viewser/commands/queryset/drift_detection.py | 273 ++++++++++++------- viewser/commands/queryset/models/queryset.py | 15 + viewser/commands/queryset/operations.py | 23 +- 5 files changed, 213 insertions(+), 114 deletions(-) delete mode 100644 viewser/commands/queryset/drift_alert.py diff --git a/viewser/commands/queryset/config_drift.py b/viewser/commands/queryset/config_drift.py index 591f3f7..11019d8 100644 --- a/viewser/commands/queryset/config_drift.py +++ b/viewser/commands/queryset/config_drift.py @@ -1,8 +1,7 @@ -threshold_global_nan_frac = 0.05 -threshold_feature_nan_frac = 0.15 -threshold_time_unit_nan_frac = 0.15 -threshold_space_unit_nan_frac = 0.15 -threshold_delta = 1.5 +import numpy as np + +default_dne = -np.inf +default_missing = np.nan standard_partition_length = 10 -test_partition_length = 1 \ No newline at end of file +test_partition_length = 1 diff --git a/viewser/commands/queryset/drift_alert.py b/viewser/commands/queryset/drift_alert.py deleted file mode 100644 index 2518929..0000000 --- a/viewser/commands/queryset/drift_alert.py +++ /dev/null @@ -1,5 +0,0 @@ -class DriftAlert: - - def __init__(self,offender,uoas): - self.offender = offender - self.uoas = uoas \ No newline at end of file diff --git a/viewser/commands/queryset/drift_detection.py b/viewser/commands/queryset/drift_detection.py index 0ece379..f0721c7 100644 --- a/viewser/commands/queryset/drift_detection.py +++ b/viewser/commands/queryset/drift_detection.py @@ -1,48 +1,95 @@ import numpy as np -import pandas as pd -#from views_transform_library import utilities +from views_tensor_utilities import objects, mappings +from . import utilities from . 
import config_drift as config from forecastdrift import alarm + + class InputGate: - def __init__(self, df): - self.tensor = df_to_numpy(df) - self.index = df.index - self.columns = df.columns - self.times, self.time_to_index, self.index_to_time = utilities.map_times(self.index) - self.spaces, self.space_to_index, self.index_to_space = utilities.map_times(self.index) + def __init__(self, df, drift_config_dict): + self.config_dict = drift_config_dict + self.tensor_container = objects.ViewsDataframe(df).to_numpy_time_space().get_numeric_tensor() + self.index = self.tensor_container.index + self.columns = self.tensor_container.columns + self.times = mappings.TimeUnits.from_pandas(self.index) + self.spaces = mappings.SpaceUnits.from_pandas(self.index) self.uoa_mask = self.get_valid_uoa_mask() + self.default_config_dict = self.get_default_config_dict() def get_valid_uoa_mask(self): - return ~np.where(self.tensor==-np.inf,True,False) + """ + get_valid_uoa_mask + + Compute a boolean mask to mask out units-of-analysis which do not exist (e.g countries that do not exist + in a given month) + + """ + + return ~np.where(self.tensor_container.tensor == config.default_dne, True, False) + + def get_default_config_dict(self): + + return { + 'global_missingness': 0.05, + 'time_missingness': 0.01, + 'space_missingness': 0.03, + 'feature_missingness': 0.01, + 'delta_completeness': 1.25 - def test_global_nan_frac(self): - if severity:=(np.count_nonzero(np.isnan(self.tensor[self.uoa_mask]))/np.count_nonzero(self.uoa_mask))/config.threshold_global_nan_frac>1: - return alarm.Alarm(f"Global missingness exceeded threshold {config.threshold_global_nan_frac}",int(severity)) + } + + + def test_global_nan_frac(self, threshold): + """ + test_global_nan_frac + + Raise alarm if global missingness fraction exceeds threshold + + """ + + if severity := (np.count_nonzero(np.isnan(self.tensor_container.tensor[self.uoa_mask]))/ + np.count_nonzero(self.uoa_mask))/threshold > 1: + return alarm.Alarm(f"Global missingness exceeded threshold {threshold}", + int(1+severity)) else: return None def get_time_nan_fracs(self): + """ + get_time_nan_fracs + + Compute missing fractions for all time units + + """ + time_nan_fracs = [] - nans = np.where(np.isnan(self.tensor),1,0) - for itime in range(self.tensor.shape[0]): + nans = np.where(np.isnan(self.tensor_container.tensor), 1, 0) + for itime in range(self.tensor_container.tensor.shape[0]): time_nan_fracs.append( - np.count_nonzero(nans[itime,:,:])/np.count_nonzero(self.uoa_mask[itime,:,:]) + np.count_nonzero(nans[itime, :, :])/np.count_nonzero(self.uoa_mask[itime, :, :]) ) return np.array(time_nan_fracs) - def test_time_nan_fracs(self): + def test_time_nan_fracs(self, threshold): + """ + test_time_nan_fracs + + Generate alarms for any time units whose missingness exceeds a threshold - results = self.get_time_nan_fracs()/config.threshold_time_unit_nan_frac - offenders = np.where(results>1) + """ + + results = self.get_time_nan_fracs()/threshold + offenders = np.where(results>1)[0] if len(offenders) > 0: alarms = [] - for offender,severity in zip(offenders,results): + for offender, severity in zip(offenders, results): al = alarm.Alarm( - f"Missingness for time unit {self.index_to_time[offender]} exceeded threshold {config.threshold_time_unit_nan_frac}", - int(severity)) + f"Missingness for time unit {self.times.index_to_time[offender]} " + f"exceeded threshold {threshold}", + int(1+severity)) alarms.append(al) return alarms @@ -50,26 +97,40 @@ def test_time_nan_fracs(self): return None 
def get_space_nan_fracs(self): + """ + get_space_nan_fracs + + Compute missing fractions for all space units + + """ + space_nan_fracs = [] - nans = np.where(np.isnan(self.tensor), 1, 0) - for ispace in range(self.tensor.shape[1]): + nans = np.where(np.isnan(self.tensor_container.tensor), 1, 0) + for ispace in range(self.tensor_container.tensor.shape[1]): space_nan_fracs.append( np.count_nonzero(nans[:, ispace, :]) / np.count_nonzero(self.uoa_mask[:, ispace, :]) ) return np.array(space_nan_fracs) - def test_space_nan_fracs(self): + def test_space_nan_fracs(self, threshold): + """ + test_space_nan_fracs + + Generate alarms for any space units whose missingness exceeds a threshold - results = self.get_space_nan_fracs() / config.threshold_space_unit_nan_frac - offenders = np.where(results > 1) + """ + + results = self.get_space_nan_fracs() / threshold + offenders = np.where(results > 1)[0] if len(offenders) > 0: alarms = [] for offender, severity in zip(offenders, results): al = alarm.Alarm( - f"Missingness for space unit {self.index_to_space[offender]} exceeded threshold {config.threshold_space_unit_nan_frac}", - int(severity)) + f"Missingness for space unit {self.spaces.index_to_space[offender]} " + f"exceeded threshold {threshold}", + int(1+severity)) alarms.append(al) return alarms @@ -77,26 +138,39 @@ def test_space_nan_fracs(self): return None def get_feature_nan_fracs(self): + """ + get_feature_nan_fracs + + Compute missing fractions for all features + + """ feature_nan_fracs = [] - nans = np.where(np.isnan(self.tensor), 1, 0) - for ifeature in range(self.tensor.shape[2]): + nans = np.where(np.isnan(self.tensor_container.tensor), 1, 0) + for ifeature in range(self.tensor_container.shape[2]): feature_nan_fracs.append( np.count_nonzero(nans[:, :, ifeature]) / np.count_nonzero(self.uoa_mask[:, :, ifeature]) ) return np.array(feature_nan_fracs) - def test_feature_nan_fracs(self): + def test_feature_nan_fracs(self, threshold): + """ + test_feature_nan_fracs - results = self.get_feature_nan_fracs() / config.threshold_feature_nan_frac - offenders = np.where(results > 1) + Generate alarms for any features whose missingness exceeds a threshold + + """ + + results = self.get_feature_nan_fracs() / threshold + offenders = np.where(results > 1)[0] if len(offenders) > 0: alarms = [] for offender, severity in zip(offenders, results): al = alarm.Alarm( - f"Missingness for feature {self.columns[offender]} exceeded threshold {config.threshold_feature_nan_frac}", - int(severity)) + f"Missingness for feature {self.columns[offender]} " + f"exceeded threshold {threshold}", + int(1+severity)) alarms.append(al) return alarms @@ -104,101 +178,106 @@ def test_feature_nan_fracs(self): return None def get_delta_completeness(self): + """ + get_delta_completeness + + Compute delta_completenesses for every feature with the specified standard (i.e. trustworthy) and test + (untrustworthy) partitions. 
+ + """ + delta_completenesses = [] - standard_partition, test_partition = self.partitioner() - standard, test = self.partition(standard_partition, test_partition) - for ifeature in range(self.tensor.shape[2]): - standard_nans = np.where(np.isnan(standard[:,:,ifeature]), 1, 0) - test_nans = np.where(np.isnan(test[:,:,ifeature]), 1, 0) - standard_uoa_mask = ~np.where(standard==-np.inf,True,False) - test_uoa_mask = ~np.where(test==-np.inf,True,False) + standard, test = self.partition() + + for ifeature in range(self.tensor_container.tensor.shape[2]): + standard_nans = np.where(np.isnan(standard[:, :, ifeature]), 1, 0) + test_nans = np.where(np.isnan(test[:, :, ifeature]), 1, 0) - standard_nan_frac = np.count_nonzero(standard_nans)/np.count_nonzero(standard_uoa_mask) - test_nan_frac = np.count_nonzero(test_nans)/np.count_nonzero(test_uoa_mask) + standard_uoa_mask = ~np.where(standard == config.default_dne, True, False) + test_uoa_mask = ~np.where(test == config.default_dne, True, False) - delta_completenesses.append(np.abs(test_nan_frac-standard_nan_frac)/standard_nan_frac+1e-20) + standard_nan_frac = np.count_nonzero(standard_nans)/(np.count_nonzero(standard_uoa_mask)+1e-20) + test_nan_frac = np.count_nonzero(test_nans)/(np.count_nonzero(test_uoa_mask)+1e-20) + + delta_completenesses.append(np.abs(test_nan_frac-standard_nan_frac)/(standard_nan_frac+1e-20)) return np.array(delta_completenesses) - def test_delta_completeness(self): + def test_delta_completeness(self, threshold): + """ + test_delta_completeness + + Generate alarms for any features whose delta-completeness exceeds a threshold + + """ - results = self.get_delta_completeness()/config.threshold_delta - offenders = np.where(results > 1) + results = self.get_delta_completeness()/threshold + offenders = np.where(results > 1)[0] if len(offenders) > 0: alarms = [] for offender, severity in zip(offenders, results): al = alarm.Alarm( - f"Delta-completeness for feature {self.columns[offender]} exceeded threshold {config.threshold_delta}", - int(severity)) + f"Delta-completeness for feature {self.columns[offender]} " + f"exceeded threshold {threshold}", + int(1+severity)) alarms.append(al) return alarms else: return None + def partition(self): + """ + partitioner + + Partition the input data tensor according to partitions in drift_config + + """ - def partitioner(self): tend = self.times[-1] tboundary = tend - config.test_partition_length tstart = tboundary - config.standard_partition_length - return (tstart,tboundary), (tboundary, tend) - - def partition(self, standard_partition, test_partition): + standard_partition, test_partition = (tstart, tboundary), (tboundary, tend) - standard_data = self.tensor[standard_partition[0]:standard_partition[1], :, :] - test_data = self.tensor[test_partition[0]:test_partition[1], :, :] + standard_data = self.tensor_container.tensor[standard_partition[0]:standard_partition[1], :, :] + test_data = self.tensor_container.tensor[test_partition[0]:test_partition[1], :, :] return standard_data, test_data - def assemble_alerts(self): - alerts = [] - alerts.append(self.test_global_nan_frac()) - alerts.append(self.test_time_nan_fracs()) - alerts.append(self.test_space_nan_fracs()) - alerts.append(self.test_feature_nan_fracs()) - alerts.append(self.test_delta_completeness()) - - return alerts - -def df_to_numpy(df): - """ - df: dataframe to be tensorised - - this_hash: fileneme corresponding to hash of column's path + """ + assemble_alerts - """ + Get lists of alerts generated by alert-generators - index = df.index 
- time_indices = index.levels[0].to_list() - space_indices = index.levels[1].to_list() + """ - original_index_tuples = index.to_list() - original_values = df.values + # override defaults - nrow = len(original_index_tuples) + for key in self.config_dict.keys(): + if key not in self.default_config_dict.keys(): + raise KeyError(f'missingness {key} not in allowed missingness types:' + f'{self.default_config_dict.keys()}') - ntime = len(time_indices) - nspace = len(space_indices) - nfeature = len(df.columns) + else: + self.default_config_dict[key] = self.config_dict[key] - if nrow != ntime * nspace: - - if df[df == -np.inf].count() > 0: - raise RuntimeError(f'Default does-not-exist token {-np.inf} found in input data') - - tensor = np.full((ntime, nspace, nfeature), -np.inf) - - for irow in range(len(original_index_tuples)): - idx = original_index_tuples[irow] - itime = time_indices.index(idx[0]) - ispace = space_indices.index(idx[1]) - tensor[itime, ispace, :] = original_values[irow] - - else: - tensor = utilities.df_to_tensor_strides(df) + alerts = [] + for key in self.default_config_dict.keys(): + if key in self.config_dict.keys(): + match key: + case 'global_missingness': + alerts.append(self.test_global_nan_frac(self.default_config_dict[key])) + case 'time_missingness': + alerts.append(self.test_time_nan_fracs(self.default_config_dict[key])) + case 'space_missingness': + alerts.append(self.test_space_nan_fracs(self.default_config_dict[key])) + case 'feature_missingness': + alerts.append(self.test_feature_nan_fracs(self.default_config_dict[key])) + case 'delta_completeness': + alerts.append(self.test_delta_completeness(self.default_config_dict[key])) - return tensor + return alerts diff --git a/viewser/commands/queryset/models/queryset.py b/viewser/commands/queryset/models/queryset.py index 3138ed4..a2f7b34 100644 --- a/viewser/commands/queryset/models/queryset.py +++ b/viewser/commands/queryset/models/queryset.py @@ -119,3 +119,18 @@ def fetch(self, *args, **kwargs): logger.info(f"Fetching queryset {self.name}") dataset = queryset_operations.fetch(self.name, *args, **kwargs)#.maybe(None, lambda x:x) return dataset + + def fetch_with_drift_detection(self, *args, **kwargs): + """ + fetch + ===== + + returns: + pandas.DataFrame + + Fetch the dataset corresponding to this queryset in its current state. + Requires a self.push first. 
+ """ + logger.info(f"Fetching queryset {self.name}") + dataset = queryset_operations.fetch_with_drift_detection(self.name, *args, **kwargs)#.maybe(None, lambda x:x) + return dataset diff --git a/viewser/commands/queryset/operations.py b/viewser/commands/queryset/operations.py index 714d758..94a3f80 100644 --- a/viewser/commands/queryset/operations.py +++ b/viewser/commands/queryset/operations.py @@ -29,6 +29,7 @@ response_json = lambda rsp: rsp.json() + class QuerysetOperations(): def __init__(self, @@ -39,7 +40,10 @@ def __init__(self, self._max_retries = max_retries self._error_handler = error_handler if error_handler else error_handling.ErrorDumper([]) - def fetch(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, start_date: Optional[date] = None, end_date: Optional[date] = None) -> pd.DataFrame: + def fetch(self, queryset_name: str, + out_file: Optional[BufferedWriter] = None, + start_date: Optional[date] = None, + end_date: Optional[date] = None) -> pd.DataFrame: """ fetch ===== @@ -61,11 +65,17 @@ def fetch(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, st return f - def fetch_with_drift_detection(self, queryset_name:str, out_file: Optional[BufferedWriter] = None, start_date: Optional[date] = None, end_date: Optional[date] = None): + def fetch_with_drift_detection(self, + queryset_name: str, + out_file: Optional[BufferedWriter] = None, + start_date: Optional[date] = None, + end_date: Optional[date] = None, + drift_config_dict: Optional[dict] = None + ): - df = self.fetch(queryset_name, out_file = out_file, start_date=start_date, end_date=end_date) + df = self.fetch(queryset_name, out_file=out_file, start_date=start_date, end_date=end_date) - input_alerts = drift_detection.InputGate(df).assemble_alerts() + input_alerts = drift_detection.InputGate(df, drift_config_dict).assemble_alerts() return df, input_alerts @@ -147,7 +157,7 @@ def _fetch( Returns: Either[errors.Dump, pd.DataFrame] """ - start_date, end_date = [date.strftime("%Y-%m-%d") if date else None for date in (start_date, end_date)] +# start_date, end_date = [date.strftime("%Y-%m-%d") if date else None for date in (start_date, end_date)] checks = [ remotes.check_4xx, @@ -176,7 +186,8 @@ def _fetch( data = remotes.request(base_url, "GET", checks, path, parameters=parameters) try: - data = pd.read_parquet(io.BytesIO(data.value.content)) + data = pd.read_parquet(io.BytesIO(data.value.content)).loc[start_date:end_date] + data.index = data.index.remove_unused_levels() clear_output(wait=True) print(f'{retries+1}: Queryset data read successfully', end="\r") succeeded = True From bf757a13040cb52479900d8fcc729041983bbc47 Mon Sep 17 00:00:00 2001 From: jimdale Date: Tue, 19 Mar 2024 10:28:22 +0100 Subject: [PATCH 4/8] fix bug in delta-completeness, add partitions sizes to config --- viewser/commands/queryset/config_drift.py | 3 - viewser/commands/queryset/drift_detection.py | 74 +++++++++++++------- viewser/commands/queryset/operations.py | 2 +- 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/viewser/commands/queryset/config_drift.py b/viewser/commands/queryset/config_drift.py index 11019d8..e731ef6 100644 --- a/viewser/commands/queryset/config_drift.py +++ b/viewser/commands/queryset/config_drift.py @@ -2,6 +2,3 @@ default_dne = -np.inf default_missing = np.nan - -standard_partition_length = 10 -test_partition_length = 1 diff --git a/viewser/commands/queryset/drift_detection.py b/viewser/commands/queryset/drift_detection.py index f0721c7..225e810 100644 --- 
a/viewser/commands/queryset/drift_detection.py +++ b/viewser/commands/queryset/drift_detection.py @@ -1,17 +1,31 @@ import numpy as np from views_tensor_utilities import objects, mappings -from . import utilities from . import config_drift as config -from forecastdrift import alarm +import datetime + + +class Alarm: + def __repr__(self): + return f"Alarm: {self.message} Severity: {self.severity} Timestamp: {self.timestamp}" + + def __str__(self): + return f"Alarm: {self.message} Severity: {self.severity} Timestamp: {self.timestamp}" + + def __init__(self, message, severity=1): + self.message = message + self.severity = severity + self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") class InputGate: def __init__(self, df, drift_config_dict): self.config_dict = drift_config_dict - self.tensor_container = objects.ViewsDataframe(df).to_numpy_time_space().get_numeric_tensor() + self.tensor_container = objects.ViewsDataframe(df).to_numpy_time_space() + self.numeric_part = self.tensor_container.get_numeric_part() + self.tensor = self.numeric_part.tensor self.index = self.tensor_container.index - self.columns = self.tensor_container.columns + self.columns = self.numeric_part.columns self.times = mappings.TimeUnits.from_pandas(self.index) self.spaces = mappings.SpaceUnits.from_pandas(self.index) self.uoa_mask = self.get_valid_uoa_mask() @@ -26,7 +40,7 @@ def get_valid_uoa_mask(self): """ - return ~np.where(self.tensor_container.tensor == config.default_dne, True, False) + return ~np.where(self.tensor == config.default_dne, True, False) def get_default_config_dict(self): @@ -35,11 +49,12 @@ def get_default_config_dict(self): 'time_missingness': 0.01, 'space_missingness': 0.03, 'feature_missingness': 0.01, - 'delta_completeness': 1.25 + 'delta_completeness': 1.25, + 'standard_partition_length': 10, + 'test_partition_length': 1 } - def test_global_nan_frac(self, threshold): """ test_global_nan_frac @@ -48,10 +63,10 @@ def test_global_nan_frac(self, threshold): """ - if severity := (np.count_nonzero(np.isnan(self.tensor_container.tensor[self.uoa_mask]))/ + if severity := (np.count_nonzero(np.isnan(self.tensor[self.uoa_mask]))/ np.count_nonzero(self.uoa_mask))/threshold > 1: - return alarm.Alarm(f"Global missingness exceeded threshold {threshold}", - int(1+severity)) + return Alarm(f"Global missingness exceeded threshold {threshold}", + int(1+severity)) else: return None @@ -64,8 +79,8 @@ def get_time_nan_fracs(self): """ time_nan_fracs = [] - nans = np.where(np.isnan(self.tensor_container.tensor), 1, 0) - for itime in range(self.tensor_container.tensor.shape[0]): + nans = np.where(np.isnan(self.tensor), 1, 0) + for itime in range(self.tensor.shape[0]): time_nan_fracs.append( np.count_nonzero(nans[itime, :, :])/np.count_nonzero(self.uoa_mask[itime, :, :]) ) @@ -86,7 +101,7 @@ def test_time_nan_fracs(self, threshold): if len(offenders) > 0: alarms = [] for offender, severity in zip(offenders, results): - al = alarm.Alarm( + al = Alarm( f"Missingness for time unit {self.times.index_to_time[offender]} " f"exceeded threshold {threshold}", int(1+severity)) @@ -105,8 +120,8 @@ def get_space_nan_fracs(self): """ space_nan_fracs = [] - nans = np.where(np.isnan(self.tensor_container.tensor), 1, 0) - for ispace in range(self.tensor_container.tensor.shape[1]): + nans = np.where(np.isnan(self.tensor), 1, 0) + for ispace in range(self.tensor.shape[1]): space_nan_fracs.append( np.count_nonzero(nans[:, ispace, :]) / np.count_nonzero(self.uoa_mask[:, ispace, :]) ) @@ -127,7 +142,7 @@ def 
test_space_nan_fracs(self, threshold): if len(offenders) > 0: alarms = [] for offender, severity in zip(offenders, results): - al = alarm.Alarm( + al = Alarm( f"Missingness for space unit {self.spaces.index_to_space[offender]} " f"exceeded threshold {threshold}", int(1+severity)) @@ -145,8 +160,8 @@ def get_feature_nan_fracs(self): """ feature_nan_fracs = [] - nans = np.where(np.isnan(self.tensor_container.tensor), 1, 0) - for ifeature in range(self.tensor_container.shape[2]): + nans = np.where(np.isnan(self.tensor), 1, 0) + for ifeature in range(self.tensor.shape[2]): feature_nan_fracs.append( np.count_nonzero(nans[:, :, ifeature]) / np.count_nonzero(self.uoa_mask[:, :, ifeature]) ) @@ -167,7 +182,7 @@ def test_feature_nan_fracs(self, threshold): if len(offenders) > 0: alarms = [] for offender, severity in zip(offenders, results): - al = alarm.Alarm( + al = Alarm( f"Missingness for feature {self.columns[offender]} " f"exceeded threshold {threshold}", int(1+severity)) @@ -190,7 +205,7 @@ def get_delta_completeness(self): standard, test = self.partition() - for ifeature in range(self.tensor_container.tensor.shape[2]): + for ifeature in range(self.tensor.shape[2]): standard_nans = np.where(np.isnan(standard[:, :, ifeature]), 1, 0) test_nans = np.where(np.isnan(test[:, :, ifeature]), 1, 0) @@ -213,12 +228,13 @@ def test_delta_completeness(self, threshold): """ results = self.get_delta_completeness()/threshold + offenders = np.where(results > 1)[0] if len(offenders) > 0: alarms = [] for offender, severity in zip(offenders, results): - al = alarm.Alarm( + al = Alarm( f"Delta-completeness for feature {self.columns[offender]} " f"exceeded threshold {threshold}", int(1+severity)) @@ -236,14 +252,18 @@ def partition(self): """ - tend = self.times[-1] - tboundary = tend - config.test_partition_length - tstart = tboundary - config.standard_partition_length + tend = self.times.times[-1] + tboundary = tend - self.default_config_dict['test_partition_length'] + tstart = tboundary - self.default_config_dict['standard_partition_length'] + + tstart_index = self.times.time_to_index[tstart] + tboundary_index = self.times.time_to_index[tboundary] + tend_index = self.times.time_to_index[tend] - standard_partition, test_partition = (tstart, tboundary), (tboundary, tend) + standard_partition, test_partition = (tstart_index, tboundary_index), (tboundary_index, tend_index) - standard_data = self.tensor_container.tensor[standard_partition[0]:standard_partition[1], :, :] - test_data = self.tensor_container.tensor[test_partition[0]:test_partition[1], :, :] + standard_data = self.tensor[standard_partition[0]:standard_partition[1], :, :] + test_data = self.tensor[test_partition[0]:test_partition[1], :, :] return standard_data, test_data diff --git a/viewser/commands/queryset/operations.py b/viewser/commands/queryset/operations.py index bdf7206..a39e911 100644 --- a/viewser/commands/queryset/operations.py +++ b/viewser/commands/queryset/operations.py @@ -198,7 +198,7 @@ def _fetch( print(f'\n') print(f'\r {retries + 1}: {message}', flush=True, end="\r") else: - print(f'\r {retries+1}: {message}', flush=True, end="\r") + print(f'\r {retries + 1}: {message}', flush=True, end="\r") if 'failed' in message: failed = True data = message From d727956be99b59bb4daa79e516d213e20001e3a0 Mon Sep 17 00:00:00 2001 From: jimdale Date: Mon, 8 Apr 2024 09:36:54 +0200 Subject: [PATCH 5/8] total refactor - new Tester class, move test functions to new module --- viewser/commands/queryset/config_drift.py | 41 +++ 
viewser/commands/queryset/drift_detection.py | 342 ++++++------------ viewser/commands/queryset/integrity_checks.py | 298 +++++++++++++++ viewser/commands/queryset/operations.py | 34 +- 4 files changed, 475 insertions(+), 240 deletions(-) create mode 100644 viewser/commands/queryset/integrity_checks.py diff --git a/viewser/commands/queryset/config_drift.py b/viewser/commands/queryset/config_drift.py index e731ef6..b927036 100644 --- a/viewser/commands/queryset/config_drift.py +++ b/viewser/commands/queryset/config_drift.py @@ -1,4 +1,45 @@ import numpy as np +from . import integrity_checks as ic default_dne = -np.inf default_missing = np.nan + + +default_config_dict = { + + 'global_missingness': {'threshold': 0.05, + 'test_function': ic.get_global_nan_fracs, + 'message': 'dataset missingness'}, + + 'time_missingness': {'threshold': 0.01, + 'test_function': ic.get_time_nan_fracs, + 'message': 'time-unit missingness'}, + + 'space_missingness': {'threshold': 0.03, + 'test_function': ic.get_space_nan_fracs, + 'message': 'space-unit missingness'}, + + 'feature_missingness': {'threshold': 0.01, + 'test_function': ic.get_feature_nan_fracs, + 'message': 'feature missingness'}, + + 'delta_completeness': {'threshold': 1.25, + 'test_function': ic.get_delta_completeness, + 'message': 'feature delta_completeness'}, + + 'delta_zeroes': {'threshold': 1.25, + 'test_function': ic.get_delta_zeroes, + 'message': 'feature delta_zeroes'}, + + 'ks_drift': {'threshold': 100., + 'test_function': ic.get_ks_drift, + 'message': 'feature KS drift'}, + + 'ecod_drift': {'threshold': 0.05, + 'test_function': ic.get_ecod_drift, + 'message': 'dataset ECOD drift'}, + + 'standard_partition_length': 10, + 'test_partition_length': 1 + + } diff --git a/viewser/commands/queryset/drift_detection.py b/viewser/commands/queryset/drift_detection.py index 225e810..b1b948f 100644 --- a/viewser/commands/queryset/drift_detection.py +++ b/viewser/commands/queryset/drift_detection.py @@ -1,4 +1,5 @@ import numpy as np +import scipy from views_tensor_utilities import objects, mappings from . 
import config_drift as config import datetime @@ -17,226 +18,75 @@ def __init__(self, message, severity=1): self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") -class InputGate: - - def __init__(self, df, drift_config_dict): - self.config_dict = drift_config_dict - self.tensor_container = objects.ViewsDataframe(df).to_numpy_time_space() - self.numeric_part = self.tensor_container.get_numeric_part() - self.tensor = self.numeric_part.tensor - self.index = self.tensor_container.index - self.columns = self.numeric_part.columns - self.times = mappings.TimeUnits.from_pandas(self.index) - self.spaces = mappings.SpaceUnits.from_pandas(self.index) - self.uoa_mask = self.get_valid_uoa_mask() - self.default_config_dict = self.get_default_config_dict() - - def get_valid_uoa_mask(self): - """ - get_valid_uoa_mask - - Compute a boolean mask to mask out units-of-analysis which do not exist (e.g countries that do not exist - in a given month) - - """ - - return ~np.where(self.tensor == config.default_dne, True, False) - - def get_default_config_dict(self): - - return { - 'global_missingness': 0.05, - 'time_missingness': 0.01, - 'space_missingness': 0.03, - 'feature_missingness': 0.01, - 'delta_completeness': 1.25, - 'standard_partition_length': 10, - 'test_partition_length': 1 - - } - - def test_global_nan_frac(self, threshold): - """ - test_global_nan_frac - - Raise alarm if global missingness fraction exceeds threshold - - """ - - if severity := (np.count_nonzero(np.isnan(self.tensor[self.uoa_mask]))/ - np.count_nonzero(self.uoa_mask))/threshold > 1: - return Alarm(f"Global missingness exceeded threshold {threshold}", - int(1+severity)) - else: - return None - - def get_time_nan_fracs(self): - """ - get_time_nan_fracs +class Tester: - Compute missing fractions for all time units + """ + Tester - """ - - time_nan_fracs = [] - nans = np.where(np.isnan(self.tensor), 1, 0) - for itime in range(self.tensor.shape[0]): - time_nan_fracs.append( - np.count_nonzero(nans[itime, :, :])/np.count_nonzero(self.uoa_mask[itime, :, :]) - ) - - return np.array(time_nan_fracs) - - def test_time_nan_fracs(self, threshold): - """ - test_time_nan_fracs - - Generate alarms for any time units whose missingness exceeds a threshold - - """ + Class that mediates between the InputGate class and the testing functions. 
Brings together the relevant + test function, the partitions, the threshold, the message describing the test in the alarms, and the input data + - tensor, index, and feature names (required for useful reporting in the alarms) - results = self.get_time_nan_fracs()/threshold - offenders = np.where(results>1)[0] + """ - if len(offenders) > 0: - alarms = [] - for offender, severity in zip(offenders, results): - al = Alarm( - f"Missingness for time unit {self.times.index_to_time[offender]} " - f"exceeded threshold {threshold}", - int(1+severity)) - alarms.append(al) + def __init__(self, test_function=None, + test_partition_length=1, + standard_partition_length=1, + threshold=0, + message='', + data=None, + index=None, + features=None, + ): - return alarms - else: - return None - - def get_space_nan_fracs(self): - """ - get_space_nan_fracs + self.test_function = test_function + self.test_partition_length = test_partition_length + self.standard_partition_length = standard_partition_length + self.threshold = threshold + self.message = message + self.data = data + self.index = index + self.features = features - Compute missing fractions for all space units + def generate_alarms(self): """ + generate alarms - space_nan_fracs = [] - nans = np.where(np.isnan(self.tensor), 1, 0) - for ispace in range(self.tensor.shape[1]): - space_nan_fracs.append( - np.count_nonzero(nans[:, ispace, :]) / np.count_nonzero(self.uoa_mask[:, ispace, :]) - ) - - return np.array(space_nan_fracs) + Calls the object's assigned testing function with a kwarg dict that the function picks and chooses from + as needed. + The function returns an array of results and a dictionary translating the indexes of the results in the + array into units of analysis or features for reporting in the alarms. 
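+
+        A purely illustrative sketch of a conforming test function (the name and the
+        statistic here are invented for this docstring, not part of the package):
+
+            def get_constant_feature_fracs(**kwargs):
+                tensor = kwargs['tensor']
+                # one result per feature: 1.0 if the feature never varies, else 0.0
+                results = np.array([float(np.nanstd(tensor[:, :, i]) == 0.0)
+                                    for i in range(tensor.shape[2])])
+                index_to_feature = dict(enumerate(kwargs['features']))
+                return results, index_to_feature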
- def test_space_nan_fracs(self, threshold): """ - test_space_nan_fracs - Generate alarms for any space units whose missingness exceeds a threshold + results, translation_dict = self.test_function( + tensor=self.data, + index=self.index, + features=self.features, + test_partition_length=self.test_partition_length, + standard_partition_length=self.standard_partition_length) - """ + results /= self.threshold - results = self.get_space_nan_fracs() / threshold - offenders = np.where(results > 1)[0] - - if len(offenders) > 0: - alarms = [] - for offender, severity in zip(offenders, results): - al = Alarm( - f"Missingness for space unit {self.spaces.index_to_space[offender]} " - f"exceeded threshold {threshold}", - int(1+severity)) - alarms.append(al) + try: + offenders = np.where(results > 1)[0] + severities = results[offenders] + except: - return alarms - else: return None - def get_feature_nan_fracs(self): - """ - get_feature_nan_fracs - - Compute missing fractions for all features - - """ - feature_nan_fracs = [] - nans = np.where(np.isnan(self.tensor), 1, 0) - for ifeature in range(self.tensor.shape[2]): - feature_nan_fracs.append( - np.count_nonzero(nans[:, :, ifeature]) / np.count_nonzero(self.uoa_mask[:, :, ifeature]) - ) - - return np.array(feature_nan_fracs) - - def test_feature_nan_fracs(self, threshold): - """ - test_feature_nan_fracs - - Generate alarms for any features whose missingness exceeds a threshold - - """ - - results = self.get_feature_nan_fracs() / threshold - offenders = np.where(results > 1)[0] - if len(offenders) > 0: alarms = [] - for offender, severity in zip(offenders, results): - al = Alarm( - f"Missingness for feature {self.columns[offender]} " - f"exceeded threshold {threshold}", - int(1+severity)) - alarms.append(al) - - return alarms - else: - return None - - def get_delta_completeness(self): - """ - get_delta_completeness - - Compute delta_completenesses for every feature with the specified standard (i.e. trustworthy) and test - (untrustworthy) partitions. 
- - """ - - delta_completenesses = [] + for offender, severity in zip(offenders, severities): + if translation_dict is not None: + offender_id = translation_dict[offender] + else: + offender_id = offender - standard, test = self.partition() - - for ifeature in range(self.tensor.shape[2]): - standard_nans = np.where(np.isnan(standard[:, :, ifeature]), 1, 0) - test_nans = np.where(np.isnan(test[:, :, ifeature]), 1, 0) - - standard_uoa_mask = ~np.where(standard == config.default_dne, True, False) - test_uoa_mask = ~np.where(test == config.default_dne, True, False) - - standard_nan_frac = np.count_nonzero(standard_nans)/(np.count_nonzero(standard_uoa_mask)+1e-20) - test_nan_frac = np.count_nonzero(test_nans)/(np.count_nonzero(test_uoa_mask)+1e-20) - - delta_completenesses.append(np.abs(test_nan_frac-standard_nan_frac)/(standard_nan_frac+1e-20)) - - return np.array(delta_completenesses) - - def test_delta_completeness(self, threshold): - """ - test_delta_completeness - - Generate alarms for any features whose delta-completeness exceeds a threshold - - """ - - results = self.get_delta_completeness()/threshold - - offenders = np.where(results > 1)[0] - - if len(offenders) > 0: - alarms = [] - for offender, severity in zip(offenders, results): al = Alarm( - f"Delta-completeness for feature {self.columns[offender]} " - f"exceeded threshold {threshold}", + f"Input warning: {self.message}; offender: {offender_id}, " + f"threshold: {self.threshold},", int(1+severity)) alarms.append(al) @@ -244,60 +94,74 @@ def test_delta_completeness(self, threshold): else: return None - def partition(self): - """ - partitioner - Partition the input data tensor according to partitions in drift_config +class InputGate: - """ + """ + InputGate - tend = self.times.times[-1] - tboundary = tend - self.default_config_dict['test_partition_length'] - tstart = tboundary - self.default_config_dict['standard_partition_length'] + Class which superintends the input warning machinery. Accepts a dataframe containing the data to be examined + and a configuration dictionary which users can use to override the default settings in config_drift. - tstart_index = self.times.time_to_index[tstart] - tboundary_index = self.times.time_to_index[tboundary] - tend_index = self.times.time_to_index[tend] + The df is converted to a tensor container and non-numeric parts of the data are stripped out. - standard_partition, test_partition = (tstart_index, tboundary_index), (tboundary_index, tend_index) + """ - standard_data = self.tensor[standard_partition[0]:standard_partition[1], :, :] - test_data = self.tensor[test_partition[0]:test_partition[1], :, :] + def __init__(self, df, drift_config_dict=None): + self.config_dict = drift_config_dict + self.tensor_container = objects.ViewsDataframe(df).to_numpy_time_space() + self.numeric_part = self.tensor_container.get_numeric_part() + self.tensor = self.numeric_part.tensor + self.index = self.tensor_container.index + self.columns = self.numeric_part.columns - return standard_data, test_data + self.default_config_dict = config.default_config_dict + self.testers = [] def assemble_alerts(self): """ assemble_alerts - Get lists of alerts generated by alert-generators + Method which compares the default configuration dictionary with that supplied when the InputGate object is + instantiated, removes test functions that are not required and updates thresholds of those requested. 
+
+        The resulting configuration dictionary is then used to generate a list of Tester objects, whose
+        generate_alarms methods are then called in sequence.
 
         """
 
         # override defaults
-
+        if self.config_dict is None:
+            self.config_dict = self.default_config_dict
+        else:
+            for key in self.default_config_dict.keys():
+                if key in self.config_dict.keys():
+                    try:
+                        detector_dict = self.default_config_dict[key]
+                        detector_dict['threshold'] = self.config_dict[key]['threshold']
+                        self.config_dict[key] = detector_dict
+                    except:
+                        pass
+                else:
+                    try:
+                        dummy = self.default_config_dict[key]['threshold']
+                    except:
+                        self.config_dict[key] = self.default_config_dict[key]
+
+        testers = []
         for key in self.config_dict.keys():
-            if key not in self.default_config_dict.keys():
-                raise KeyError(f'missingness {key} not in allowed missingness types:'
-                               f'{self.default_config_dict.keys()}')
-
-            else:
-                self.default_config_dict[key] = self.config_dict[key]
-
-        alerts = []
-        for key in self.default_config_dict.keys():
-            if key in self.config_dict.keys():
-                match key:
-                    case 'global_missingness':
-                        alerts.append(self.test_global_nan_frac(self.default_config_dict[key]))
-                    case 'time_missingness':
-                        alerts.append(self.test_time_nan_fracs(self.default_config_dict[key]))
-                    case 'space_missingness':
-                        alerts.append(self.test_space_nan_fracs(self.default_config_dict[key]))
-                    case 'feature_missingness':
-                        alerts.append(self.test_feature_nan_fracs(self.default_config_dict[key]))
-                    case 'delta_completeness':
-                        alerts.append(self.test_delta_completeness(self.default_config_dict[key]))
-
-        return alerts
+            try:
+                tester_dict = self.config_dict[key]
+                testers.append(Tester(test_function=tester_dict['test_function'],
+                                      test_partition_length=self.config_dict['test_partition_length'],
+                                      standard_partition_length=self.config_dict['standard_partition_length'],
+                                      threshold=tester_dict['threshold'],
+                                      message=tester_dict['message'],
+                                      data=self.tensor,
+                                      index=self.index,
+                                      features=self.columns,
+                                      ))
+            except:
+                pass
+
+        return [tester.generate_alarms() for tester in testers]
diff --git a/viewser/commands/queryset/integrity_checks.py b/viewser/commands/queryset/integrity_checks.py
new file mode 100644
index 0000000..d0e9119
--- /dev/null
+++ b/viewser/commands/queryset/integrity_checks.py
@@ -0,0 +1,298 @@
+import numpy as np
+from . 
import config_drift as config +from views_tensor_utilities import mappings, objects +import scipy +from pyod.models.ecod import ECOD + + +def get_valid_uoa_mask(tensor): + """ + get_valid_uoa_mask + + Compute a boolean mask to mask out units-of-analysis which do not exist (e.g countries that do not exist + in a given month) + + """ + + return ~np.where(tensor == config.default_dne, True, False) + + +def partitioner(tensor, index, test_partition_length, standard_partition_length): + """ + partitioner + + Partition the input data tensor according to partitions in drift_config + + """ + + times = mappings.TimeUnits.from_pandas(index) + + tend = times.times[-1] + tboundary = tend - test_partition_length + tstart = tboundary - standard_partition_length + + tstart_index = times.time_to_index[tstart] + tboundary_index = times.time_to_index[tboundary] + tend_index = times.time_to_index[tend] + + standard_partition, test_partition = (tstart_index, tboundary_index), (tboundary_index, tend_index) + + standard_data = tensor[standard_partition[0]:standard_partition[1], :, :] + test_data = tensor[test_partition[0]:test_partition[1], :, :] + + return standard_data, test_data + + +def get_global_nan_fracs(**kwargs): + + tensor = kwargs['tensor'] + + uoa_mask = get_valid_uoa_mask(tensor) + + results = np.count_nonzero(np.isnan(tensor[uoa_mask]))/np.count_nonzero(uoa_mask) + + return np.array([results, 0.0]), None + + +def get_time_nan_fracs(**kwargs): + """ + get_time_nan_fracs + + Compute missing fractions for all time units + + """ + + tensor = kwargs['tensor'] + index = kwargs['index'] + + times = mappings.TimeUnits.from_pandas(index) + + uoa_mask = get_valid_uoa_mask(tensor) + + time_nan_fracs = [] + nans = np.where(np.isnan(tensor), 1, 0) + for itime in range(tensor.shape[0]): + time_nan_fracs.append(np.count_nonzero(nans[itime, :, :])/np.count_nonzero(uoa_mask[itime, :, :])) + + return np.array(time_nan_fracs), times.index_to_time + + +def get_space_nan_fracs(**kwargs): + """ + get_space_nan_fracs + + Compute missing fractions for all space units + + """ + + tensor = kwargs['tensor'] + index = kwargs['index'] + + spaces = mappings.SpaceUnits.from_pandas(index) + + uoa_mask = get_valid_uoa_mask(tensor) + + space_nan_fracs = [] + nans = np.where(np.isnan(tensor), 1, 0) + for ispace in range(tensor.shape[1]): + space_nan_fracs.append(np.count_nonzero(nans[:, ispace, :]) / np.count_nonzero(uoa_mask[:, ispace, :])) + + return np.array(space_nan_fracs), spaces.index_to_space + + +def get_feature_nan_fracs(**kwargs): + """ + get_feature_nan_fracs + + Compute missing fractions for all features + + """ + + tensor = kwargs['tensor'] + index_to_feature = {} + for ifeature, feature in enumerate(kwargs['features']): + index_to_feature[ifeature] = feature + + uoa_mask = get_valid_uoa_mask(tensor) + + feature_nan_fracs = [] + nans = np.where(np.isnan(tensor), 1, 0) + for ifeature in range(tensor.shape[2]): + feature_nan_fracs.append(np.count_nonzero(nans[:, :, ifeature]) / np.count_nonzero(uoa_mask[:, :, ifeature])) + + return np.array(feature_nan_fracs), index_to_feature + + +def get_delta_completeness(**kwargs): + """ + get_delta_completeness + + Compute delta_completenesses for every feature with the specified standard (i.e. trustworthy) and test + (untrustworthy) partitions. 
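+
+    As a worked example (with purely illustrative numbers): a feature that is 2% missing
+    in the standard partition but 5% missing in the test partition scores
+    |0.05 - 0.02|/0.02 = 1.5, which would trip the default threshold of 1.25.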
+
+    """
+
+    tensor = kwargs['tensor']
+    index = kwargs['index']
+    test_partition_length = kwargs['test_partition_length']
+    standard_partition_length = kwargs['standard_partition_length']
+    index_to_feature = {}
+    for ifeature, feature in enumerate(kwargs['features']):
+        index_to_feature[ifeature] = feature
+
+    delta_completenesses = []
+
+    standard, test = partitioner(tensor, index, test_partition_length, standard_partition_length)
+
+    for ifeature in range(tensor.shape[2]):
+        standard_nans = np.where(np.isnan(standard[:, :, ifeature]), 1, 0)
+        test_nans = np.where(np.isnan(test[:, :, ifeature]), 1, 0)
+
+        standard_uoa_mask = ~np.where(standard == config.default_dne, True, False)
+        test_uoa_mask = ~np.where(test == config.default_dne, True, False)
+
+        standard_nan_frac = np.count_nonzero(standard_nans)/(np.count_nonzero(standard_uoa_mask)+1e-20)
+        test_nan_frac = np.count_nonzero(test_nans)/(np.count_nonzero(test_uoa_mask)+1e-20)
+
+        delta_completenesses.append(np.abs(test_nan_frac-standard_nan_frac)/(standard_nan_frac+1e-20))
+
+    return np.array(delta_completenesses), index_to_feature
+
+
+def get_delta_zeroes(**kwargs):
+    """
+    get_delta_zeroes
+
+    Compute delta in zero-fraction for every feature with the specified standard (i.e. trustworthy) and test
+    (untrustworthy) partitions.
+
+    """
+
+    tensor = kwargs['tensor']
+    index = kwargs['index']
+    test_partition_length = kwargs['test_partition_length']
+    standard_partition_length = kwargs['standard_partition_length']
+    index_to_feature = {}
+    for ifeature, feature in enumerate(kwargs['features']):
+        index_to_feature[ifeature] = feature
+
+    delta_zeroes = []
+
+    standard, test = partitioner(tensor, index, test_partition_length, standard_partition_length)
+
+    for ifeature in range(tensor.shape[2]):
+        standard_zeroes = np.where(standard[:, :, ifeature] == 0, 1, 0)
+        test_zeroes = np.where(test[:, :, ifeature] == 0, 1, 0)
+
+        standard_uoa_mask = ~np.where(standard == config.default_dne, True, False)
+        test_uoa_mask = ~np.where(test == config.default_dne, True, False)
+
+        standard_zero_frac = np.count_nonzero(standard_zeroes)/(np.count_nonzero(standard_uoa_mask)+1e-20)
+        test_zero_frac = np.count_nonzero(test_zeroes)/(np.count_nonzero(test_uoa_mask)+1e-20)
+
+        delta_zeroes.append(np.abs(test_zero_frac-standard_zero_frac)/(standard_zero_frac+1e-20))
+
+    return np.array(delta_zeroes), index_to_feature
+
+
+def get_ks_drift(**kwargs):
+
+    """
+    get_ks_drift
+
+    Function which uses a two-sample Kolmogorov-Smirnov test to determine the probability (in the sense of a
+    hypothesis test) that the data in the test partition and standard partition are drawn from the same
+    underlying distribution (regardless of what that distribution might be).
+
+    To harmonise functionality with the rest of the testing machinery, what is returned is 1/(p_value), so that
+    large numbers represent likely divergence between the test and standard data.
+
+    Only the distribution of non-zero values is tested. If either partition contains only zeros, a very large
+    dummy value is returned as a test result. 
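+
+    As a worked example (with purely illustrative numbers): against the default threshold
+    of 100, a two-sample p-value of 0.001 yields a result of 1/0.001 = 1000, which the
+    Tester scales to 1000/100 = 10 and reports as an alarm of severity int(1 + 10) = 11.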
+
+    """
+
+    tensor = kwargs['tensor']
+    index = kwargs['index']
+    test_partition_length = kwargs['test_partition_length']
+    standard_partition_length = kwargs['standard_partition_length']
+    index_to_feature = {}
+    for ifeature, feature in enumerate(kwargs['features']):
+        index_to_feature[ifeature] = feature
+
+    ks_pvalues = []
+
+    standard, test = partitioner(tensor, index, test_partition_length, standard_partition_length)
+
+    for ifeature in range(tensor.shape[2]):
+
+        standard_feature = standard[:, :, ifeature]
+        test_feature = test[:, :, ifeature]
+
+        standard_feature = standard_feature[standard_feature > 0]
+        test_feature = test_feature[test_feature > 0]
+
+        if len(standard_feature) == 0 or len(test_feature) == 0:
+            ks_pvalues.append(1e20)
+        else:
+            ks_pvalues.append(1./scipy.stats.ks_2samp(standard_feature, test_feature).pvalue)
+
+    return np.array(ks_pvalues), index_to_feature
+
+
+def get_ecod_drift(**kwargs):
+
+    """
+    get_ecod_drift
+
+    Function that uses outlier detection (Empirical Cumulative Distribution functions) to detect drift
+    between the standard and test partitions. This test can be done simultaneously on the entire
+    (time x space x feature) datacube.
+
+    An outlier model is generated for the standard partition, which will contain a fraction of examples
+    labelled as outliers.
+
+    The same model is then used to label the examples in the test partition and the fraction of outliers
+    compared with that in the standard partition. A substantial difference between these two outlier
+    fractions is taken to indicate possible inconsistency between the partitions.
+
+    """
+
+    tensor = kwargs['tensor']
+    index = kwargs['index']
+
+    test_partition_length = kwargs['test_partition_length']
+    standard_partition_length = kwargs['standard_partition_length']
+
+    ecod_drifts = []
+
+    standard, test = partitioner(tensor, index, test_partition_length, standard_partition_length)
+
+    # rearrange tensors to panels
+
+    standard_panel = standard.reshape(-1, standard.shape[-1])
+    test_panel = test.reshape(-1, test.shape[-1])
+
+    # eliminate NaNs
+
+    standard_panel = standard_panel[~np.isnan(standard_panel).any(axis=1)]
+    test_panel = test_panel[~np.isnan(test_panel).any(axis=1)]
+
+    # eliminate =/- Infs
+
+    standard_panel = standard_panel[np.isfinite(standard_panel).any(axis=1)]
+    test_panel = test_panel[np.isfinite(test_panel).any(axis=1)]
+
+    clf = ECOD()
+    clf.fit(standard_panel)
+    standard_labels = clf.labels_
+    outlier_fraction_standard = np.count_nonzero(standard_labels)/len(standard_labels)
+
+    test_labels = clf.predict(test_panel)
+    outlier_fraction_test = np.count_nonzero(test_labels)/len(test_labels)
+
+    ecod_drifts.append(abs(outlier_fraction_test - outlier_fraction_standard)/(outlier_fraction_standard + 1e-20))
+
+    ecod_drifts.append(0.0)
+
+    return np.array(ecod_drifts), None
diff --git a/viewser/commands/queryset/operations.py b/viewser/commands/queryset/operations.py
index e4433be..cc70065 100644
--- a/viewser/commands/queryset/operations.py
+++ b/viewser/commands/queryset/operations.py
@@ -5,7 +5,7 @@
 """
 import sys
 import time
-from typing import Optional
+from typing import Optional, Dict
 from urllib import parse
 from tqdm import tqdm
 import json
@@ -19,6 +19,7 @@
 from IPython.display import clear_output
 from . import queryset_list
+from . 
import drift_detection
 
 logger = logging.getLogger(__name__)
@@ -56,6 +57,37 @@ def fetch(self, queryset_name: str) -> pd.DataFrame:
 
         return f
 
+    def fetch_with_drift_detection(self, queryset_name: str, start_date: int, end_date: int, drift_config_dict:
+                                   Optional[Dict]):
+        """
+        fetch_with_drift_detection
+        =====
+
+        parameters:
+            queryset_name (str): Name of the queryset to fetch
+            start_date: first month to include in output
+            end_date: last month to include in output
+            drift_config_dict: dictionary specifying which drift detection parameters to use
+
+        returns:
+            Dataframe corresponding to queryset (if query succeeds)
+
+        """
+
+        f = self._fetch(
+            self._max_retries,
+            self._remote_url,
+            queryset_name,
+        ).loc[start_date:end_date]
+
+        f.index = f.index.remove_unused_levels()
+
+        input_gate = drift_detection.InputGate(f, drift_config_dict=drift_config_dict)
+
+        alerts = input_gate.assemble_alerts()
+
+        return f, alerts
+
     def list(self) -> queryset_list.QuerysetList:
         """
         list
From 58fb37350f8095253992af10cc5f5982891c3bdd Mon Sep 17 00:00:00 2001
From: jimdale
Date: Mon, 15 Apr 2024 10:03:59 +0200
Subject: [PATCH 6/8] add extreme value detector

---
 viewser/commands/queryset/config_drift.py     |  4 +++
 viewser/commands/queryset/integrity_checks.py | 32 +++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/viewser/commands/queryset/config_drift.py b/viewser/commands/queryset/config_drift.py
index b927036..f2b89e9 100644
--- a/viewser/commands/queryset/config_drift.py
+++ b/viewser/commands/queryset/config_drift.py
@@ -31,6 +31,10 @@
                        'test_function': ic.get_delta_zeroes,
                        'message': 'feature delta_zeroes'},
 
+    'extreme_values': {'threshold': 4.0,
+                       'test_function': ic.get_extreme_values,
+                       'message': 'feature extreme values'},
+
     'ks_drift': {'threshold': 100.,
                  'test_function': ic.get_ks_drift,
                  'message': 'feature KS drift'},
diff --git a/viewser/commands/queryset/integrity_checks.py b/viewser/commands/queryset/integrity_checks.py
index d0e9119..a33a1ea 100644
--- a/viewser/commands/queryset/integrity_checks.py
+++ b/viewser/commands/queryset/integrity_checks.py
@@ -195,6 +195,38 @@ def get_delta_zeroes(**kwargs):
     return np.array(delta_zeroes), index_to_feature
 
 
+def get_extreme_values(**kwargs):
+
+    tensor = kwargs['tensor']
+    index = kwargs['index']
+    test_partition_length = kwargs['test_partition_length']
+    standard_partition_length = kwargs['standard_partition_length']
+    index_to_feature = {}
+    for ifeature, feature in enumerate(kwargs['features']):
+        index_to_feature[ifeature] = feature
+
+    extreme_values = []
+
+    standard, test = partitioner(tensor, index, test_partition_length, standard_partition_length)
+    standard_non_zero = np.where(np.logical_or(standard == config.default_dne, np.isnan(standard)), np.nan, standard)
+
+    for ifeature in range(tensor.shape[2]):
+
+        standard_mean_non_zero = np.nanmean(standard_non_zero[:, :, ifeature])
+        standard_sigma_non_zero = np.nanstd(standard_non_zero[:, :, ifeature])
+
+        test_max = np.max(test[:, :, ifeature])
+
+# extreme_values.append(abs(test_max-standard_mean_non_zero)/(standard_sigma_non_zero + 1e-20))
+
+        extreme_values.append(abs(test_max - standard_mean_non_zero)/(standard_sigma_non_zero + 1e-20))
+
+        print(standard_mean_non_zero, standard_sigma_non_zero, test_max)
+
+    print(extreme_values)
+
+    return np.array(extreme_values), index_to_feature
+
 def get_ks_drift(**kwargs):
 
     """
From e7ac6f2ab38bc681f6fb3497392e2e6fbd98241c Mon Sep 17 00:00:00 2001
From: jimdale
Date: Mon, 29 Apr 2024 13:56:08 +0200
Subject: [PATCH 7/8] complete first 
version of input drift detection machinery

---
 README.md                                     | 86 ++++++++++++++++-
 viewser/commands/queryset/config_drift.py     | 16 ++++
 viewser/commands/queryset/drift_detection.py  | 16 ++--
 viewser/commands/queryset/integrity_checks.py | 92 +++++++++++++++++--
 viewser/commands/queryset/models/queryset.py  | 40 +++++++-
 viewser/commands/queryset/operations.py       |  1 -
 6 files changed, 227 insertions(+), 24 deletions(-)

diff --git a/README.md b/README.md
index f2d0a49..1563319 100644
--- a/README.md
+++ b/README.md
@@ -74,6 +74,14 @@ Show docstring for a particular transform:
 
 `viewser transforms show <transform_name>`
 
+List querysets stored in the queryset database:
+
+`viewser querysets list`
+
+Produce the code required to generate a queryset:
+
+`viewser querysets show <queryset_name>`
+
 ## Via API
 
 The full functionality of viewser is exposed via its API for use in scripts and notebooks
@@ -81,6 +89,8 @@ The full functionality of viewser is exposed via its API for use in scripts and
 
 The two fundamental objects used to define what data is fetched by the client are the *Queryset* and the *Column*, where a Queryset consists of one or more Columns.
 
+### Defining a new queryset from scratch
+
 To define a queryset, one first imports the Queryset and Column classes
 
 `from viewser import Queryset, Column`
@@ -124,7 +134,7 @@ If the wrong loa is specified, the queryset will be rejected by the server and a
 
 The final argument to the Column instance is the name of the raw column to be fetched from the database. If a non-existent column is requested, the queryset will be rejected by the server and an error message detailing which columns are unavailable will be returned.
 
-## Aggregation/disaggregation
+#### Aggregation/disaggregation
 
 The definition of a queryset must include the *target* level of analysis, at which the resulting data will be presented to the user.
@@ -153,7 +163,7 @@ It is up to users to ensure that they select the correct aggregation functions.
 
 If a non-existent aggregation function is specified, the queryset will be rejected by the server and an error message detailing which columns have incorrect aggregation functions will be returned.
 
-## Transforms
+#### Transforms
 
 Any queryset column may specify an arbitrary number of transforms to be done to the raw data *after* any necessary aggregation/disaggregation has been done.
@@ -174,13 +184,29 @@ A list of available transforms can be obtained using the viewser CLI. A notebook
 
 Note that not all transforms are available at all levels of analysis. If a transform is requested at an inappropriate loa, the queryset will be rejected by the server and an error message detailing which columns have requested incompatible transforms and loas will be returned.
 
-## Publising a queryset
+### Making a new queryset by merging two or more existing querysets
+
+It is sometimes desirable to make a larger queryset by merging several existing querysets. This can be done with the from_merger method. The method requires at minimum a list of querysets to be merged and a name for the merged queryset. Optionally, a theme and description can also be passed. There is also a boolean verbose flag, described below.
+For example:
+
+    querysets_to_merge = ['queryset1', 'queryset2', 'queryset3']
+    merged_queryset = Queryset.from_merger(querysets_to_merge, 'my_merged_queryset', theme='my_theme', description='description')
+
+Before merging, some checks are performed. The querysets to be merged must all have the same target LOA. 
If the querysets to be merged contain two or more columns with the same name, the method checks that all the definitions of that column are exactly the same (same raw data, same transforms with the same parameters). If this is the case, one copy of the column is included in the merged queryset (if the verbose flag is True, the method reports that this has been done). If columns sharing a name have differing definitions, the attempt at merging is aborted.
+
+### Recreating a queryset from storage
+
+If a queryset has already been published to the queryset store (see below), the queryset object can be regenerated by doing
+
+    queryset = Queryset.from_storage(queryset_name)
+
+### Publishing a queryset
 
-Before a queryset can be fetched, it must be published to a permanent database on the server. This is done using the `publish()` method:
+Before a new queryset (written from scratch or created by merging existing querysets) can be fetched, it must be published to a permanent database on the server. This is done using the `publish()` method:
 
     data = new_queryset.publish()
 
-## Fetching a queryset
+### Fetching a queryset
 
 A published queryset can be fetched using the `.fetch()` method
@@ -269,6 +295,56 @@ This message indicates only that the queryset is waiting in the transform queue.
 
 When all transforms have completed, downloading of the completed dataframe begins. The status message at this point will cease updating, which can make it appear that the client is hung in the case of large querysets. Users are asked to be patient :) .
 
+## Input drift detection
+
+The viewser package is able to perform a series of tests on the data fetched from the remote service, which are designed to detect, and issue warnings about, anomalies in the data.
+
+Two broad types of anomaly can be monitored:
+
+### Global anomalies
+
+These examine the whole dataset, whatever its dimensions (thought of in terms of time_units x space_units x features). The available anomaly detectors are listed below; a configuration sketch showing how their thresholds can be adjusted follows the list.
+
+ - global_missingness: simply reports if the total fraction of missing (i.e. NaN) values across the whole dataset exceeds a threshold. Threshold should be a small number between 0 and 1, e.g. 0.05.
+
+
+ - global_zeros: reports if the total fraction of zero values across the whole dataset exceeds a threshold. Threshold should be a number between 0 and 1 and close to 1, e.g. 0.95.
+
+
+ - time_missingness: reports if the fraction of missingness across any (space_units x features) slice exceeds a threshold. Threshold should be a small number between 0 and 1, e.g. 0.05.
+
+
+ - space_missingness: reports if the fraction of missingness across any (time_units x features) slice exceeds a threshold. Threshold should be a small number between 0 and 1, e.g. 0.05.
+
+
+ - feature_missingness: reports if the fraction of missingness for any feature (over all time and space units) exceeds a threshold. Threshold should be a small number between 0 and 1, e.g. 0.05.
+
+
+ - time_zeros: reports if the fraction of zeros across any (space_units x features) slice exceeds a threshold. Threshold should be a number between 0 and 1 and close to 1, e.g. 0.95.
+
+
+ - space_zeros: reports if the fraction of zeros across any (time_units x features) slice exceeds a threshold. Threshold should be a number between 0 and 1 and close to 1, e.g. 0.95.
+
+
+ - feature_zeros: reports if the fraction of zeros for any feature (over all time and space units) exceeds a threshold. Threshold should be a number between 0 and 1 and close to 1, e.g. 0.95.
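+
+
+Thresholds for the detectors above can be overridden by passing a configuration dictionary into the fetch machinery. A minimal sketch (the threshold values and month range are purely illustrative; note that, as the machinery is currently written, detectors omitted from the dictionary are skipped):
+
+    drift_config_dict = {
+        'global_missingness': {'threshold': 0.10},
+        'feature_zeros': {'threshold': 0.99},
+    }
+
+    data, alerts = new_queryset.fetch_with_drift_detection(start_date=121, end_date=516,
+                                                           drift_config_dict=drift_config_dict)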
+
+
+### Recent data anomalies
+
+These partition the dataset into three pieces, defined by two integers n and m. If the most recent time unit in the dataset is k, the test partition consists of the most recent n time units, i.e. k-n+1 to k inclusive (usually n would be 1, so that the test partition simply consists of the most recent time unit k), and the standard partition consists of the m time units from k-m-n+1 to k-n inclusive. Time units before k-m-n+1 are discarded. The available anomaly detectors are
+
+- delta_completeness: reports, for each feature, if the relative difference between the missingness fractions in the test and standard partitions exceeds a threshold. Threshold should be a number between 0 and 1, e.g. 0.25.
+
+
+- delta_zeros: reports, for each feature, if the relative difference between the fractions of zeros in the test and standard partitions exceeds a threshold. Threshold should be a number between 0 and 1, e.g. 0.25.
+
+
+- extreme_values: reports, for each feature, if the most extreme value in the test partition is more than (threshold) standard deviations from the mean of the data in the standard partition. Threshold should be a number in the range 2-7, e.g. 5.
+
+
+- ks_drift: for each feature, performs a two-sample Kolmogorov-Smirnov test (https://en.wikipedia.org/wiki/Kolmogorov–Smirnov_test#Two-sample_Kolmogorov–Smirnov_test) between the data in the test and standard partitions and reports if (1/the returned p-value) exceeds a threshold. Threshold should be a large number, e.g. 100.
+
+
+- ecod_drift: for all features simultaneously, reports if the relative difference between the fraction of data-points labelled as outliers in the test partition and that in the standard partition, according to an ECOD model (https://pyod.readthedocs.io/en/latest/_modules/pyod/models/ecod.html#ECOD) trained on the standard partition, exceeds a threshold. Threshold should be a number between 0 and 1, e.g. 0.25.
+
 ## Funding
 
 The contents of this repository are the outcome of projects that have received funding from the European Research Council (ERC) under the European Union’s Horizon 2020 research and innovation programme (Grant agreement No. 694640, *ViEWS*) and Horizon Europe (Grant agreement No. 101055176, *ANTICIPATE*; and No. 101069312, *ViEWS* (ERC-2022-POC1)), Riksbankens Jubileumsfond (Grant agreement No. M21-0002, *Societies at Risk*), Uppsala University, Peace Research Institute Oslo, the United Nations Economic and Social Commission for Western Asia (*ViEWS-ESCWA*), the United Kingdom Foreign, Commonwealth & Development Office (GSRA – *Forecasting Fatalities in Armed Conflict*), the Swedish Research Council (*DEMSCORE*), the Swedish Foundation for Strategic Environmental Research (*MISTRA Geopolitics*), the Norwegian MFA (*Conflict Trends* QZA-18/0227), and the United Nations High Commissioner for Refugees (*the Sahel Predictive Analytics project*). 
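+
+Returning to the drift-detection interface described above: the alerts returned alongside the dataframe are, per detector, either a list of InputAlarm objects or a short "passed" message. A minimal inspection sketch (under the same illustrative assumptions as the example above):
+
+    data, alerts = new_queryset.fetch_with_drift_detection(start_date=121, end_date=516,
+                                                           drift_config_dict=None)
+
+    for detector_result in alerts:
+        if isinstance(detector_result, list):
+            for alarm in detector_result:
+                print(alarm)            # message, severity and timestamp
+        else:
+            print(detector_result)      # e.g. 'feature missingness passed'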
diff --git a/viewser/commands/queryset/config_drift.py b/viewser/commands/queryset/config_drift.py index f2b89e9..18afb93 100644 --- a/viewser/commands/queryset/config_drift.py +++ b/viewser/commands/queryset/config_drift.py @@ -11,6 +11,10 @@ 'test_function': ic.get_global_nan_fracs, 'message': 'dataset missingness'}, + 'global_zeros': {'threshold': 0.95, + 'test_function': ic.get_global_zero_fracs, + 'message': 'dataset zero'}, + 'time_missingness': {'threshold': 0.01, 'test_function': ic.get_time_nan_fracs, 'message': 'time-unit missingness'}, @@ -23,6 +27,18 @@ 'test_function': ic.get_feature_nan_fracs, 'message': 'feature missingness'}, + 'time_zeros': {'threshold': 0.95, + 'test_function': ic.get_time_zero_fracs, + 'message': 'time-unit zero'}, + + 'space_zeros': {'threshold': 0.95, + 'test_function': ic.get_space_zero_fracs, + 'message': 'space-unit zero'}, + + 'feature_zeros': {'threshold': 0.95, + 'test_function': ic.get_feature_zero_fracs, + 'message': 'feature zero'}, + 'delta_completeness': {'threshold': 1.25, 'test_function': ic.get_delta_completeness, 'message': 'feature delta_completeness'}, diff --git a/viewser/commands/queryset/drift_detection.py b/viewser/commands/queryset/drift_detection.py index b1b948f..26c9f9c 100644 --- a/viewser/commands/queryset/drift_detection.py +++ b/viewser/commands/queryset/drift_detection.py @@ -5,12 +5,12 @@ import datetime -class Alarm: +class InputAlarm: def __repr__(self): - return f"Alarm: {self.message} Severity: {self.severity} Timestamp: {self.timestamp}" + return f"Input alarm: {self.message} Severity: {self.severity} Timestamp: {self.timestamp}\n" def __str__(self): - return f"Alarm: {self.message} Severity: {self.severity} Timestamp: {self.timestamp}" + return f"Input alarm: {self.message} Severity: {self.severity} Timestamp: {self.timestamp}\n" def __init__(self, message, severity=1): self.message = message @@ -84,15 +84,15 @@ def generate_alarms(self): else: offender_id = offender - al = Alarm( - f"Input warning: {self.message}; offender: {offender_id}, " - f"threshold: {self.threshold},", + al = InputAlarm( + f"{self.message}; offender: {offender_id}, " + f"threshold: {self.threshold}", int(1+severity)) alarms.append(al) return alarms else: - return None + return f"{self.message} passed" class InputGate: @@ -144,7 +144,7 @@ def assemble_alerts(self): pass else: try: - dummy = self.default_config_dict[key]['threshold'] + _ = self.default_config_dict[key]['threshold'] except: self.config_dict[key] = self.default_config_dict[key] diff --git a/viewser/commands/queryset/integrity_checks.py b/viewser/commands/queryset/integrity_checks.py index a33a1ea..ebd2d8f 100644 --- a/viewser/commands/queryset/integrity_checks.py +++ b/viewser/commands/queryset/integrity_checks.py @@ -54,6 +54,17 @@ def get_global_nan_fracs(**kwargs): return np.array([results, 0.0]), None +def get_global_zero_fracs(**kwargs): + + tensor = kwargs['tensor'] + + uoa_mask = get_valid_uoa_mask(tensor) + + results = 1. 
- np.count_nonzero(tensor[uoa_mask])/np.count_nonzero(uoa_mask)
+
+    return np.array([results, 0.0]), None
+
+
 def get_time_nan_fracs(**kwargs):
     """
     get_time_nan_fracs
@@ -123,6 +134,76 @@ def get_feature_nan_fracs(**kwargs):
     return np.array(feature_nan_fracs), index_to_feature
 
 
+def get_time_zero_fracs(**kwargs):
+    """
+    get_time_zero_fracs
+
+    Compute zero fractions for all time units
+
+    """
+
+    tensor = kwargs['tensor']
+    index = kwargs['index']
+
+    times = mappings.TimeUnits.from_pandas(index)
+
+    uoa_mask = get_valid_uoa_mask(tensor)
+
+    time_zero_fracs = []
+
+    for itime in range(tensor.shape[0]):
+        time_zero_fracs.append(1. - np.count_nonzero(tensor[itime, :, :])/np.count_nonzero(uoa_mask[itime, :, :]))
+
+    return np.array(time_zero_fracs), times.index_to_time
+
+
+def get_space_zero_fracs(**kwargs):
+    """
+    get_space_zero_fracs
+
+    Compute zero fractions for all space units
+
+    """
+
+    tensor = kwargs['tensor']
+    index = kwargs['index']
+
+    spaces = mappings.SpaceUnits.from_pandas(index)
+
+    uoa_mask = get_valid_uoa_mask(tensor)
+
+    space_zero_fracs = []
+
+    for ispace in range(tensor.shape[1]):
+        space_zero_fracs.append(1. - np.count_nonzero(tensor[:, ispace, :]) / np.count_nonzero(uoa_mask[:, ispace, :]))
+
+    return np.array(space_zero_fracs), spaces.index_to_space
+
+
+def get_feature_zero_fracs(**kwargs):
+    """
+    get_feature_zero_fracs
+
+    Compute zero fractions for all features
+
+    """
+
+    tensor = kwargs['tensor']
+    index_to_feature = {}
+    for ifeature, feature in enumerate(kwargs['features']):
+        index_to_feature[ifeature] = feature
+
+    uoa_mask = get_valid_uoa_mask(tensor)
+
+    feature_zero_fracs = []
+
+    for ifeature in range(tensor.shape[2]):
+        feature_zero_fracs.append(1. - np.count_nonzero(tensor[:, :, ifeature]) /
+                                  np.count_nonzero(uoa_mask[:, :, ifeature]))
+
+    return np.array(feature_zero_fracs), index_to_feature
+
+
 def get_delta_completeness(**kwargs):
     """
     get_delta_completeness
@@ -217,16 +298,11 @@ def get_extreme_values(**kwargs):
 
         test_max = np.max(test[:, :, ifeature])
 
-# extreme_values.append(abs(test_max-standard_mean_non_zero)/(standard_sigma_non_zero + 1e-20))
-
         extreme_values.append(abs(test_max - standard_mean_non_zero)/(standard_sigma_non_zero + 1e-20))
 
-        print(standard_mean_non_zero, standard_sigma_non_zero, test_max)
-
-    print(extreme_values)
-
     return np.array(extreme_values), index_to_feature
 
 
+
 def get_ks_drift(**kwargs):
 
     """
@@ -265,7 +341,7 @@ def get_ks_drift(**kwargs):
         test_feature = test_feature[test_feature > 0]
 
         if len(standard_feature) == 0 or len(test_feature) == 0:
-            ks_pvalues.append(1e20)
+            ks_pvalues.append(1e10)
         else:
             ks_pvalues.append(1./scipy.stats.ks_2samp(standard_feature, test_feature).pvalue)
 
@@ -310,7 +386,7 @@ def get_ecod_drift(**kwargs):
     standard_panel = standard_panel[~np.isnan(standard_panel).any(axis=1)]
     test_panel = test_panel[~np.isnan(test_panel).any(axis=1)]
 
-    # eliminate =/- Infs
+    # eliminate +/- Infs
 
     standard_panel = standard_panel[np.isfinite(standard_panel).any(axis=1)]
    test_panel = test_panel[np.isfinite(test_panel).any(axis=1)]
diff --git a/viewser/commands/queryset/models/queryset.py b/viewser/commands/queryset/models/queryset.py
index 0db663e..735e6a5 100644
--- a/viewser/commands/queryset/models/queryset.py
+++ b/viewser/commands/queryset/models/queryset.py
@@ -1,4 +1,5 @@
 import logging
+import requests
 from views_schema import queryset_manager as schema
 from viewser.commands.queryset.operations import QuerysetOperations
 from viewser import settings
@@ -10,6 +11,7 @@
     settings.QUERYSET_URL, 
defaults.default_error_handler()) + class Queryset(schema.Queryset): """ Queryset @@ -41,6 +43,40 @@ class Queryset(schema.Queryset): def __init__(self, name, loa): super().__init__(name=name, loa=loa, operations=[]) + @classmethod + def from_storage(cls, name): + + config = settings.config_resolver.ConfigResolver(settings.db.Session) + + remote_url = config.get("REMOTE_URL") + + response = requests.request(method="GET", url=f'{remote_url}/querysets/querysets/{name}') + + if response.status_code == 404: + raise RuntimeError(f'queryset {name} does not appear to be in the queryset store') + + json_ = response.json() + + allowed_fields = ['name', 'loa', 'description', 'themes', 'operations'] + allowed_namespaces = ['base', 'trf'] + + for key in json_.keys(): + if key not in allowed_fields: + raise RuntimeError(f'Queryset json contains unrecognised field: {key}') + + for column in json_['operations']: + for operation in column: + if operation['namespace'] not in allowed_namespaces: + raise RuntimeError(f"Queryset operation contains unrecognised namespace: {operation['namespace']}") + + qs = cls(name=json_['name'], loa=json_['loa']) + + qs.operations = json_['operations'] + qs.themes = json_['themes'] + qs.description = json_['description'] + + return qs + @classmethod def from_merger(cls, querysets, name, theme=None, description=None, verbose=False): @@ -104,7 +140,7 @@ def transform_to_string(column): qs_merged = cls(name=name, loa=loas[0]) qs_merged.operations = columns - qs_merged.themes = [] if theme is None else [theme,] + qs_merged.themes = [] if theme is None else [theme, ] qs_merged.description = description return qs_merged @@ -200,5 +236,5 @@ def fetch_with_drift_detection(self, *args, **kwargs): Requires a self.push first. """ logger.info(f"Fetching queryset {self.name}") - dataset = queryset_operations.fetch_with_drift_detection(self.name, *args, **kwargs)#.maybe(None, lambda x:x) + dataset = queryset_operations.fetch_with_drift_detection(self.name, *args, **kwargs) return dataset diff --git a/viewser/commands/queryset/operations.py b/viewser/commands/queryset/operations.py index 368aca8..e25d1f0 100644 --- a/viewser/commands/queryset/operations.py +++ b/viewser/commands/queryset/operations.py @@ -100,7 +100,6 @@ def fetch_with_drift_detection(self, queryset_name: str, start_date: str, end_da return f, alerts - def list(self): """ list From f0dcc20db075f5fd68b1a68b047468de9c1dff85 Mon Sep 17 00:00:00 2001 From: jimdale Date: Mon, 29 Apr 2024 15:44:02 +0200 Subject: [PATCH 8/8] bump version to 6.5.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8394194..a5ee658 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "viewser" -version = "6.4.2" +version = "6.5.0" description = "The Views 3 CLI tool" authors = ["peder2911 "] readme = "README.md"
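
For reference, a minimal end-to-end sketch of the workflow introduced by this series (the queryset name and month range are illustrative, not taken from the patches above):

    from viewser import Queryset

    qs = Queryset.from_storage('my_stored_queryset')
    data, alerts = qs.fetch_with_drift_detection(start_date=121, end_date=516,
                                                 drift_config_dict=None)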