Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement new anomaly detection in soda core #1995

Merged
merged 6 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ repos:
rev: v3.15.0
hooks:
- id: pyupgrade
args: [--py37-plus]
exclude: _models?\.py$
args: [--py38-plus, --keep-runtime-typing]
- repo: https://github.com/PyCQA/isort
rev: 5.13.2
hooks:
Expand Down
1 change: 1 addition & 0 deletions soda/core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"opentelemetry-exporter-otlp-proto-http~=1.16.0",
"sqlparse~=0.4",
"inflect~=7.0",
"pydantic>=2.0.0,<3.0.0",
]

setup(
Expand Down
174 changes: 174 additions & 0 deletions soda/core/soda/execution/check/anomaly_detection_metric_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from __future__ import annotations

from datetime import timezone
from typing import Any

from soda.cloud.historic_descriptor import (
HistoricCheckResultsDescriptor,
HistoricMeasurementsDescriptor,
)
from soda.common.exceptions import SODA_SCIENTIFIC_MISSING_LOG_MESSAGE
from soda.execution.check.metric_check import MetricCheck
from soda.execution.check_outcome import CheckOutcome
from soda.execution.column import Column
from soda.execution.data_source_scan import DataSourceScan
from soda.execution.metric.metric import Metric
from soda.execution.partition import Partition
from soda.sodacl.anomaly_detection_metric_check_cfg import (
AnomalyDetectionMetricCheckCfg,
)

KEY_HISTORIC_MEASUREMENTS = "historic_measurements"
KEY_HISTORIC_CHECK_RESULTS = "historic_check_results"


class AnomalyDetectionMetricCheck(MetricCheck):
    """Metric check that flags anomalous metric values.

    Historic measurements and previous check results are requested from Soda
    Cloud via ``self.historic_descriptors`` and fed to the ``soda-scientific``
    ``AnomalyDetector`` (an optional dependency), whose verdict determines the
    check outcome and diagnostics.
    """

    def __init__(
        self,
        check_cfg: AnomalyDetectionMetricCheckCfg,
        data_source_scan: DataSourceScan,
        partition: Partition | None = None,
        column: Column | None = None,
    ):
        try:
            super().__init__(
                check_cfg=check_cfg,
                data_source_scan=data_source_scan,
                partition=partition,
                column=column,
            )
            # Bare annotation: narrows the inherited attribute's type for
            # static checkers only; nothing is assigned at runtime.
            self.check_cfg: AnomalyDetectionMetricCheckCfg
            self.skip_anomaly_check = False
            metric_name = self.check_cfg.metric_name
            metric = self.metrics[metric_name]
            # The configured training window length bounds how much history is
            # fetched, for both measurements and past check results.
            measurements_limit = self.check_cfg.training_dataset_params.window_length
            self.historic_descriptors[KEY_HISTORIC_MEASUREMENTS] = HistoricMeasurementsDescriptor(
                metric_identity=metric.identity,
                limit=measurements_limit,
            )
            self.historic_descriptors[KEY_HISTORIC_CHECK_RESULTS] = HistoricCheckResultsDescriptor(
                check_identity=self.create_identity(), limit=measurements_limit
            )
            # Populated by evaluate(); merged into cloud/log diagnostics.
            self.diagnostics = {}
            self.cloud_check_type = "anomalyDetection"
        except Exception as e:
            # Broad catch is deliberate: a misconfigured check is skipped (with
            # a logged error) instead of aborting the whole scan.
            self.skip_anomaly_check = True
            data_source_scan.scan._logs.error(
                f"""An error occurred during the initialization of AnomalyMetricCheck. Please make sure"""
                f""" that the metric '{check_cfg.metric_name}' is supported. For more information see"""
                f""" the docs: https://docs.soda.io/soda-cl/anomaly-detection.html""",
                exception=e,
            )

    def evaluate(self, metrics: dict[str, Metric], historic_values: dict[str, dict[str, Any]]) -> None:
        """Run anomaly detection and set ``self.outcome``/``self.diagnostics``.

        Returns early (without setting an outcome) when initialization failed,
        when the historic payload is not a dict, or when the soda-scientific
        package is not installed.
        """
        if self.skip_anomaly_check:
            return

        if not isinstance(historic_values, dict):
            self.logs.error(
                "Getting historical measurements and check results from Soda Cloud resulted in a "
                f"{type(historic_values)} object which is not compatible with anomaly detection. "
                "Check previous log messages for more information."
            )
            return

        historic_check_results = historic_values.get(KEY_HISTORIC_CHECK_RESULTS, {}).get("check_results", {})
        historic_measurements = self.get_historic_measurements(metrics, historic_values)

        # TODO test for module installation and set check status to is_skipped if the module is not installed
        # Imported lazily so that soda-core works without soda-scientific.
        try:
            from soda.scientific.anomaly_detection_v2.anomaly_detector import (
                AnomalyDetector,
            )
        except ModuleNotFoundError as e:
            self.logs.error(f"{SODA_SCIENTIFIC_MISSING_LOG_MESSAGE}\n Original error: {e}")
            return

        anomaly_detector = AnomalyDetector(
            measurements=historic_measurements,
            check_results=historic_check_results,
            logs=self.logs,
            model_cfg=self.check_cfg.model_cfg,
            training_dataset_params=self.check_cfg.training_dataset_params,
        )
        level, diagnostics = anomaly_detector.evaluate()
        assert isinstance(diagnostics, dict), f"Anomaly diagnostics should be a dict. Got a {type(diagnostics)} instead"

        if diagnostics["anomalyErrorCode"] == "not_enough_measurements":
            # Not enough history to train a model: record the reason but leave
            # the check outcome unset for this scan.
            self.add_outcome_reason(
                outcome_type=diagnostics["anomalyErrorCode"],
                message="Anomaly detection needs at least 4 measurements",
                severity=diagnostics["anomalyErrorSeverity"],
            )
            self.diagnostics = diagnostics
            if self.diagnostics["value"] is None:
                self.diagnostics["value"] = self.get_metric_value()
            return
        self.outcome = CheckOutcome(level)
        self.diagnostics = diagnostics
        if diagnostics["anomalyErrorSeverity"] in ["warn", "error"]:
            self.add_outcome_reason(
                outcome_type=diagnostics["anomalyErrorCode"],
                message=diagnostics["anomalyErrorMessage"],
                severity=diagnostics["anomalyErrorSeverity"],
            )

    def get_historic_measurements(
        self, metrics: dict[str, Metric], historic_values: dict[str, dict[str, Any]]
    ) -> dict[str, list[dict[str, Any]]]:
        """Return historic measurements with the current scan's value appended.

        NOTE(review): mutates the ``historic_values`` payload in place — the
        current measurement is appended to its ``"results"`` list.
        """
        metric_name = self.check_cfg.metric_name
        historic_measurements = historic_values.get(KEY_HISTORIC_MEASUREMENTS, {}).get("measurements", {})
        self.logs.debug(
            "Anomaly Detection: using historical measurements " f"for identity {metrics[metric_name].identity}"
        )
        if not historic_measurements:
            self.logs.warning(f"This is the first time that we derive {metrics[metric_name]} metric")
            historic_measurements["results"] = []

        # Append current results
        historic_measurements.get("results", []).append(
            {
                "id": "dummy_id",  # Placeholder number that will be overwritten
                "identity": metrics[metric_name].identity,
                "value": self.get_metric_value(),
                # Scan data timestamp serialized as UTC ISO-8601 with a 'Z'
                # suffix; tzinfo is replaced, not converted — assumes the scan
                # timestamp is already UTC (TODO confirm).
                "dataTime": (
                    self.data_source_scan.scan._data_timestamp.replace(tzinfo=timezone.utc).strftime(
                        "%Y-%m-%dT%H:%M:%SZ"
                    )
                ),
            }
        )
        return historic_measurements

    def get_cloud_diagnostics_dict(self) -> dict:
        """Return base cloud diagnostics merged with anomaly diagnostics."""
        cloud_diagnostics = super().get_cloud_diagnostics_dict()
        # Anomaly diagnostics take precedence on key collisions.
        return {**cloud_diagnostics, **self.diagnostics}

    def get_log_diagnostic_dict(self) -> dict:
        """Return log diagnostics, extended with anomaly diagnostics when
        historic diff values are present."""
        log_diagnostics = super().get_log_diagnostic_dict()
        if self.historic_diff_values:
            log_diagnostics.update(self.diagnostics)
        return log_diagnostics

    def create_migrate_identities(self) -> dict[str, str] | None:
        """
        This method is used to migrate the identities from anomaly score to anomaly detection.
        It's a hack to obtain the same identity for the anomaly detection check as the anomaly score check.
        """
        original_source_line = self.check_cfg.source_line.strip()
        original_migrate_data_source_name = self.data_source_scan.data_source.migrate_data_source_name

        # Temporarily rewrite the config so the parent implementation computes
        # the legacy "anomaly score" identity instead of the new one.
        hacked_source_line = original_source_line.replace("anomaly detection", "anomaly score") + " < default"
        hacked_migrate_data_source_name = original_migrate_data_source_name
        if original_migrate_data_source_name is None:
            hacked_migrate_data_source_name = True

        self.check_cfg.source_line = hacked_source_line
        self.data_source_scan.data_source.migrate_data_source_name = hacked_migrate_data_source_name

        identities = super().create_migrate_identities()

        # Overwrite the original source line and migrate data source name to avoid confusion
        self.check_cfg.source_line = original_source_line
        self.data_source_scan.data_source.migrate_data_source_name = original_migrate_data_source_name
        return identities
41 changes: 41 additions & 0 deletions soda/core/soda/execution/check/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from soda.execution.metric.metric import Metric
from soda.execution.query.query import Query
from soda.sampler.sample_ref import SampleRef
from soda.sodacl.anomaly_detection_metric_check_cfg import (
AnomalyDetectionMetricCheckCfg,
)
from soda.sodacl.check_cfg import CheckCfg
from soda.sodacl.distribution_check_cfg import DistributionCheckCfg
from soda.sodacl.group_by_check_cfg import GroupByCheckCfg
Expand All @@ -29,6 +32,9 @@ def create(
column: Column | None = None,
data_source_scan: DataSourceScan | None = None,
) -> Check | None:
from soda.sodacl.anomaly_detection_metric_check_cfg import (
AnomalyDetectionMetricCheckCfg,
)
from soda.sodacl.anomaly_metric_check_cfg import AnomalyMetricCheckCfg
from soda.sodacl.change_over_time_metric_check_cfg import (
ChangeOverTimeMetricCheckCfg,
Expand Down Expand Up @@ -59,6 +65,13 @@ def create(

return AnomalyMetricCheck(check_cfg, data_source_scan, partition, column)

elif isinstance(check_cfg, AnomalyDetectionMetricCheckCfg):
from soda.execution.check.anomaly_detection_metric_check import (
AnomalyDetectionMetricCheck,
)

return AnomalyDetectionMetricCheck(check_cfg, data_source_scan, partition, column)

elif isinstance(check_cfg, MetricCheckCfg):
from soda.execution.check.metric_check import MetricCheck

Expand Down Expand Up @@ -207,6 +220,12 @@ def create_identity(self, with_datasource: bool | str = False, with_filename: bo
identity_source_configurations.pop("attributes", None)
identity_source_configurations.pop("template", None)
identity_source_configurations.pop("warn_only", None)

# Exclude hyperparameters / tuning configurations from identity for anomaly detection checks
if isinstance(check_cfg, AnomalyDetectionMetricCheckCfg):
identity_source_configurations.pop("training_dataset_parameters", None)
identity_source_configurations.pop("model", None)

if len(identity_source_configurations) > 0:
# The next line ensures that ordering of the check configurations don't matter for identity
identity_source_configurations = collections.OrderedDict(sorted(identity_source_configurations.items()))
Expand Down Expand Up @@ -277,6 +296,28 @@ def create_identities(self):
identities["v4"] = identity
return identities

# Migrate Identities are created specifically to resolve https://sodadata.atlassian.net/browse/CLOUD-5447?focusedCommentId=30022
# and can eventually be removed when all checks are migrated.
def create_migrate_identities(self) -> dict[str, str] | None:
    """Build the legacy v1-v3 identities used while migrating checks.

    Returns ``None`` when no migration data source is configured, or when it
    is the same as the current data source (nothing to migrate). A custom
    ``identity`` from the check's source configurations is exposed as v4.
    """
    data_source = self.data_source_scan.data_source
    migrate_name = data_source.migrate_data_source_name

    # Guard: migration only applies when a *different* source name is set.
    if migrate_name is None or data_source.data_source_name == migrate_name:
        return None

    identities = {
        "v1": self.create_identity(with_datasource=False, with_filename=False),
        "v2": self.create_identity(with_datasource=migrate_name, with_filename=False),
        "v3": self.create_identity(with_datasource=migrate_name, with_filename=True),
        # v4 is reserved for custom identity
    }
    source_configurations = self.check_cfg.source_configurations
    if isinstance(source_configurations, dict):
        custom_identity = source_configurations.get("identity")
        if isinstance(custom_identity, str):
            identities["v4"] = custom_identity
    return identities

def get_cloud_dict(self):
from soda.execution.column import Column
from soda.execution.partition import Partition
Expand Down
Loading