Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Python regressions in 1.9.0beta #857

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions dbt/adapters/databricks/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ def start(self, cluster_id: str) -> None:

response = self.session.post("/start", json={"cluster_id": cluster_id})
if response.status_code != 200:
raise DbtRuntimeError(f"Error starting terminated cluster.\n {response.content!r}")
logger.debug(f"Cluster start response={response}")
if self.status(cluster_id) not in ["RUNNING", "PENDING"]:
raise DbtRuntimeError(f"Error starting terminated cluster.\n {response.content!r}")
else:
logger.debug("Presuming race condition, waiting for cluster to start")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we query the state after an error, a status of pending or running means the start failed due to a race condition: another thread already got the cluster started.

Copy link
Collaborator

@jackyhu-db jackyhu-db Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be info rather than debug or debug with status_code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything that is info will show in the normal dbt output, so we are very conservative about what we log with info.
This is normal operation; if they are using multiple python models and the command api they will hit this, so I don't think this is worth bringing to the users' attention if it's running or pending. For whatever reason, the cluster start API errors if you ask to start a cluster that is already in the process of starting. If it's not running or pending, they'll get the full output from raising the error.


self.wait_for_cluster(cluster_id)

Expand Down Expand Up @@ -289,7 +291,7 @@ def cancel(self, command: CommandExecution) -> None:
raise DbtRuntimeError(f"Cancel command {command} failed.\n {response.content!r}")

def poll_for_completion(self, command: CommandExecution) -> None:
self._poll_api(
response = self._poll_api(
url="/status",
params={
"clusterId": command.cluster_id,
Expand All @@ -300,7 +302,13 @@ def poll_for_completion(self, command: CommandExecution) -> None:
terminal_states={"Finished", "Error", "Cancelled"},
expected_end_state="Finished",
unexpected_end_state_func=self._get_exception,
)
).json()

if response["results"]["resultType"] == "error":
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lost this in the refactor: for some reason Command exec will sometimes give a state of 'Finished' rather than 'Error', and then stuff the error in the results.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the result or resultType does not exist?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are published parts of the Databricks REST API. If these don't exist, the entire API becomes untrustworthy.

raise DbtRuntimeError(
f"Python model failed with traceback as:\n"
f"{utils.remove_ansi(response['results']['cause'])}"
)

def _get_exception(self, response: Response) -> None:
response_json = response.json()
Expand Down
2 changes: 0 additions & 2 deletions dbt/adapters/databricks/behaviors/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from dbt.adapters.databricks.utils import handle_missing_objects
from dbt.adapters.sql import SQLAdapter

GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments"


class GetColumnsBehavior(ABC):
@classmethod
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
SHOW_TABLE_EXTENDED_MACRO_NAME = "show_table_extended"
SHOW_TABLES_MACRO_NAME = "show_tables"
SHOW_VIEWS_MACRO_NAME = "show_views"
GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed this wasn't referenced anywhere when doing my debugging.


USE_INFO_SCHEMA_FOR_COLUMNS = BehaviorFlag(
name="use_info_schema_for_columns",
Expand Down
7 changes: 7 additions & 0 deletions dbt/adapters/databricks/python_models/python_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dbt.adapters.base import PythonJobHelper
from dbt.adapters.databricks.api_client import CommandExecution, DatabricksApiClient, WorkflowJobApi
from dbt.adapters.databricks.credentials import DatabricksCredentials
from dbt.adapters.databricks.logging import logger
from dbt.adapters.databricks.python_models.python_config import ParsedPythonModel
from dbt.adapters.databricks.python_models.run_tracking import PythonRunTracker

Expand Down Expand Up @@ -70,6 +71,8 @@ def __init__(

@override
def submit(self, compiled_code: str) -> None:
logger.debug("Submitting Python model using the Command API.")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding debug statement so that we can quickly determine submission method from logs.


context_id = self.api_client.command_contexts.create(self.cluster_id)
command_exec: Optional[CommandExecution] = None
try:
Expand Down Expand Up @@ -263,6 +266,8 @@ def create(

@override
def submit(self, compiled_code: str) -> None:
logger.debug("Submitting Python model using the Job Run API.")

file_path = self.uploader.upload(compiled_code)
job_config = self.config_compiler.compile(file_path)

Expand Down Expand Up @@ -494,6 +499,8 @@ def create(

@override
def submit(self, compiled_code: str) -> None:
logger.debug("Submitting Python model using the Workflow API.")

file_path = self.uploader.upload(compiled_code)

workflow_config, existing_job_id = self.config_compiler.compile(file_path)
Expand Down
27 changes: 27 additions & 0 deletions dbt/include/databricks/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved these to somewhere more sensible (during investigation realized these were in persist_docs, but are sometimes called outside of persisting docs).

{#
  Return column names, data types, and comments for `relation` by running
  `DESCRIBE TABLE` and loading the statement's result table.
#}
{% macro get_columns_comments(relation) -%}
{% call statement('get_columns_comments', fetch_result=True) -%}
describe table {{ relation|lower }}
{% endcall %}

{% do return(load_result('get_columns_comments').table) %}
{% endmacro %}

{#
  Return column name, full data type, and comment for `relation` by querying
  `system.information_schema.columns`.

  First runs `REPAIR TABLE ... SYNC METADATA` so that the information_schema
  view reflects the table's current metadata before it is queried.
#}
{% macro get_columns_comments_via_information_schema(relation) -%}
{% call statement('repair_table', fetch_result=False) -%}
REPAIR TABLE {{ relation|lower }} SYNC METADATA
{% endcall %}
{% call statement('get_columns_comments_via_information_schema', fetch_result=True) -%}
select
column_name,
full_data_type,
comment
from `system`.`information_schema`.`columns`
where
table_catalog = '{{ relation.database|lower }}' and
table_schema = '{{ relation.schema|lower }}' and
table_name = '{{ relation.identifier|lower }}'
{% endcall %}

{% do return(load_result('get_columns_comments_via_information_schema').table) %}
{% endmacro %}
27 changes: 0 additions & 27 deletions dbt/include/databricks/macros/adapters/persist_docs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,6 @@
{% do run_query(comment_query) %}
{% endmacro %}

{% macro get_columns_comments(relation) -%}
{% call statement('get_columns_comments', fetch_result=True) -%}
describe table {{ relation|lower }}
{% endcall %}

{% do return(load_result('get_columns_comments').table) %}
{% endmacro %}

{% macro get_columns_comments_via_information_schema(relation) -%}
{% call statement('repair_table', fetch_result=False) -%}
REPAIR TABLE {{ relation|lower }} SYNC METADATA
{% endcall %}
{% call statement('get_columns_comments_via_information_schema', fetch_result=True) -%}
select
column_name,
full_data_type,
comment
from `system`.`information_schema`.`columns`
where
table_catalog = '{{ relation.database|lower }}' and
table_schema = '{{ relation.schema|lower }}' and
table_name = '{{ relation.identifier|lower }}'
{% endcall %}

{% do return(load_result('get_columns_comments_via_information_schema').table) %}
{% endmacro %}

{% macro databricks__persist_docs(relation, model, for_relation, for_columns) -%}
{%- if for_relation and config.persist_relation_docs() and model.description %}
{% do alter_table_comment(relation, model) %}
Expand Down
9 changes: 9 additions & 0 deletions tests/functional/adapter/python_model/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ def model(dbt, spark):
return spark.createDataFrame(data, schema=['test', 'test2'])
"""

python_error_model = """
import pandas as pd

def model(dbt, spark):
raise Exception("This is an error")

return pd.DataFrame()
"""

serverless_schema = """version: 2

models:
Expand Down
19 changes: 19 additions & 0 deletions tests/functional/adapter/python_model/test_python_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,25 @@ class TestPythonModel(BasePythonModelTests):
pass


@pytest.mark.python
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
class TestPythonFailureModel:
    """Functional test: a python model that raises an exception must fail the dbt run."""

    @pytest.fixture(scope="class")
    def models(self):
        # Single model whose body raises (see override_fixtures.python_error_model).
        return {"my_failure_model.py": override_fixtures.python_error_model}

    def test_failure_model(self, project):
        # The run must be reported as failed, not as a silent success.
        util.run_dbt(["run"], expect_pass=False)


@pytest.mark.python
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
class TestPythonFailureModelNotebook(TestPythonFailureModel):
    """Same failing-model scenario, but submitted as a notebook (create_notebook=true)."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Switch the submission path to notebook-based execution.
        return {"models": {"+create_notebook": "true"}}


@pytest.mark.python
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
class TestPythonIncrementalModel(BasePythonIncrementalTests):
Expand Down
Loading