Skip to content

Commit

Permalink
[dagster-airbyte] Add destination_type to kind tags in Airbyte asset …
Browse files Browse the repository at this point in the history
…specs (#26376)

## Summary & Motivation

Add the destination type to the Airbyte asset tags. Users can now what
is the kind of destination for their Fivetran assets.

## How I Tested These Changes

Updated unit test with BK
  • Loading branch information
maximearmstrong authored Dec 10, 2024
1 parent 267b53a commit 3032429
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class AirbyteConnectionTableProps:
json_schema: Mapping[str, Any]
connection_id: str
connection_name: str
destination_type: str
database: Optional[str]
schema: Optional[str]

Expand Down Expand Up @@ -67,6 +68,7 @@ class AirbyteDestination:
"""Represents an Airbyte destination, based on data as returned from the API."""

id: str
type: str
database: Optional[str]
schema: Optional[str]

Expand All @@ -77,6 +79,7 @@ def from_destination_details(
) -> "AirbyteDestination":
return cls(
id=destination_details["destinationId"],
type=destination_details["destinationType"],
database=destination_details["configuration"].get("database"),
schema=destination_details["configuration"].get("schema"),
)
Expand Down Expand Up @@ -138,6 +141,7 @@ def to_airbyte_connection_table_props_data(self) -> Sequence[AirbyteConnectionTa
json_schema=stream.json_schema,
connection_id=connection.id,
connection_name=connection.name,
destination_type=destination.type,
database=destination.database,
schema=destination.schema,
)
Expand Down Expand Up @@ -186,5 +190,5 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> AssetSpec:
return AssetSpec(
key=AssetKey(props.table_name),
metadata=metadata,
kinds={"airbyte"},
kinds={"airbyte", props.destination_type},
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

# Taken from the examples in the Airbyte REST API documentation
TEST_DESTINATION_ID = "18dccc91-0ab1-4f72-9ed7-0b8fc27c5826"
TEST_DESTINATION_TYPE = "postgres"
TEST_DESTINATION_DATABASE = "test_database"
TEST_DESTINATION_SCHEMA = "test_schema"
TEST_CONNECTION_ID = "9924bcd0-99be-453d-ba47-c2c9766f7da5"
Expand All @@ -31,9 +32,10 @@
table_name=f"{TEST_STREAM_PREFIX}{TEST_STREAM_NAME}",
stream_prefix=TEST_STREAM_PREFIX,
stream_name=TEST_STREAM_NAME,
json_schema=TEST_JSON_SCHEMA,
connection_id=TEST_CONNECTION_ID,
connection_name=TEST_CONNECTION_NAME,
json_schema=TEST_JSON_SCHEMA,
destination_type=TEST_DESTINATION_TYPE,
database=TEST_DESTINATION_DATABASE,
schema=TEST_DESTINATION_SCHEMA,
)
Expand Down Expand Up @@ -147,7 +149,7 @@
SAMPLE_DESTINATION_DETAILS = {
"destinationId": TEST_DESTINATION_ID,
"name": "My Destination",
"sourceType": "postgres",
"destinationType": TEST_DESTINATION_TYPE,
"workspaceId": "744cc0ed-7f05-4949-9e60-2a814f90c035",
"configuration": {
"conversion_window_days": 14,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
TEST_CONNECTION_NAME,
TEST_DESTINATION_DATABASE,
TEST_DESTINATION_SCHEMA,
TEST_DESTINATION_TYPE,
TEST_JSON_SCHEMA,
TEST_STREAM_NAME,
TEST_STREAM_PREFIX,
Expand Down Expand Up @@ -103,3 +104,4 @@ def test_custom_translator(
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == ["test_connection", "test_prefix_test_stream"]
assert has_kind(asset_spec.tags, "airbyte")
assert has_kind(asset_spec.tags, TEST_DESTINATION_TYPE)

0 comments on commit 3032429

Please sign in to comment.