Skip to content

Commit

Permalink
User defined metric fail query (#2089)
Browse files Browse the repository at this point in the history
* User defined metric check: support failed rows query

* Test file version as well

* Fix CI

* Make metric check cfg constructor flexible

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
m1n0 and pre-commit-ci[bot] authored May 23, 2024
1 parent 09262b0 commit 5d1163c
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 25 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/main.workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ jobs:
sudo apt-get update
ACCEPT_EULA=Y sudo apt-get install -y libsasl2-dev msodbcsql18
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -132,6 +133,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -166,6 +168,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -194,6 +197,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr.workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ jobs:
sudo apt-get update
ACCEPT_EULA=Y sudo apt-get install -y libsasl2-dev msodbcsql18
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -110,6 +111,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -142,6 +144,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -171,6 +174,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down
2 changes: 2 additions & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ readme-renderer~=32.0
certifi>=2022.12.07
wheel>=0.38.1
docutils<0.21 # 0.21 dropped py38 support, remove this after py38 support is gone
requests==2.31.0 # 2.32.0 is broken, does not support docker. Remove this after new version is out

8 changes: 5 additions & 3 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pathspec==0.12.1
# via black
pip-tools==7.4.1
# via -r dev-requirements.in
platformdirs==4.2.1
platformdirs==4.2.2
# via
# black
# virtualenv
Expand Down Expand Up @@ -100,7 +100,9 @@ python-dotenv==1.0.1
readme-renderer==32.0
# via -r dev-requirements.in
requests==2.31.0
# via docker
# via
# -r dev-requirements.in
# docker
schema==0.7.7
# via tbump
six==1.16.0
Expand Down Expand Up @@ -137,7 +139,7 @@ urllib3==1.26.18
# -r dev-requirements.in
# docker
# requests
virtualenv==20.26.1
virtualenv==20.26.2
# via tox
webencodings==0.5.1
# via bleach
Expand Down
20 changes: 18 additions & 2 deletions soda/core/soda/execution/metric/user_defined_numeric_metric.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
from __future__ import annotations

from numbers import Number

from soda.execution.metric.query_metric import QueryMetric
from soda.execution.query.sample_query import SampleQuery
from soda.execution.query.user_defined_numeric_query import UserDefinedNumericQuery


class UserDefinedNumericMetric(QueryMetric):
def __init__(
self,
data_source_scan: "DataSourceScan",
data_source_scan: DataSourceScan,
check_name: str,
sql: str,
check: "Check" = None,
check: Check = None,
):
super().__init__(
data_source_scan=data_source_scan,
Expand All @@ -19,6 +24,7 @@ def __init__(
identity_parts=[sql],
)
self.sql = sql
self.check = check

def __str__(self):
return f'"{self.name}"'
Expand All @@ -38,3 +44,13 @@ def ensure_query(self):
)
self.queries.append(query)
self.data_source_scan.queries.append(query)

def create_failed_rows_sample_query(self) -> SampleQuery | None:
    """Build the query that samples failed rows for this metric, if applicable.

    A query is created only when all of the following hold, checked in order:
    the scan configuration has a sampler, the metric value is a number, the
    metric has a check whose configuration defines ``failed_rows_query``, and
    ``samples_limit`` is greater than zero.

    Returns:
        A ``SampleQuery`` built with the ``"failed_rows"`` label and the
        jinja-resolved failed rows SQL, or ``None`` when any condition above
        is not met.
    """
    scan = self.data_source_scan.scan
    if not scan._configuration.sampler:
        return None
    if not isinstance(self.value, Number):
        return None
    # ``check`` is optional in ``__init__`` (defaults to None); a metric created
    # without a check has no failed rows query, so bail out instead of raising
    # AttributeError on ``None.check_cfg``.
    if self.check is None:
        return None
    failed_rows_query = self.check.check_cfg.failed_rows_query
    if not failed_rows_query:
        return None
    if not (self.samples_limit > 0):
        return None

    # The configured SQL may contain jinja variables; resolve them through the scan.
    sql = scan.jinja_resolve(failed_rows_query)
    return SampleQuery(self.data_source_scan, self, "failed_rows", sql)
9 changes: 8 additions & 1 deletion soda/core/soda/execution/query/user_defined_numeric_query.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

from soda.execution.metric.metric import Metric
from soda.execution.query.query import Query


class UserDefinedNumericQuery(Query):
def __init__(
self,
data_source_scan: "DataSourceScan",
data_source_scan: DataSourceScan,
check_name: str,
sql: str,
metric: Metric,
Expand All @@ -22,3 +24,8 @@ def execute(self):
if self.row[index] is not None:
metric_value = float(self.row[index])
self.metric.set_value(metric_value)

sample_query = self.metric.create_failed_rows_sample_query()
if sample_query:
self.metric.queries.append(sample_query)
sample_query.execute()
2 changes: 2 additions & 0 deletions soda/core/soda/sodacl/check_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(
name: str | None,
samples_limit: int | None = None,
samples_columns: list | None = None,
failed_rows_query: str | None = None,
):
self.source_header: str = source_header
self.source_line: str = source_line
Expand All @@ -25,6 +26,7 @@ def __init__(
self.name: str | None = name
self.samples_limit: int | None = samples_limit
self.samples_columns: list | None = samples_columns
self.failed_rows_query: str | None = failed_rows_query

def get_column_name(self) -> str | None:
pass
Expand Down
11 changes: 10 additions & 1 deletion soda/core/soda/sodacl/metric_check_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,17 @@ def __init__(
fail_threshold_cfg: ThresholdCfg | None,
warn_threshold_cfg: ThresholdCfg | None,
samples_limit: int | None = None,
failed_rows_query: str | None = None,
):
super().__init__(source_header, source_line, source_configurations, location, name, samples_limit)
super().__init__(
source_header,
source_line,
source_configurations,
location,
name,
samples_limit,
failed_rows_query=failed_rows_query,
)
self.metric_name: str = metric_name
self.metric_args: list[object] | None = metric_args
self.missing_and_valid_cfg: MissingAndValidCfg = missing_and_valid_cfg
Expand Down
60 changes: 42 additions & 18 deletions soda/core/soda/sodacl/sodacl_parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import functools
import inspect
import logging
import os
import re
Expand Down Expand Up @@ -619,6 +620,7 @@ def __parse_metric_check(
condition = None
metric_expression = None
metric_query = None
failed_rows_query = None
samples_limit = None
samples_columns = None
training_dataset_params: TrainingDatasetParameters = TrainingDatasetParameters()
Expand Down Expand Up @@ -657,6 +659,13 @@ def __parse_metric_check(
f'In configuration "{configuration_key}" the metric name must match exactly the metric name in the check "{metric_name}"',
location=self.location,
)
elif configuration_key == "failed rows query" or configuration_key == "failed rows sql_file":
if configuration_key.endswith("sql_file"):
fs = file_system()
sql_file_path = fs.join(fs.dirname(self.path_stack.file_path), configuration_value.strip())
failed_rows_query = dedent(fs.file_read_as_str(sql_file_path)).strip()
else:
failed_rows_query = dedent(configuration_value).strip()
elif configuration_key.endswith("query") or configuration_key.endswith("sql_file"):
if configuration_key.endswith("sql_file"):
fs = file_system()
Expand Down Expand Up @@ -918,24 +927,39 @@ def __parse_metric_check(
f"Invalid syntax used in '{check_str}'. More than one check attribute is not supported. A check like this will be skipped in future versions of Soda Core"
)

return metric_check_cfg_class(
source_header=header_str,
source_line=check_str,
source_configurations=check_configurations,
location=self.location,
name=name,
metric_name=metric_name,
metric_args=metric_args,
missing_and_valid_cfg=missing_and_valid_cfg,
filter=filter,
condition=condition,
metric_expression=metric_expression,
metric_query=metric_query,
change_over_time_cfg=change_over_time_cfg,
fail_threshold_cfg=fail_threshold_cfg,
warn_threshold_cfg=warn_threshold_cfg,
samples_limit=samples_limit,
)
def takes_keyword_argument(cls, keyword):
    """Return True when ``cls.__init__`` declares a parameter named ``keyword``."""
    init_parameters = inspect.signature(cls.__init__).parameters
    return keyword in init_parameters

# Some arguments make no sense for certain metric checks, so we only pass the ones that are supported by the given class constructor.
# Do this instead of accepting kwargs and passing all arguments to the constructor, because it's easier to see what arguments are supported and they do not disappear in the constructor.
all_args = {
"source_header": header_str,
"source_line": check_str,
"source_configurations": check_configurations,
"location": self.location,
"name": name,
"metric_name": metric_name,
"metric_args": metric_args,
"missing_and_valid_cfg": missing_and_valid_cfg,
"filter": filter,
"condition": condition,
"metric_expression": metric_expression,
"metric_query": metric_query,
"change_over_time_cfg": change_over_time_cfg,
"fail_threshold_cfg": fail_threshold_cfg,
"warn_threshold_cfg": warn_threshold_cfg,
"samples_limit": samples_limit,
"failed_rows_query": failed_rows_query,
}

use_args = {}

for arg in all_args.keys():
if takes_keyword_argument(metric_check_cfg_class, arg):
use_args[arg] = all_args[arg]

return metric_check_cfg_class(**use_args)

def __parse_configuration_threshold_condition(self, value) -> ThresholdCfg | None:
if isinstance(value, str):
Expand Down
59 changes: 59 additions & 0 deletions soda/core/tests/data_source/test_user_defined_metric_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,62 @@ def test_user_defined_data_source_query_metric_with_sql_file(data_source_fixture

finally:
os.remove(path)


# Scans a user-defined `belgium_customers` query metric check that also declares a
# `failed rows query`, then verifies that the check passes and that failed rows were
# still recorded for it.
def test_user_defined_data_source_query_metric_check_with_fail_query(data_source_fixture: DataSourceFixture):
table_name = data_source_fixture.ensure_test_table(customers_test_table)

qualified_table_name = data_source_fixture.data_source.qualified_table_name(table_name)

scan = data_source_fixture.create_test_scan()
# The mock Soda Cloud is queried in the final assertion; the mock sampler is presumably
# what makes the scan collect failed-row samples at all — confirm against the scan helpers.
mock_soda_cloud = scan.enable_mock_soda_cloud()
scan.enable_mock_sampler()
scan.add_sodacl_yaml_str(
f"""
checks:
- belgium_customers = 6:
belgium_customers query: |
SELECT count(*) as belgium_customers
FROM {qualified_table_name}
WHERE country = 'BE'
failed rows query: |
SELECT *
FROM {qualified_table_name}
WHERE country != 'BE'
"""
)
scan.execute()
scan.assert_all_checks_pass()

# The failed rows query selects the rows whose country is not 'BE'; 4 is expected to be
# that row count in customers_test_table (fixture data not shown here).
# NOTE(review): the argument 0 presumably selects the first check — confirm against the mock.
assert mock_soda_cloud.find_failed_rows_line_count(0) == 4


# Same scenario as the inline `failed rows query` test, but the failed rows SQL is written
# to a temporary file and referenced from the check through `failed rows sql_file`.
def test_user_defined_data_source_query_metric_check_with_fail_query_file(data_source_fixture: DataSourceFixture):
# NOTE(review): the temp file is created before the `try` below, so an exception raised by
# the setup calls in between would skip the `finally` cleanup and leave the file behind.
fd, path = tempfile.mkstemp()
table_name = data_source_fixture.ensure_test_table(customers_test_table)
qualified_table_name = data_source_fixture.data_source.qualified_table_name(table_name)

scan = data_source_fixture.create_test_scan()
mock_soda_cloud = scan.enable_mock_soda_cloud()
scan.enable_mock_sampler()
try:
# os.fdopen takes over the descriptor returned by mkstemp; the `with` block closes it.
with os.fdopen(fd, "w") as tmp:
tmp.write(f"SELECT * FROM {qualified_table_name} WHERE country != 'BE'")

scan.add_sodacl_yaml_str(
f"""
checks:
- belgium_customers = 6:
belgium_customers query: |
SELECT count(*) as belgium_customers
FROM {qualified_table_name}
WHERE country = 'BE'
failed rows sql_file: "{path}"
"""
)
scan.execute()
scan.assert_all_checks_pass()
# 4 is expected to be the number of rows whose country is not 'BE' in customers_test_table
# (fixture data not shown here).
assert mock_soda_cloud.find_failed_rows_line_count(0) == 4

finally:
# Always remove the temporary SQL file, whether the scan assertions passed or not.
os.remove(path)

0 comments on commit 5d1163c

Please sign in to comment.