Skip to content

Commit

Permalink
Remove the fallback operator for async task
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Feb 13, 2025
1 parent 745ed14 commit d9a9c7d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
5 changes: 2 additions & 3 deletions cosmos/operators/_asynchronous/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ def _create_async_operator_class(profile_type: str, dbt_class: str) -> Any:
module = importlib.import_module(module_path)
operator_class = getattr(module, class_name)
return operator_class
except (ModuleNotFoundError, AttributeError):
log.info("Error in loading class: %s. falling back to DbtRunLocalOperator", class_path)
return DbtRunLocalOperator
except (ModuleNotFoundError, AttributeError) as e:
raise ImportError(f"Error in loading class: {class_path}. Unable to find the specified operator class.") from e


class DbtRunAirflowAsyncFactoryOperator(DbtRunLocalOperator): # type: ignore[misc]
Expand Down
21 changes: 11 additions & 10 deletions tests/operators/_asynchronous/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,33 @@
from cosmos.operators._asynchronous import TeardownAsyncOperator
from cosmos.operators._asynchronous.base import DbtRunAirflowAsyncFactoryOperator, _create_async_operator_class
from cosmos.operators._asynchronous.bigquery import DbtRunAirflowAsyncBigqueryOperator
from cosmos.operators._asynchronous.databricks import DbtRunAirflowAsyncDatabricksOperator
from cosmos.operators.local import DbtRunLocalOperator

_ASYNC_PROFILE = ["bigquery", "databricks"]


@pytest.mark.parametrize(
"profile_type, dbt_class, expected_operator_class",
[
("bigquery", "DbtRun", DbtRunAirflowAsyncBigqueryOperator),
("snowflake", "DbtRun", DbtRunLocalOperator),
("bigquery", "DbtTest", DbtRunLocalOperator),
("databricks", "DbtRun", DbtRunAirflowAsyncDatabricksOperator),
],
)
def test_create_async_operator_class_success(profile_type, dbt_class, expected_operator_class):
def test_create_async_operator_class(profile_type, dbt_class, expected_operator_class):
"""Test the successful loading of the async operator class."""

operator_class = _create_async_operator_class(profile_type, dbt_class)

assert operator_class == expected_operator_class


def test_create_async_operator_class_unsupported():

with pytest.raises(ImportError, match="Error in loading class"):
_create_async_operator_class("test_profile", "DbtRun")


@pytest.fixture
def profile_config_mock():
"""Fixture to create a mock ProfileConfig."""
Expand All @@ -46,13 +54,6 @@ def test_create_async_operator_class_valid():
assert result == mock_class


def test_create_async_operator_class_fallback():
"""Test _create_async_operator_class falls back to DbtRunLocalOperator when import fails."""
with patch("cosmos.operators._asynchronous.base.importlib.import_module", side_effect=ModuleNotFoundError):
result = _create_async_operator_class("bigquery", "DbtRun")
assert result == DbtRunLocalOperator


class MockAsyncOperator(DbtRunLocalOperator):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down

0 comments on commit d9a9c7d

Please sign in to comment.