Flatten artifacts on load (#84)
* Add V2 artifact upload method which flattens on load, avoiding 16MB field limit
alanmcruickshank authored Feb 22, 2022
1 parent ec1bda4 commit f435114
Showing 17 changed files with 635 additions and 152 deletions.
80 changes: 55 additions & 25 deletions README.md
@@ -32,6 +32,11 @@ vars:
dbt_artifacts_database: your_db # optional, default is your target database
dbt_artifacts_schema: your_schema # optional, default is 'dbt_artifacts'
dbt_artifacts_table: your_table # optional, default is 'artifacts'
dbt_artifacts_results_table: your_table # optional, default is 'dbt_run_results'
dbt_artifacts_result_nodes_table: your_table # optional, default is 'dbt_run_result_nodes'
dbt_artifacts_manifest_nodes_table: your_table # optional, default is 'dbt_run_manifest_nodes'
dbt_artifacts_manifest_sources_table: your_table # optional, default is 'dbt_run_manifest_sources'
dbt_artifacts_manifest_exposures_table: your_table # optional, default is 'dbt_run_manifest_exposures'

models:
...
@@ -45,33 +50,58 @@ Note that the model materializations are defined in this package's `dbt_project.

3. Run `dbt deps`.

## Generating the source table
This package requires that the source data exists in a table in Snowflake.

### Option 1: Loading local files
## Uploading the artifacts
This package uploads the artifact files into Snowflake. There are two supported ways of doing this:
- The _V2_ method, which flattens the uploaded files on load. This supports files over 16MB
(the limit of a variant field in Snowflake) and also makes rebuilds of the materialised models
much faster, because the JSON unpacking is done once at load time. The downside of this approach
is that the upload is heavier and more complex, so we only directly support the _"local file"_
method. Loading via cloud storage is also _possible_, but we recommend users copy the method used
in `upload_artifacts_v2.sql` to create their own approach. A rough sketch of the flatten-on-load
idea follows this list.
- The _V1_ (or _legacy_) method, which uploads the files unprocessed. This affords much more
flexibility in how they are used, but is subject to field size limits and to higher compute costs
when the large JSON payloads are reprocessed later. It may be appropriate for more custom setups
or for small projects, but for large projects which aren't significantly extending the
functionality of the package, we recommend the _V2_ method.
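
As a rough sketch (not the package's exact SQL), the flatten-on-load idea looks something like the
following. Here we assume a JSON stage named `dbt_artifacts_stage` that already holds a
`run_results.json`, and we target the result-nodes table created by `create_artifact_resources`:
```sql
-- Illustrative only: each entry in results[] becomes its own row at load time,
-- so no single VARIANT value ever has to hold the whole artifact.
insert into dbt_run_results_nodes (command_invocation_id, node_id, status, total_node_runtime)
select
    run_results.$1:metadata:invocation_id::string,
    res.value:unique_id::string,
    res.value:status::string,
    res.value:execution_time::float
from @dbt_artifacts_stage (pattern => '.*run_results.json') run_results,
    lateral flatten(input => run_results.$1:results) res;
```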

### Option 1: Loading local files [V1 & V2]
Snowflake makes it possible to load local files into your warehouse. We've included a number of macros to assist with this. This method can be used both by dbt Cloud users and by users of other orchestration tools.

1. To initially create these tables, execute `dbt run-operation create_artifact_resources` ([source](macros/create_artifact_resources.sql)). This will create a stage and a table named `{{ target.database }}.dbt_artifacts.artifacts` — you can override this name using the variables listed in the Installation section, above.

2. Add [operations](https://docs.getdbt.com/docs/building-a-dbt-project/hooks-operations/#operations) to your production run to load files into your table, via the `upload_artifacts` macro ([source](macros/upload_artifacts.sql)). You'll need to specify which files to upload through use of the `--args` flag. Here's an example setup.
```txt
$ dbt seed
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [manifest, run_results]}'
$ dbt run
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [manifest, run_results]}'
$ dbt test
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [run_results]}'
$ dbt source snapshot-freshness
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [sources]}'
$ dbt docs generate
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [catalog]}'
```

### Option 2: Loading cloud-storage files
1. To initially create these tables, execute `dbt run-operation create_artifact_resources`
([source](macros/create_artifact_resources.sql)). This will create a stage and a set of tables in
the `{{ target.database }}.dbt_artifacts` schema — you can override the database, schema and table
names using the variables listed in the Installation section, above.

2. Add [operations](https://docs.getdbt.com/docs/building-a-dbt-project/hooks-operations/#operations)
to your production run to load files into your table.

**V2 Macro**: Use the `upload_dbt_artifacts_v2` macro ([source](macros/upload_artifacts_v2.sql)). You only
need to run the macro after `run`, `test`, `seed`, `snapshot` or `build` operations.
```txt
$ dbt run
$ dbt run-operation upload_dbt_artifacts_v2
```

**V1 Macro**: Use the `upload_dbt_artifacts` macro ([source](macros/upload_artifacts.sql)). You'll need
to specify which files to upload through use of the `--args` flag. Here's an example setup.
```txt
$ dbt seed
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [manifest, run_results]}'
$ dbt run
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [manifest, run_results]}'
$ dbt test
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [run_results]}'
$ dbt source snapshot-freshness
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [sources]}'
$ dbt docs generate
$ dbt run-operation upload_dbt_artifacts --args '{filenames: [catalog]}'
```

### Option 2: Loading cloud-storage files [V1 only]

If you are using an orchestrator, you might instead upload these files to cloud storage — the method to do this will depend on your orchestrator. Then, link the cloud storage destination to a Snowflake external stage, and use a snowpipe to copy these files into the source table:
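
For example, a minimal sketch of that setup might look like the following. The bucket URL and the
stage and pipe names are placeholders, and the pipe hard-codes the artifact type on the assumption
of one pipe per artifact prefix:
```sql
-- Hypothetical external stage over the bucket your orchestrator writes artifacts to.
-- A storage integration (or credentials) is also required but omitted here.
create stage if not exists dbt_artifacts.artifacts_external
    url = 's3://your-bucket/dbt-artifacts/run_results/'  -- placeholder location
    file_format = (type = json);

-- Hypothetical pipe copying each staged file into the V1 source table as a single
-- VARIANT row. auto_ingest requires event notifications on the bucket.
create pipe if not exists dbt_artifacts.artifacts_pipe auto_ingest = true as
    copy into dbt_artifacts.artifacts (data, generated_at, path, artifact_type)
    from (
        select
            $1,                                   -- the whole artifact as VARIANT
            $1:metadata:generated_at::timestamp,  -- when dbt generated the file
            metadata$filename,                    -- which staged file it came from
            'run_results'                         -- hard-coded: one pipe per artifact type
        from @dbt_artifacts.artifacts_external
    );
```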

11 changes: 11 additions & 0 deletions integration_test_project/macros/drop_test_schema.sql
@@ -0,0 +1,11 @@
{% macro drop_test_schema() %}

-- We drop if exists so that it still passes when the db is clean.
{% set drop_schema_query %}
drop schema if exists {{ target.schema }};
{% endset %}

{% do log("Dropping test schema: " ~ drop_schema_query, info=True) %}
{% do run_query(drop_schema_query) %}

{% endmacro %}
37 changes: 37 additions & 0 deletions integration_test_project/tests/check_model_executions.sql
@@ -0,0 +1,37 @@
with raw_model_executions as (

    select * from {{ ref('fct_dbt__model_executions') }}

),

grouped_executions as (

    select
        artifact_run_id,
        count(*) as runs
    from raw_model_executions
    group by artifact_run_id

),

expected_results as (

    select
        artifact_run_id,
        runs,
        -- Hard coded expected results. Potentially to improve later.
        case artifact_run_id
            when 'b27910c784063dc867a762eb91ac7e93033492ac49b482215cd1761824b07a58' then 31 -- build
            when '1ab40ec436539434416dfca0bb0e8d8cf3708bb568fb2385321a192b59b9c4e7' then 31 -- build_full_refresh
            when 'c6775fc1f3d39acb37f389df8b67aa59cb989994dc9b940b51e7bcba830212a3' then 31 -- run
            when '4fbd1feb6cfc3cd088fc47ac461efdfab7f95380aa5a939360da629bbdb9ce1d' then 31 -- run_full_refresh
            when '6ee8780f7533ae3901f8759fd07ddae4af20b7856c788bf515bdf14ee059e90d' then 0 -- seed
            when '1c87fbb828af7f041f0d7d4440904a8e482a8be74e617eb57a11b76001936550' then 0 -- snapshot
            when '37f4a0fca17b0f8f1fb0db04fbef311dd73cacfcd6653c76d46e3d7f36dc079c' then 0 -- test
            else 0
        end as expected_runs
    from grouped_executions
    where runs != expected_runs
)

select * from expected_results
37 changes: 37 additions & 0 deletions integration_test_project/tests/check_node_executions.sql
@@ -0,0 +1,37 @@
with raw_node_executions as (

    select * from {{ ref('stg_dbt__node_executions') }}

),

grouped_executions as (

    select
        artifact_run_id,
        count(*) as runs
    from raw_node_executions
    group by artifact_run_id

),

expected_results as (

    select
        artifact_run_id,
        runs,
        -- Hard coded expected results. Potentially to improve later.
        case artifact_run_id
            when 'b27910c784063dc867a762eb91ac7e93033492ac49b482215cd1761824b07a58' then 51 -- build
            when '1ab40ec436539434416dfca0bb0e8d8cf3708bb568fb2385321a192b59b9c4e7' then 51 -- build_full_refresh
            when 'c6775fc1f3d39acb37f389df8b67aa59cb989994dc9b940b51e7bcba830212a3' then 31 -- run
            when '4fbd1feb6cfc3cd088fc47ac461efdfab7f95380aa5a939360da629bbdb9ce1d' then 31 -- run_full_refresh
            when '6ee8780f7533ae3901f8759fd07ddae4af20b7856c788bf515bdf14ee059e90d' then 1 -- seed
            when '1c87fbb828af7f041f0d7d4440904a8e482a8be74e617eb57a11b76001936550' then 1 -- snapshot
            when '37f4a0fca17b0f8f1fb0db04fbef311dd73cacfcd6653c76d46e3d7f36dc079c' then 18 -- test
            else 0
        end as expected_runs
    from grouped_executions
    where runs != expected_runs
)

select * from expected_results
25 changes: 25 additions & 0 deletions integration_test_project/tests/check_run_results.sql
@@ -0,0 +1,25 @@
with raw_runs as (

    select * from {{ ref('stg_dbt__run_results') }}

),

grouped_runs as (

    select
        count(*) as runs
    from raw_runs

),

expected_results as (

    select
        runs,
        -- Hard coded expected results. Potentially to improve later.
        7 as expected_runs
    from grouped_runs
    where runs != expected_runs
)

select * from expected_results
4 changes: 4 additions & 0 deletions macros/artifact_run.sql
@@ -0,0 +1,4 @@
-- This ID provides a reliable ID, regardless of whether running in a local or cloud environment.
{% macro make_artifact_run_id() %}
sha2_hex(coalesce(dbt_cloud_run_id::string, command_invocation_id::string), 256)
{% endmacro %}
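
For illustration, a hypothetical usage sketch: because the macro expands to a plain Snowflake
expression, it can be dropped into any query over a table that carries the two ID columns, for
example the V2 run results table.
```sql
-- Hypothetical usage, not taken from the package: rows sharing a dbt Cloud run ID
-- (or, when running locally, an invocation ID) get the same artifact_run_id.
select
    command_invocation_id,
    dbt_cloud_run_id,
    {{ make_artifact_run_id() }} as artifact_run_id
from {{ source('dbt_artifacts', 'dbt_run_results') }}
```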
96 changes: 90 additions & 6 deletions macros/create_artifact_resources.sql
@@ -1,29 +1,113 @@
{% macro create_artifact_resources() %}

{% set src_dbt_artifacts = source('dbt_artifacts', 'artifacts') %}
{% set artifact_stage = var('dbt_artifacts_stage', 'dbt_artifacts_stage') %}

{% set src_results = source('dbt_artifacts', 'dbt_run_results') %}
{% set src_results_nodes = source('dbt_artifacts', 'dbt_run_results_nodes') %}
{% set src_manifest_nodes = source('dbt_artifacts', 'dbt_run_manifest_nodes') %}

{{ create_schema(src_dbt_artifacts) }}

{% set create_stage_query %}
{% set create_v1_stage_query %}
create stage if not exists {{ src_dbt_artifacts }}
file_format = (type = json);
{% endset %}

{% set create_table_query %}
{% set create_v2_stage_query %}
create stage if not exists {{ artifact_stage }}
file_format = (type = json);
{% endset %}

{% set create_v1_table_query %}
create table if not exists {{ src_dbt_artifacts }} (
    data variant,
    generated_at timestamp,
    path string,
    artifact_type string
);
{% endset %}

{% set create_v2_results_query %}
create table if not exists {{ src_results }} (
    command_invocation_id string,
    dbt_cloud_run_id int,
    artifact_run_id string,
    artifact_generated_at timestamp_tz,
    dbt_version string,
    env variant,
    elapsed_time double,
    execution_command string,
    was_full_refresh boolean,
    selected_models variant,
    target string,
    metadata variant,
    args variant
);
{% endset %}

{% set create_v2_result_nodes_table_query %}
create table if not exists {{ src_results_nodes }} (
    command_invocation_id string,
    dbt_cloud_run_id int,
    artifact_run_id string,
    artifact_generated_at timestamp_tz,
    execution_command string,
    was_full_refresh boolean,
    node_id string,
    thread_id integer,
    status string,
    message string,
    compile_started_at timestamp_tz,
    query_completed_at timestamp_tz,
    total_node_runtime float,
    rows_affected int,
    result_json variant
);
{% endset %}

{% set create_v2_manifest_nodes_table_query %}
create table if not exists {{ src_manifest_nodes }} (
    command_invocation_id string,
    dbt_cloud_run_id int,
    artifact_run_id string,
    artifact_generated_at timestamp_tz,
    node_id string,
    resource_type string,
    node_database string,
    node_schema string,
    name string,
    depends_on_nodes array,
    depends_on_sources array,
    exposure_type string,
    exposure_owner string,
    exposure_maturity string,
    source_name string,
    package_name string,
    relation_name string,
    node_path string,
    checksum string,
    materialization string,
    node_json variant
);
{% endset %}

{% do log("Creating V1 Stage: " ~ create_v1_stage_query, info=True) %}
{% do run_query(create_v1_stage_query) %}

{% do log("Creating V2 Stage: " ~ create_v2_stage_query, info=True) %}
{% do run_query(create_v2_stage_query) %}

{% do log("Creating V1 Table: " ~ create_v1_table_query, info=True) %}
{% do run_query(create_v1_table_query) %}

{% do log("Creating V2 Results Table: " ~ create_v2_results_query, info=True) %}
{% do run_query(create_v2_results_query) %}

{% do log("Creating Stage: " ~ create_stage_query, info=True) %}
{% do run_query(create_stage_query) %}
{% do log("Creating V2 Result Nodes Table: " ~ create_v2_result_nodes_table_query, info=True) %}
{% do run_query(create_v2_result_nodes_table_query) %}

{% do log("Creating Table: " ~ create_table_query, info=True) %}
{% do run_query(create_table_query) %}
{% do log("Creating V2 Manifest Nodes Table: " ~ create_v2_manifest_nodes_table_query, info=True) %}
{% do run_query(create_v2_manifest_nodes_table_query) %}

{% endmacro %}
