Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

User defined metric fail query #2089

Merged
merged 5 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/main.workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ jobs:
sudo apt-get update
ACCEPT_EULA=Y sudo apt-get install -y libsasl2-dev msodbcsql18
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -132,6 +133,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -166,6 +168,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -194,6 +197,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr.workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ jobs:
sudo apt-get update
ACCEPT_EULA=Y sudo apt-get install -y libsasl2-dev msodbcsql18
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -110,6 +111,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -142,6 +144,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down Expand Up @@ -171,6 +174,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y libsasl2-dev
python -m pip install --upgrade pip
pip install requests==2.31.0
cat dev-requirements.in | grep tox | xargs pip install
- name: Test with tox
Expand Down
2 changes: 2 additions & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ readme-renderer~=32.0
certifi>=2022.12.07
wheel>=0.38.1
docutils<0.21 # 0.21 dropped py38 support, remove this after py38 support is gone
requests==2.31.0 # 2.32.0 is broken, does not support docker. Remove this after new version is out

8 changes: 5 additions & 3 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pathspec==0.12.1
# via black
pip-tools==7.4.1
# via -r dev-requirements.in
platformdirs==4.2.1
platformdirs==4.2.2
# via
# black
# virtualenv
Expand Down Expand Up @@ -100,7 +100,9 @@ python-dotenv==1.0.1
readme-renderer==32.0
# via -r dev-requirements.in
requests==2.31.0
# via docker
# via
# -r dev-requirements.in
# docker
schema==0.7.7
# via tbump
six==1.16.0
Expand Down Expand Up @@ -137,7 +139,7 @@ urllib3==1.26.18
# -r dev-requirements.in
# docker
# requests
virtualenv==20.26.1
virtualenv==20.26.2
# via tox
webencodings==0.5.1
# via bleach
Expand Down
20 changes: 18 additions & 2 deletions soda/core/soda/execution/metric/user_defined_numeric_metric.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
from __future__ import annotations

from numbers import Number

from soda.execution.metric.query_metric import QueryMetric
from soda.execution.query.sample_query import SampleQuery
from soda.execution.query.user_defined_numeric_query import UserDefinedNumericQuery


class UserDefinedNumericMetric(QueryMetric):
def __init__(
self,
data_source_scan: "DataSourceScan",
data_source_scan: DataSourceScan,
check_name: str,
sql: str,
check: "Check" = None,
check: Check = None,
):
super().__init__(
data_source_scan=data_source_scan,
Expand All @@ -19,6 +24,7 @@ def __init__(
identity_parts=[sql],
)
self.sql = sql
self.check = check

def __str__(self):
return f'"{self.name}"'
Expand All @@ -38,3 +44,13 @@ def ensure_query(self):
)
self.queries.append(query)
self.data_source_scan.queries.append(query)

def create_failed_rows_sample_query(self) -> SampleQuery | None:
sampler = self.data_source_scan.scan._configuration.sampler
if sampler and isinstance(self.value, Number) and self.check.check_cfg.failed_rows_query:
if self.samples_limit > 0:
jinja_resolve = self.data_source_scan.scan.jinja_resolve
sql = jinja_resolve(self.check.check_cfg.failed_rows_query)
sample_query = SampleQuery(self.data_source_scan, self, "failed_rows", sql)

return sample_query
9 changes: 8 additions & 1 deletion soda/core/soda/execution/query/user_defined_numeric_query.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

from soda.execution.metric.metric import Metric
from soda.execution.query.query import Query


class UserDefinedNumericQuery(Query):
def __init__(
self,
data_source_scan: "DataSourceScan",
data_source_scan: DataSourceScan,
check_name: str,
sql: str,
metric: Metric,
Expand All @@ -22,3 +24,8 @@ def execute(self):
if self.row[index] is not None:
metric_value = float(self.row[index])
self.metric.set_value(metric_value)

sample_query = self.metric.create_failed_rows_sample_query()
if sample_query:
self.metric.queries.append(sample_query)
sample_query.execute()
2 changes: 2 additions & 0 deletions soda/core/soda/sodacl/check_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(
name: str | None,
samples_limit: int | None = None,
samples_columns: list | None = None,
failed_rows_query: str | None = None,
):
self.source_header: str = source_header
self.source_line: str = source_line
Expand All @@ -25,6 +26,7 @@ def __init__(
self.name: str | None = name
self.samples_limit: int | None = samples_limit
self.samples_columns: list | None = samples_columns
self.failed_rows_query: str | None = failed_rows_query

def get_column_name(self) -> str | None:
pass
Expand Down
11 changes: 10 additions & 1 deletion soda/core/soda/sodacl/metric_check_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,17 @@ def __init__(
fail_threshold_cfg: ThresholdCfg | None,
warn_threshold_cfg: ThresholdCfg | None,
samples_limit: int | None = None,
failed_rows_query: str | None = None,
):
super().__init__(source_header, source_line, source_configurations, location, name, samples_limit)
super().__init__(
source_header,
source_line,
source_configurations,
location,
name,
samples_limit,
failed_rows_query=failed_rows_query,
)
self.metric_name: str = metric_name
self.metric_args: list[object] | None = metric_args
self.missing_and_valid_cfg: MissingAndValidCfg = missing_and_valid_cfg
Expand Down
60 changes: 42 additions & 18 deletions soda/core/soda/sodacl/sodacl_parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import functools
import inspect
import logging
import os
import re
Expand Down Expand Up @@ -619,6 +620,7 @@ def __parse_metric_check(
condition = None
metric_expression = None
metric_query = None
failed_rows_query = None
samples_limit = None
samples_columns = None
training_dataset_params: TrainingDatasetParameters = TrainingDatasetParameters()
Expand Down Expand Up @@ -657,6 +659,13 @@ def __parse_metric_check(
f'In configuration "{configuration_key}" the metric name must match exactly the metric name in the check "{metric_name}"',
location=self.location,
)
elif configuration_key == "failed rows query" or configuration_key == "failed rows sql_file":
if configuration_key.endswith("sql_file"):
fs = file_system()
sql_file_path = fs.join(fs.dirname(self.path_stack.file_path), configuration_value.strip())
failed_rows_query = dedent(fs.file_read_as_str(sql_file_path)).strip()
else:
failed_rows_query = dedent(configuration_value).strip()
elif configuration_key.endswith("query") or configuration_key.endswith("sql_file"):
if configuration_key.endswith("sql_file"):
fs = file_system()
Expand Down Expand Up @@ -918,24 +927,39 @@ def __parse_metric_check(
f"Invalid syntax used in '{check_str}'. More than one check attribute is not supported. A check like this will be skipped in future versions of Soda Core"
)

return metric_check_cfg_class(
source_header=header_str,
source_line=check_str,
source_configurations=check_configurations,
location=self.location,
name=name,
metric_name=metric_name,
metric_args=metric_args,
missing_and_valid_cfg=missing_and_valid_cfg,
filter=filter,
condition=condition,
metric_expression=metric_expression,
metric_query=metric_query,
change_over_time_cfg=change_over_time_cfg,
fail_threshold_cfg=fail_threshold_cfg,
warn_threshold_cfg=warn_threshold_cfg,
samples_limit=samples_limit,
)
def takes_keyword_argument(cls, keyword):
signature = inspect.signature(cls.__init__)
return keyword in signature.parameters

# Some arguments make no sense for certain metric checks, so we only pass the ones that are supported by the given class constructor.
# Do this instead of accepting kwargs and passing all arguments to the constructor, because it's easier to see what arguments are supported and they do not disappear in the constructor.
all_args = {
"source_header": header_str,
"source_line": check_str,
"source_configurations": check_configurations,
"location": self.location,
"name": name,
"metric_name": metric_name,
"metric_args": metric_args,
"missing_and_valid_cfg": missing_and_valid_cfg,
"filter": filter,
"condition": condition,
"metric_expression": metric_expression,
"metric_query": metric_query,
"change_over_time_cfg": change_over_time_cfg,
"fail_threshold_cfg": fail_threshold_cfg,
"warn_threshold_cfg": warn_threshold_cfg,
"samples_limit": samples_limit,
"failed_rows_query": failed_rows_query,
}

use_args = {}

for arg in all_args.keys():
if takes_keyword_argument(metric_check_cfg_class, arg):
use_args[arg] = all_args[arg]

return metric_check_cfg_class(**use_args)

def __parse_configuration_threshold_condition(self, value) -> ThresholdCfg | None:
if isinstance(value, str):
Expand Down
59 changes: 59 additions & 0 deletions soda/core/tests/data_source/test_user_defined_metric_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,62 @@ def test_user_defined_data_source_query_metric_with_sql_file(data_source_fixture

finally:
os.remove(path)


def test_user_defined_data_source_query_metric_check_with_fail_query(data_source_fixture: DataSourceFixture):
table_name = data_source_fixture.ensure_test_table(customers_test_table)

qualified_table_name = data_source_fixture.data_source.qualified_table_name(table_name)

scan = data_source_fixture.create_test_scan()
mock_soda_cloud = scan.enable_mock_soda_cloud()
scan.enable_mock_sampler()
scan.add_sodacl_yaml_str(
f"""
checks:
- belgium_customers = 6:
belgium_customers query: |
SELECT count(*) as belgium_customers
FROM {qualified_table_name}
WHERE country = 'BE'
failed rows query: |
SELECT *
FROM {qualified_table_name}
WHERE country != 'BE'
"""
)
scan.execute()
scan.assert_all_checks_pass()

assert mock_soda_cloud.find_failed_rows_line_count(0) == 4


def test_user_defined_data_source_query_metric_check_with_fail_query_file(data_source_fixture: DataSourceFixture):
fd, path = tempfile.mkstemp()
table_name = data_source_fixture.ensure_test_table(customers_test_table)
qualified_table_name = data_source_fixture.data_source.qualified_table_name(table_name)

scan = data_source_fixture.create_test_scan()
mock_soda_cloud = scan.enable_mock_soda_cloud()
scan.enable_mock_sampler()
try:
with os.fdopen(fd, "w") as tmp:
tmp.write(f"SELECT * FROM {qualified_table_name} WHERE country != 'BE'")

scan.add_sodacl_yaml_str(
f"""
checks:
- belgium_customers = 6:
belgium_customers query: |
SELECT count(*) as belgium_customers
FROM {qualified_table_name}
WHERE country = 'BE'
failed rows sql_file: "{path}"
"""
)
scan.execute()
scan.assert_all_checks_pass()
assert mock_soda_cloud.find_failed_rows_line_count(0) == 4

finally:
os.remove(path)
Loading