Skip to content

Commit

Permalink
Duplicate check: remove unused aggregated query (#2118)
Browse files Browse the repository at this point in the history
  • Loading branch information
m1n0 authored Jun 28, 2024
1 parent 87cc985 commit 7262ded
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 34 deletions.
34 changes: 12 additions & 22 deletions soda/core/soda/execution/query/duplicates_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,6 @@ def __init__(self, partition: "Partition", metric: "Metric"):
)
)

self.failing_rows_sql_aggregated = jinja_resolve(
data_source.sql_get_duplicates_aggregated(
column_names,
table_name,
values_filter,
self.samples_limit,
invert_condition=False,
exclude_patterns=exclude_patterns,
)
)

def execute(self):
self.fetchone()
if self.row:
Expand All @@ -97,14 +86,15 @@ def execute(self):
sample_query.execute()

# TODO: This should be a second failed rows file, refactor failed rows to support multiple files.
if self.failing_rows_sql_aggregated and self.samples_limit > 0:
aggregate_sample_query = Query(
self.data_source_scan,
self.partition.table,
self.partition,
unqualified_query_name=f"duplicate_count[{'-'.join(self.metric.metric_args)}].failed_rows.aggregated",
sql=self.failing_rows_sql_aggregated,
samples_limit=self.samples_limit,
)
aggregate_sample_query.execute()
self.aggregated_failed_rows_data = aggregate_sample_query.rows
# TODO: removing for now, this is not using standard Query.store() and "gatekeeper" does not kick in - which is a potential data leak.
# if self.failing_rows_sql_aggregated and self.samples_limit > 0:
# aggregate_sample_query = Query(
# self.data_source_scan,
# self.partition.table,
# self.partition,
# unqualified_query_name=f"duplicate_count[{'-'.join(self.metric.metric_args)}].failed_rows.aggregated",
# sql=self.failing_rows_sql_aggregated,
# samples_limit=self.samples_limit,
# )
# aggregate_sample_query.execute()
# self.aggregated_failed_rows_data = aggregate_sample_query.rows
6 changes: 4 additions & 2 deletions soda/core/tests/data_source/test_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ def test_duplicates_with_exclude_columns(data_source_fixture: DataSourceFixture)
scan.assert_all_checks_pass()

# Exclude columns present, query should list the columns explicitly
scan.assert_log("cat, frequency")
scan.assert_no_log(" * ")
scan.assert_log("main.cat,", ignore_case=True)
scan.assert_log("main.pct,", ignore_case=True)
scan.assert_no_log("main.country,", ignore_case=True)
scan.assert_no_log(" * ", ignore_case=True)


def test_duplicates_with_filter(data_source_fixture: DataSourceFixture):
Expand Down
24 changes: 14 additions & 10 deletions soda/core/tests/helpers/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,25 @@ def assert_log_warning(self, message):
def assert_log_error(self, message):
    """Assert that ``message`` was logged, restricting the check to the ERROR level.

    Delegates to ``assert_log`` with ``LogLevel.ERROR`` as the level.
    """
    self.assert_log(message, level=LogLevel.ERROR)

def assert_log(self, message, level: LogLevel | None = None, ignore_case=False):
    """Assert that at least one captured log entry contains ``message``.

    Args:
        message: Substring to look for in each log entry's message.
        level: When given, only entries whose level equals this value are
            considered; when omitted, entries of every level are considered.
        ignore_case: When true, ``message`` and the log text are compared
            case-insensitively.

    Raises:
        AssertionError: If no matching entry exists. The error text starts
            with the level name when ``level`` is given, otherwise ``Log``.
    """
    # Normalise the search text once rather than once per log entry.
    needle = message.lower() if ignore_case else message

    for log in self._logs.logs:
        # Apply the level filter regardless of ignore_case, so that
        # level-specific helpers (e.g. assert_log_error) only match entries
        # logged at that level.
        if level and log.level != level:
            continue
        text = log.message.lower() if ignore_case else log.message
        if needle in text:
            return

    raise AssertionError(f"{level.name if level else 'Log'} not found: {message}")

def assert_no_log(self, message, level: LogLevel | None = None, ignore_case=False):
    """Assert that no captured log entry contains ``message``.

    Args:
        message: Substring that must not appear in any considered log entry.
        level: When given, only entries whose level equals this value are
            considered; when omitted, entries of every level are considered.
        ignore_case: When true, ``message`` and the log text are compared
            case-insensitively.

    Raises:
        AssertionError: If a matching entry exists. The error text starts
            with the level name when ``level`` is given, otherwise ``Log``.
    """
    # Normalise the search text once rather than once per log entry.
    needle = message.lower() if ignore_case else message

    for log in self._logs.logs:
        # Apply the level filter regardless of ignore_case, so a message
        # logged at a different level does not trip a level-scoped check.
        if level and log.level != level:
            continue
        text = log.message.lower() if ignore_case else log.message
        if needle in text:
            raise AssertionError(f"{level.name if level else 'Log'} found: {message}")

def assert_all_checks_pass(self):
self.assert_all_checks(CheckOutcome.PASS)
Expand Down

0 comments on commit 7262ded

Please sign in to comment.