Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dryrun): Pass all query parameters when dry-running queries #6835

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 42 additions & 25 deletions bigquery_etl/dryrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@
from backports.cached_property import cached_property # type: ignore


QUERY_PARAMETER_TYPE_VALUES = {
"DATE": "2019-01-01",
"DATETIME": "2019-01-01 00:00:00",
"TIMESTAMP": "2019-01-01 00:00:00",
"STRING": "foo",
"BOOL": True,
"FLOAT64": 1,
"FLOAT": 1,
"INT64": 1,
sean-rose marked this conversation as resolved.
Show resolved Hide resolved
"INTEGER": 1,
"NUMERIC": 1,
"BIGNUMERIC": 1,
}


def get_credentials(auth_req: Optional[GoogleAuthRequest] = None):
"""Get GCP credentials."""
auth_req = auth_req or GoogleAuthRequest()
Expand Down Expand Up @@ -204,28 +219,30 @@ def dry_run_result(self):
sql = self.content
else:
sql = self.get_sql()
if self.metadata:
# use metadata to rewrite date-type query params as submission_date
date_params = [
query_param
for query_param in (
self.metadata.scheduling.get("date_partition_parameter"),
*(
param.split(":", 1)[0]
for param in self.metadata.scheduling.get("parameters", [])
if re.fullmatch(r"[^:]+:DATE:{{.*ds.*}}", param)
),

query_parameters = []
scheduling_metadata = self.metadata.scheduling if self.metadata else {}
if date_partition_parameter := scheduling_metadata.get(
"date_partition_parameter", "submission_date"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does submission_date need to be set if if there's no date_partition_parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If date_partition_parameter isn't specified at all in the scheduling metadata then it does default to passing a @submission_date parameter, so dryrun should too. However, if date_partition_parameter is set to null in the scheduling metadata then no @submission_date parameter is passed by default, so this does the same in that case (the "date_partition_parameter" key will exist in the scheduling dictionary, so the get() will return the null value as None which won't pass the if check).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. I didn't know it did that

):
query_parameters.append(
bigquery.ScalarQueryParameter(
date_partition_parameter,
"DATE",
QUERY_PARAMETER_TYPE_VALUES["DATE"],
)
if query_param and query_param != "submission_date"
]
if date_params:
pattern = re.compile(
"@("
+ "|".join(date_params)
# match whole query parameter names
+ ")(?![a-zA-Z0-9_])"
)
for parameter in scheduling_metadata.get("parameters", []):
parameter_name, parameter_type, _ = parameter.strip().split(":", 2)
parameter_type = parameter_type.upper() or "STRING"
query_parameters.append(
bigquery.ScalarQueryParameter(
parameter_name,
parameter_type,
QUERY_PARAMETER_TYPE_VALUES.get(parameter_type),
)
sql = pattern.sub("@submission_date", sql)
)

project = basename(dirname(dirname(dirname(self.sqlfile))))
dataset = basename(dirname(dirname(self.sqlfile)))
try:
Expand All @@ -234,6 +251,10 @@ def dry_run_result(self):
"project": self.project or project,
"dataset": self.dataset or dataset,
"query": sql,
"query_parameters": [
query_parameter.to_api_repr()
for query_parameter in query_parameters
],
}

if self.table:
Expand All @@ -257,11 +278,7 @@ def dry_run_result(self):
dry_run=True,
use_query_cache=False,
default_dataset=f"{project}.{dataset}",
query_parameters=[
bigquery.ScalarQueryParameter(
"submission_date", "DATE", "2019-01-01"
)
],
query_parameters=query_parameters,
)
job = self.client.query(sql, job_config=job_config)
try:
Expand Down