Pin requests to 2.31.0 in CI and in the dev requirements, and add support for a
"failed rows query" / "failed rows sql_file" configuration on user-defined
metric checks so that a failed-rows sample query is executed after the metric
query.

NOTE(review): line structure and leading whitespace were reconstructed from a
whitespace-collapsed copy of this patch -- verify context lines against the
target files before applying. The blob hashes on the `index` lines of
user_defined_numeric_metric.py and sodacl_parser.py predate the edits made to
those hunks and have not been regenerated.

diff --git a/.github/workflows/main.workflow.yml b/.github/workflows/main.workflow.yml
index 33b884d96..8fad8fdbb 100644
--- a/.github/workflows/main.workflow.yml
+++ b/.github/workflows/main.workflow.yml
@@ -98,6 +98,7 @@ jobs:
           sudo apt-get update
           ACCEPT_EULA=Y sudo apt-get install -y libsasl2-dev msodbcsql18
           python -m pip install --upgrade pip
+          pip install requests==2.31.0
           cat dev-requirements.in | grep tox | xargs pip install
 
       - name: Test with tox
@@ -132,6 +133,7 @@ jobs:
           sudo apt-get update
           sudo apt-get install -y libsasl2-dev
           python -m pip install --upgrade pip
+          pip install requests==2.31.0
           cat dev-requirements.in | grep tox | xargs pip install
 
       - name: Test with tox
@@ -166,6 +168,7 @@ jobs:
           sudo apt-get update
           sudo apt-get install -y libsasl2-dev
           python -m pip install --upgrade pip
+          pip install requests==2.31.0
           cat dev-requirements.in | grep tox | xargs pip install
 
       - name: Test with tox
@@ -194,6 +197,7 @@ jobs:
           sudo apt-get update
           sudo apt-get install -y libsasl2-dev
           python -m pip install --upgrade pip
+          pip install requests==2.31.0
           cat dev-requirements.in | grep tox | xargs pip install
 
       - name: Test with tox
diff --git a/.github/workflows/pr.workflow.yml b/.github/workflows/pr.workflow.yml
index 8356a1a15..aaafb3194 100644
--- a/.github/workflows/pr.workflow.yml
+++ b/.github/workflows/pr.workflow.yml
@@ -78,6 +78,7 @@ jobs:
           sudo apt-get update
           ACCEPT_EULA=Y sudo apt-get install -y libsasl2-dev msodbcsql18
           python -m pip install --upgrade pip
+          pip install requests==2.31.0
           cat dev-requirements.in | grep tox | xargs pip install
 
       - name: Test with tox
@@ -110,6 +111,7 @@ jobs:
           sudo apt-get update
           sudo apt-get install -y libsasl2-dev
           python -m pip install --upgrade pip
+          pip install requests==2.31.0
           cat dev-requirements.in | grep tox | xargs pip install
 
       - name: Test with tox
@@ -142,6 +144,7 @@ jobs:
           sudo apt-get update
           sudo apt-get install -y libsasl2-dev
           python -m pip install --upgrade pip
+          pip install requests==2.31.0
           cat dev-requirements.in | grep tox | xargs pip install
 
       - name: Test with tox
@@ -171,6 +174,7 @@ jobs:
           sudo apt-get update
           sudo apt-get install -y libsasl2-dev
           python -m pip install --upgrade pip
+          pip install requests==2.31.0
           cat dev-requirements.in | grep tox | xargs pip install
 
       - name: Test with tox
diff --git a/dev-requirements.in b/dev-requirements.in
index 4ad215a28..a2712f911 100644
--- a/dev-requirements.in
+++ b/dev-requirements.in
@@ -15,3 +15,5 @@ readme-renderer~=32.0
 certifi>=2022.12.07
 wheel>=0.38.1
 docutils<0.21 # 0.21 dropped py38 support, remove this after py38 support is gone
+requests==2.31.0 # 2.32.0 is broken, does not support docker. Remove this after new version is out
+
diff --git a/dev-requirements.txt b/dev-requirements.txt
index d4dadcd46..c84f6aad9 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -61,7 +61,7 @@ pathspec==0.12.1
     # via black
 pip-tools==7.4.1
     # via -r dev-requirements.in
-platformdirs==4.2.1
+platformdirs==4.2.2
     # via
     #   black
     #   virtualenv
@@ -100,7 +100,9 @@ python-dotenv==1.0.1
 readme-renderer==32.0
     # via -r dev-requirements.in
 requests==2.31.0
-    # via docker
+    # via
+    #   -r dev-requirements.in
+    #   docker
 schema==0.7.7
     # via tbump
 six==1.16.0
@@ -137,7 +139,7 @@ urllib3==1.26.18
     #   -r dev-requirements.in
     #   docker
     #   requests
-virtualenv==20.26.1
+virtualenv==20.26.2
     # via tox
 webencodings==0.5.1
     # via bleach
diff --git a/soda/core/soda/execution/metric/user_defined_numeric_metric.py b/soda/core/soda/execution/metric/user_defined_numeric_metric.py
index e78d3cfe5..7afaacf97 100644
--- a/soda/core/soda/execution/metric/user_defined_numeric_metric.py
+++ b/soda/core/soda/execution/metric/user_defined_numeric_metric.py
@@ -1,14 +1,19 @@
+from __future__ import annotations
+
+from numbers import Number
+
 from soda.execution.metric.query_metric import QueryMetric
+from soda.execution.query.sample_query import SampleQuery
 from soda.execution.query.user_defined_numeric_query import UserDefinedNumericQuery
 
 
 class UserDefinedNumericMetric(QueryMetric):
     def __init__(
         self,
-        data_source_scan: "DataSourceScan",
+        data_source_scan: DataSourceScan,
         check_name: str,
         sql: str,
-        check: "Check" = None,
+        check: Check = None,
     ):
         super().__init__(
             data_source_scan=data_source_scan,
@@ -19,6 +24,7 @@ def __init__(
             identity_parts=[sql],
         )
         self.sql = sql
+        self.check = check
 
     def __str__(self):
         return f'"{self.name}"'
@@ -38,3 +44,22 @@ def ensure_query(self):
         )
         self.queries.append(query)
         self.data_source_scan.queries.append(query)
+
+    def create_failed_rows_sample_query(self) -> SampleQuery | None:
+        """Build the sample query for the check's `failed rows query`, if any.
+
+        Returns None when there is no sampler, the metric value is not a number,
+        no failed rows query is configured, or the samples limit is not positive.
+        """
+        sampler = self.data_source_scan.scan._configuration.sampler
+        if not sampler or not isinstance(self.value, Number):
+            return None
+
+        # `check` defaults to None in __init__, so guard before dereferencing it.
+        check_cfg = self.check.check_cfg if self.check is not None else None
+        failed_rows_query = check_cfg.failed_rows_query if check_cfg is not None else None
+        if not failed_rows_query or self.samples_limit <= 0:
+            return None
+
+        sql = self.data_source_scan.scan.jinja_resolve(failed_rows_query)
+        return SampleQuery(self.data_source_scan, self, "failed_rows", sql)
diff --git a/soda/core/soda/execution/query/user_defined_numeric_query.py b/soda/core/soda/execution/query/user_defined_numeric_query.py
index 379687cef..43b7dc15f 100644
--- a/soda/core/soda/execution/query/user_defined_numeric_query.py
+++ b/soda/core/soda/execution/query/user_defined_numeric_query.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 from soda.execution.metric.metric import Metric
 from soda.execution.query.query import Query
 
@@ -5,7 +7,7 @@
 class UserDefinedNumericQuery(Query):
     def __init__(
         self,
-        data_source_scan: "DataSourceScan",
+        data_source_scan: DataSourceScan,
         check_name: str,
         sql: str,
         metric: Metric,
@@ -22,3 +24,8 @@ def execute(self):
                 if self.row[index] is not None:
                     metric_value = float(self.row[index])
                     self.metric.set_value(metric_value)
+
+        sample_query = self.metric.create_failed_rows_sample_query()
+        if sample_query:
+            self.metric.queries.append(sample_query)
+            sample_query.execute()
diff --git a/soda/core/soda/sodacl/check_cfg.py b/soda/core/soda/sodacl/check_cfg.py
index 8bcf72235..6878e3dfa 100644
--- a/soda/core/soda/sodacl/check_cfg.py
+++ b/soda/core/soda/sodacl/check_cfg.py
@@ -17,6 +17,7 @@ def __init__(
         name: str | None,
         samples_limit: int | None = None,
         samples_columns: list | None = None,
+        failed_rows_query: str | None = None,
     ):
         self.source_header: str = source_header
         self.source_line: str = source_line
@@ -25,6 +26,7 @@ def __init__(
         self.name: str | None = name
         self.samples_limit: int | None = samples_limit
         self.samples_columns: list | None = samples_columns
+        self.failed_rows_query: str | None = failed_rows_query
 
     def get_column_name(self) -> str | None:
         pass
diff --git a/soda/core/soda/sodacl/metric_check_cfg.py b/soda/core/soda/sodacl/metric_check_cfg.py
index a77dcc21c..687e12b89 100644
--- a/soda/core/soda/sodacl/metric_check_cfg.py
+++ b/soda/core/soda/sodacl/metric_check_cfg.py
@@ -32,8 +32,17 @@ def __init__(
         fail_threshold_cfg: ThresholdCfg | None,
         warn_threshold_cfg: ThresholdCfg | None,
         samples_limit: int | None = None,
+        failed_rows_query: str | None = None,
     ):
-        super().__init__(source_header, source_line, source_configurations, location, name, samples_limit)
+        super().__init__(
+            source_header,
+            source_line,
+            source_configurations,
+            location,
+            name,
+            samples_limit,
+            failed_rows_query=failed_rows_query,
+        )
         self.metric_name: str = metric_name
         self.metric_args: list[object] | None = metric_args
         self.missing_and_valid_cfg: MissingAndValidCfg = missing_and_valid_cfg
diff --git a/soda/core/soda/sodacl/sodacl_parser.py b/soda/core/soda/sodacl/sodacl_parser.py
index 4bb8a01f1..9fe161c63 100644
--- a/soda/core/soda/sodacl/sodacl_parser.py
+++ b/soda/core/soda/sodacl/sodacl_parser.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import functools
+import inspect
 import logging
 import os
 import re
@@ -619,6 +620,7 @@ def __parse_metric_check(
         condition = None
         metric_expression = None
         metric_query = None
+        failed_rows_query = None
         samples_limit = None
         samples_columns = None
         training_dataset_params: TrainingDatasetParameters = TrainingDatasetParameters()
@@ -657,6 +659,13 @@ def __parse_metric_check(
                             f'In configuration "{configuration_key}" the metric name must match exactly the metric name in the check "{metric_name}"',
                             location=self.location,
                         )
+                elif configuration_key in ("failed rows query", "failed rows sql_file"):
+                    if configuration_key.endswith("sql_file"):
+                        fs = file_system()
+                        sql_file_path = fs.join(fs.dirname(self.path_stack.file_path), configuration_value.strip())
+                        failed_rows_query = dedent(fs.file_read_as_str(sql_file_path)).strip()
+                    else:
+                        failed_rows_query = dedent(configuration_value).strip()
                 elif configuration_key.endswith("query") or configuration_key.endswith("sql_file"):
                     if configuration_key.endswith("sql_file"):
                         fs = file_system()
@@ -918,24 +927,33 @@ def __parse_metric_check(
                 f"Invalid syntax used in '{check_str}'. More than one check attribute is not supported. A check like this will be skipped in future versions of Soda Core"
             )
 
-        return metric_check_cfg_class(
-            source_header=header_str,
-            source_line=check_str,
-            source_configurations=check_configurations,
-            location=self.location,
-            name=name,
-            metric_name=metric_name,
-            metric_args=metric_args,
-            missing_and_valid_cfg=missing_and_valid_cfg,
-            filter=filter,
-            condition=condition,
-            metric_expression=metric_expression,
-            metric_query=metric_query,
-            change_over_time_cfg=change_over_time_cfg,
-            fail_threshold_cfg=fail_threshold_cfg,
-            warn_threshold_cfg=warn_threshold_cfg,
-            samples_limit=samples_limit,
-        )
+        # Some arguments make no sense for certain metric checks, so we only pass the ones that are supported by the given class constructor.
+        # Do this instead of accepting kwargs and passing all arguments to the constructor, because it's easier to see what arguments are supported and they do not disappear in the constructor.
+        all_args = {
+            "source_header": header_str,
+            "source_line": check_str,
+            "source_configurations": check_configurations,
+            "location": self.location,
+            "name": name,
+            "metric_name": metric_name,
+            "metric_args": metric_args,
+            "missing_and_valid_cfg": missing_and_valid_cfg,
+            "filter": filter,
+            "condition": condition,
+            "metric_expression": metric_expression,
+            "metric_query": metric_query,
+            "change_over_time_cfg": change_over_time_cfg,
+            "fail_threshold_cfg": fail_threshold_cfg,
+            "warn_threshold_cfg": warn_threshold_cfg,
+            "samples_limit": samples_limit,
+            "failed_rows_query": failed_rows_query,
+        }
+
+        # Inspect the constructor signature once instead of once per argument.
+        supported_params = inspect.signature(metric_check_cfg_class.__init__).parameters
+        use_args = {keyword: value for keyword, value in all_args.items() if keyword in supported_params}
+
+        return metric_check_cfg_class(**use_args)
 
     def __parse_configuration_threshold_condition(self, value) -> ThresholdCfg | None:
         if isinstance(value, str):
diff --git a/soda/core/tests/data_source/test_user_defined_metric_checks.py b/soda/core/tests/data_source/test_user_defined_metric_checks.py
index 8d621c42d..6baad84ab 100644
--- a/soda/core/tests/data_source/test_user_defined_metric_checks.py
+++ b/soda/core/tests/data_source/test_user_defined_metric_checks.py
@@ -135,3 +135,62 @@ def test_user_defined_data_source_query_metric_with_sql_file(data_source_fixture
 
     finally:
         os.remove(path)
+
+
+def test_user_defined_data_source_query_metric_check_with_fail_query(data_source_fixture: DataSourceFixture):
+    table_name = data_source_fixture.ensure_test_table(customers_test_table)
+
+    qualified_table_name = data_source_fixture.data_source.qualified_table_name(table_name)
+
+    scan = data_source_fixture.create_test_scan()
+    mock_soda_cloud = scan.enable_mock_soda_cloud()
+    scan.enable_mock_sampler()
+    scan.add_sodacl_yaml_str(
+        f"""
+          checks:
+            - belgium_customers = 6:
+                belgium_customers query: |
+                  SELECT count(*) as belgium_customers
+                  FROM {qualified_table_name}
+                  WHERE country = 'BE'
+                failed rows query: |
+                  SELECT *
+                  FROM {qualified_table_name}
+                  WHERE country != 'BE'
+        """
+    )
+    scan.execute()
+    scan.assert_all_checks_pass()
+
+    assert mock_soda_cloud.find_failed_rows_line_count(0) == 4
+
+
+def test_user_defined_data_source_query_metric_check_with_fail_query_file(data_source_fixture: DataSourceFixture):
+    fd, path = tempfile.mkstemp()
+    table_name = data_source_fixture.ensure_test_table(customers_test_table)
+    qualified_table_name = data_source_fixture.data_source.qualified_table_name(table_name)
+
+    scan = data_source_fixture.create_test_scan()
+    mock_soda_cloud = scan.enable_mock_soda_cloud()
+    scan.enable_mock_sampler()
+    try:
+        with os.fdopen(fd, "w") as tmp:
+            tmp.write(f"SELECT * FROM {qualified_table_name} WHERE country != 'BE'")
+
+        scan.add_sodacl_yaml_str(
+            f"""
+              checks:
+                - belgium_customers = 6:
+                    belgium_customers query: |
+                      SELECT count(*) as belgium_customers
+                      FROM {qualified_table_name}
+                      WHERE country = 'BE'
+                    failed rows sql_file: "{path}"
+            """
+        )
+        scan.execute()
+        scan.assert_all_checks_pass()
+        assert mock_soda_cloud.find_failed_rows_line_count(0) == 4
+
+    finally:
+        os.remove(path)