Skip to content

Commit

Permalink
Simplify staging models and inner join in staging. (#85)
Browse files Browse the repository at this point in the history
* Simplify staging models and inner join in staging.
  • Loading branch information
alanmcruickshank authored Feb 16, 2022
1 parent 7b69afa commit d8bd5e1
Show file tree
Hide file tree
Showing 24 changed files with 322 additions and 849 deletions.
2 changes: 1 addition & 1 deletion models/fct_dbt__critical_path.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
with models as (

select *
from {{ ref('stg_dbt__models') }}
from {{ ref('dim_dbt__models') }}

),

Expand Down
36 changes: 13 additions & 23 deletions models/incremental/dim_dbt__exposures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,38 @@
)
}}

with dbt_exposures as (
with dbt_nodes as (

select * from {{ ref('stg_dbt__exposures') }}

),

run_results as (

select *
from {{ ref('fct_dbt__run_results') }}
select * from {{ ref('stg_dbt__nodes') }}

),

dbt_exposures_incremental as (

select dbt_exposures.*
from dbt_exposures
-- Inner join with run results to enforce consistency and avoid race conditions.
-- https://github.com/brooklyn-data/dbt_artifacts/issues/75
inner join run_results on
dbt_exposures.artifact_run_id = run_results.artifact_run_id
select *
from dbt_nodes
where resource_type = 'exposure'

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where dbt_exposures.artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
and artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

fields as (

select
t.manifest_exposure_id,
t.manifest_node_id as manifest_exposure_id,
t.command_invocation_id,
t.dbt_cloud_run_id,
t.artifact_run_id,
t.artifact_generated_at,
t.node_id,
t.name,
t.type,
t.owner,
t.maturity,
t.exposure_type as type,
t.exposure_owner as owner,
t.exposure_maturity as maturity,
f.value::string as output_feeds,
t.package_name
from dbt_exposures_incremental as t,
Expand Down
38 changes: 14 additions & 24 deletions models/incremental/dim_dbt__models.sql
Original file line number Diff line number Diff line change
@@ -1,51 +1,41 @@
{{ config( materialized='incremental', unique_key='manifest_model_id' ) }}

with dbt_models as (
with dbt_nodes as (

select * from {{ ref('stg_dbt__models') }}

),

run_results as (

select *
from {{ ref('fct_dbt__run_results') }}
select * from {{ ref('stg_dbt__nodes') }}

),

dbt_models_incremental as (

select dbt_models.*
from dbt_models
-- Inner join with run results to enforce consistency and avoid race conditions.
-- https://github.com/brooklyn-data/dbt_artifacts/issues/75
inner join run_results on
dbt_models.artifact_run_id = run_results.artifact_run_id
select *
from dbt_nodes
where resource_type = 'model'

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where coalesce(dbt_models.artifact_generated_at > (select max(artifact_generated_at) from {{ this }}), true)
{% endif %}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
and coalesce(artifact_generated_at > (select max(artifact_generated_at) from {{ this }}), true)
{% endif %}

),

fields as (

select
manifest_model_id,
manifest_node_id as manifest_model_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_run_id,
artifact_generated_at,
node_id,
model_database,
model_schema,
node_database as model_database,
node_schema as model_schema,
name,
depends_on_nodes,
package_name,
model_path,
node_path as model_path,
checksum,
model_materialization
materialization as model_materialization
from dbt_models_incremental

)
Expand Down
36 changes: 13 additions & 23 deletions models/incremental/dim_dbt__seeds.sql
Original file line number Diff line number Diff line change
@@ -1,49 +1,39 @@
{{ config( materialized='incremental', unique_key='manifest_seed_id' ) }}

with dbt_seeds as (
with dbt_nodes as (

select * from {{ ref('stg_dbt__seeds') }}

),

run_results as (

select *
from {{ ref('fct_dbt__run_results') }}
select * from {{ ref('stg_dbt__nodes') }}

),

dbt_seeds_incremental as (

select dbt_seeds.*
from dbt_seeds
-- Inner join with run results to enforce consistency and avoid race conditions.
-- https://github.com/brooklyn-data/dbt_artifacts/issues/75
inner join run_results on
dbt_seeds.artifact_run_id = run_results.artifact_run_id
select *
from dbt_nodes
where resource_type = 'seed'

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where dbt_seeds.artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
and artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

fields as (

select
manifest_seed_id,
manifest_node_id as manifest_seed_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_run_id,
artifact_generated_at,
node_id,
seed_database,
seed_schema,
node_database as seed_database,
node_schema as seed_schema,
name,
depends_on_nodes,
package_name,
seed_path,
node_path as seed_path,
checksum
from dbt_seeds_incremental

Expand Down
36 changes: 13 additions & 23 deletions models/incremental/dim_dbt__snapshots.sql
Original file line number Diff line number Diff line change
@@ -1,49 +1,39 @@
{{ config( materialized='incremental', unique_key='manifest_snapshot_id' ) }}

with dbt_snapshots as (
with dbt_nodes as (

select * from {{ ref('stg_dbt__snapshots') }}

),

run_results as (

select *
from {{ ref('fct_dbt__run_results') }}
select * from {{ ref('stg_dbt__nodes') }}

),

dbt_snapshots_incremental as (

select dbt_snapshots.*
from dbt_snapshots
-- Inner join with run results to enforce consistency and avoid race conditions.
-- https://github.com/brooklyn-data/dbt_artifacts/issues/75
inner join run_results on
dbt_snapshots.artifact_run_id = run_results.artifact_run_id
select *
from dbt_nodes
where resource_type = 'snapshot'

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where dbt_snapshots.artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
and artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

fields as (

select
manifest_snapshot_id,
manifest_node_id as manifest_snapshot_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_run_id,
artifact_generated_at,
node_id,
snapshot_database,
snapshot_schema,
node_database as snapshot_database,
node_schema as snapshot_schema,
name,
depends_on_nodes,
package_name,
snapshot_path,
node_path as snapshot_path,
checksum
from dbt_snapshots_incremental

Expand Down
34 changes: 12 additions & 22 deletions models/incremental/dim_dbt__sources.sql
Original file line number Diff line number Diff line change
@@ -1,49 +1,39 @@
{{ config( materialized='incremental', unique_key='manifest_source_id' ) }}

with dbt_sources as (
with dbt_nodes as (

select * from {{ ref('stg_dbt__sources') }}

),

run_results as (

select *
from {{ ref('fct_dbt__run_results') }}
select * from {{ ref('stg_dbt__nodes') }}

),

dbt_sources_incremental as (

select dbt_sources.*
from dbt_sources
-- Inner join with run results to enforce consistency and avoid race conditions.
-- https://github.com/brooklyn-data/dbt_artifacts/issues/75
inner join run_results on
dbt_sources.artifact_run_id = run_results.artifact_run_id
select *
from dbt_nodes
where resource_type = 'source'

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where dbt_sources.artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
and artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

fields as (

select
manifest_source_id,
manifest_node_id as manifest_source_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_run_id,
artifact_generated_at,
node_id,
name,
source_name,
source_schema,
node_schema as source_schema,
package_name,
relation_name,
source_path
node_path as source_path
from dbt_sources_incremental

)
Expand Down
32 changes: 11 additions & 21 deletions models/incremental/dim_dbt__tests.sql
Original file line number Diff line number Diff line change
@@ -1,38 +1,28 @@
{{ config( materialized='incremental', unique_key='manifest_test_id' ) }}

with dbt_tests as (
with dbt_nodes as (

select * from {{ ref('stg_dbt__tests') }}

),

run_results as (

select *
from {{ ref('fct_dbt__run_results') }}
select * from {{ ref('stg_dbt__nodes') }}

),

dbt_tests_incremental as (

select dbt_tests.*
from dbt_tests
-- Inner join with run results to enforce consistency and avoid race conditions.
-- https://github.com/brooklyn-data/dbt_artifacts/issues/75
inner join run_results on
dbt_tests.artifact_run_id = run_results.artifact_run_id
select *
from dbt_nodes
where resource_type = 'test'

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where dbt_tests.artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
and artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

fields as (

select
manifest_test_id,
manifest_node_id as manifest_test_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_run_id,
Expand All @@ -41,7 +31,7 @@ fields as (
name,
depends_on_nodes,
package_name,
test_path
node_path as test_path
from dbt_tests_incremental

)
Expand Down
Loading

0 comments on commit d8bd5e1

Please sign in to comment.