Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duplicate check: remove unused aggregated query #2118

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 12 additions & 22 deletions soda/core/soda/execution/query/duplicates_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,6 @@ def __init__(self, partition: "Partition", metric: "Metric"):
)
)

self.failing_rows_sql_aggregated = jinja_resolve(
data_source.sql_get_duplicates_aggregated(
column_names,
table_name,
values_filter,
self.samples_limit,
invert_condition=False,
exclude_patterns=exclude_patterns,
)
)

def execute(self):
self.fetchone()
if self.row:
Expand All @@ -97,14 +86,15 @@ def execute(self):
sample_query.execute()

# TODO: This should be a second failed rows file, refactor failed rows to support multiple files.
if self.failing_rows_sql_aggregated and self.samples_limit > 0:
aggregate_sample_query = Query(
self.data_source_scan,
self.partition.table,
self.partition,
unqualified_query_name=f"duplicate_count[{'-'.join(self.metric.metric_args)}].failed_rows.aggregated",
sql=self.failing_rows_sql_aggregated,
samples_limit=self.samples_limit,
)
aggregate_sample_query.execute()
self.aggregated_failed_rows_data = aggregate_sample_query.rows
# TODO: removing for now, this is not using standard Query.store() and "gatekeeper" does not kick in - which is a potential data leak.
# if self.failing_rows_sql_aggregated and self.samples_limit > 0:
# aggregate_sample_query = Query(
# self.data_source_scan,
# self.partition.table,
# self.partition,
# unqualified_query_name=f"duplicate_count[{'-'.join(self.metric.metric_args)}].failed_rows.aggregated",
# sql=self.failing_rows_sql_aggregated,
# samples_limit=self.samples_limit,
# )
# aggregate_sample_query.execute()
# self.aggregated_failed_rows_data = aggregate_sample_query.rows
6 changes: 4 additions & 2 deletions soda/core/tests/data_source/test_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ def test_duplicates_with_exclude_columns(data_source_fixture: DataSourceFixture)
scan.assert_all_checks_pass()

# Exclude columns present, query should list the columns explicitly
scan.assert_log("cat, frequency")
scan.assert_no_log(" * ")
scan.assert_log("main.cat,", ignore_case=True)
scan.assert_log("main.pct,", ignore_case=True)
scan.assert_no_log("main.country,", ignore_case=True)
scan.assert_no_log(" * ", ignore_case=True)


def test_duplicates_with_filter(data_source_fixture: DataSourceFixture):
Expand Down
24 changes: 14 additions & 10 deletions soda/core/tests/helpers/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,25 @@ def assert_log_warning(self, message):
def assert_log_error(self, message):
self.assert_log(message, LogLevel.ERROR)

def assert_log(self, message, level: LogLevel | None = None, ignore_case=False):
    """Assert that some captured log entry contains *message*.

    Args:
        message: substring that must occur in a log entry's message.
        level: when given, only entries at exactly this level are considered;
            when None, entries at every level are considered.
        ignore_case: when True, the substring match is case-insensitive.

    Raises:
        AssertionError: if no matching log entry is found.
    """
    # Normalize to a list of acceptable levels so both branches below
    # apply the same level filter.
    levels = [level] if level else list(LogLevel)

    if ignore_case:
        matched = any(
            log.level in levels and message.lower() in log.message.lower()
            for log in self._logs.logs
        )
    else:
        # NOTE: the level filter must be applied here too — previously the
        # case-sensitive branch ignored `level`, so a message present at any
        # level satisfied an assertion for a specific level.
        matched = any(
            log.level in levels and message in log.message
            for log in self._logs.logs
        )

    if not matched:
        raise AssertionError(f"{level.name if level else 'Log'} not found: {message}")

def assert_no_log(self, message, level: LogLevel | None = None, ignore_case=False):
    """Assert that no captured log entry contains *message*.

    Args:
        message: substring that must NOT occur in any log entry's message.
        level: when given, only entries at exactly this level are considered;
            when None, entries at every level are considered.
        ignore_case: when True, the substring match is case-insensitive.

    Raises:
        AssertionError: if a matching log entry is found.
    """
    # Normalize to a list of acceptable levels so both branches below
    # apply the same level filter.
    levels = [level] if level else list(LogLevel)

    if ignore_case:
        matched = any(
            log.level in levels and message.lower() in log.message.lower()
            for log in self._logs.logs
        )
    else:
        # NOTE: the level filter must be applied here too — previously the
        # case-sensitive branch ignored `level`, so a message at an
        # unrelated level could incorrectly fail the assertion.
        matched = any(
            log.level in levels and message in log.message
            for log in self._logs.logs
        )

    if matched:
        raise AssertionError(f"{level.name if level else 'Log'} found: {message}")

def assert_all_checks_pass(self):
self.assert_all_checks(CheckOutcome.PASS)
Expand Down
Loading