Bug/postgres performance #126

Merged · 16 commits · May 14, 2024
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,16 @@
+# dbt_fivetran_log v1.7.3
+[PR #126](https://github.com/fivetran/dbt_fivetran_log/pull/126) includes the following updates:
+
+## Performance Improvements
+- Updated the sequence of JSON parsing for model `fivetran_platform__audit_table` to reduce runtime.
+
+## Bug Fixes
+- Updated model `fivetran_platform__audit_user_activity` to correct the JSON parsing used to determine column `email`. This fixes an issue introduced in v1.5.0 where `fivetran_platform__audit_user_activity` could potentially have 0 rows.
+
+## Under the Hood
+- Updated logic for macro `fivetran_log_lookback` to align with the logic used in similar macros in other packages.
+- Updated logic for the postgres dispatch of macro `fivetran_log_json_parse` to use `jsonb` instead of `json` for performance.
+
 # dbt_fivetran_log v1.7.2
 [PR #123](https://github.com/fivetran/dbt_fivetran_log/pull/123) includes the following updates:

2 changes: 1 addition & 1 deletion README.md
@@ -16,7 +16,7 @@
 # Fivetran Platform dbt Package ([Docs](https://fivetran.github.io/dbt_fivetran_log/))
 # 📣 What does this dbt package do?
 - Generates a comprehensive data dictionary of your Fivetran Platform connector (previously called Fivetran Log) data via the [dbt docs site](https://fivetran.github.io/dbt_fivetran_log/)
-- Produces staging models in the format described by [this ERD](https://fivetran.com/docs/logs/fivetran-log#schemainformation) which clean, test, and prepare your Fivetran data from [Fivetran's free connector](https://fivetran.com/docs/applications/fivetran-log) and generates analysis ready end models.
+- Produces staging models in the format described by [this ERD](https://fivetran.com/docs/logs/fivetran-platform#schemainformation) which clean, test, and prepare your Fivetran data from [Fivetran's free connector](https://fivetran.com/docs/logs/fivetran-platform) and generates analysis-ready end models.
 - The above mentioned models enable you to better understand how you are spending money in Fivetran according to our [consumption-based pricing model](https://fivetran.com/docs/getting-started/consumption-based-pricing) as well as providing details about the performance and status of your Fivetran connectors. This is achieved by:
     - Displaying consumption data at the table, connector, destination, and account levels
     - Providing a history of measured free and paid monthly active rows (MAR), credit consumption, and the relationship between the two
2 changes: 1 addition & 1 deletion dbt_project.yml
@@ -1,6 +1,6 @@
 config-version: 2
 name: 'fivetran_log'
-version: '1.7.2'
+version: '1.7.3'
 require-dbt-version: [">=1.3.0", "<2.0.0"]

 models:
2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/run_results.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions integration_tests/ci/sample.profiles.yml
@@ -49,15 +49,15 @@ integration_tests:
       host: "{{ env_var('CI_DATABRICKS_DBT_HOST') }}"
       http_path: "{{ env_var('CI_DATABRICKS_DBT_HTTP_PATH') }}"
       schema: fivetran_platform_integration_tests
-      threads: 2
+      threads: 8
       token: "{{ env_var('CI_DATABRICKS_DBT_TOKEN') }}"
       type: databricks
     databricks-sql:
       catalog: "{{ env_var('CI_DATABRICKS_DBT_CATALOG') }}"
       host: "{{ env_var('CI_DATABRICKS_DBT_HOST') }}"
       http_path: "{{ env_var('CI_DATABRICKS_SQL_DBT_HTTP_PATH') }}"
       schema: sqlw_tests
-      threads: 2
+      threads: 8
       token: "{{ env_var('CI_DATABRICKS_SQL_DBT_TOKEN') }}"
       type: databricks
     sqlserver:
2 changes: 1 addition & 1 deletion integration_tests/dbt_project.yml
@@ -1,5 +1,5 @@
 name: 'fivetran_log_integration_tests'
-version: '1.7.2'
+version: '1.7.3'

 config-version: 2
 profile: 'integration_tests'
2 changes: 1 addition & 1 deletion macros/fivetran_log_json_parse.sql
@@ -28,7 +28,7 @@
 {% macro postgres__fivetran_log_json_parse(string, string_path) %}

 case when {{ string }} ~ '^\s*[\{].*[\}]?\s*$' -- Postgres has no native json check, so this will check the string for indicators of a JSON object
-    then {{ string }}::json #>> '{ {%- for s in string_path -%}{{ s }}{%- if not loop.last -%},{%- endif -%}{%- endfor -%} }'
+    then {{ string }}::jsonb #>> '{ {%- for s in string_path -%}{{ s }}{%- if not loop.last -%},{%- endif -%}{%- endfor -%} }'
 else null end

 {% endmacro %}

Contributor: I know this is incredibly small, but we should call this out as a change in the CHANGELOG. Also, I am fairly certain this won't be breaking, but can you confirm that this datatype change won't alter any output datatypes for Postgres users? I tested myself and didn't see any issues when running prod then dev right after, but I just want to double check.

Contributor (author): Added to Under the Hood. I also tested this, and it makes sense that there is no issue: the end result in either version is a string, and the cast to jsonb is only an intermediate step.
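
For context, here is a sketch of roughly what the Postgres dispatch now compiles to when called as `fivetran_log_json_parse(string='message_data', string_path=['table'])` (an illustrative rendering, not the exact compiled output):

```sql
case
    when message_data ~ '^\s*[\{].*[\}]?\s*$'  -- heuristic: does the string look like a JSON object?
    then message_data::jsonb #>> '{table}'     -- #>> extracts the value at the given path as text
    else null
end
```

Since `#>>` returns `text` for both `json` and `jsonb`, the swap changes only the intermediate parse (`jsonb` is Postgres's binary JSON representation), not downstream column types, which is consistent with the testing described in the thread above.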
35 changes: 9 additions & 26 deletions macros/fivetran_log_lookback.sql
@@ -1,35 +1,18 @@
-{% macro fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}
+{% macro fivetran_log_lookback(from_date, datepart='day', interval=7, safety_date='2010-01-01') %}

-{{ adapter.dispatch('fivetran_log_lookback', 'fivetran_log') (from_date, datepart='day', interval=7, default_start_date='2010-01-01') }}
+{{ adapter.dispatch('fivetran_log_lookback', 'fivetran_log') (from_date, datepart='day', interval=7, safety_date='2010-01-01') }}

 {%- endmacro %}

-{% macro default__fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}
+{% macro default__fivetran_log_lookback(from_date, datepart='day', interval=7, safety_date='2010-01-01') %}

-coalesce(
-    (select {{ dbt.dateadd(datepart=datepart, interval=-interval, from_date_or_timestamp=from_date) }}
-    from {{ this }}),
-    {{ "'" ~ default_start_date ~ "'" }}
-)
-
-{% endmacro %}
-
-{% macro bigquery__fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}
-
--- Capture the latest timestamp in a call statement instead of a subquery for optimizing BQ costs on incremental runs
-{%- call statement('date_agg', fetch_result=True) -%}
-    select {{ from_date }} from {{ this }}
-{%- endcall -%}
-
--- load the result from the above query into a new variable
-{%- set query_result = load_result('date_agg') -%}
-
--- the query_result is stored as a dataframe. Therefore, we want to now store it as a singular value.
-{%- set date_agg = query_result['data'][0][0] %}
-
-coalesce(
-    {{ dbt.dateadd(datepart='day', interval=-7, from_date_or_timestamp="'" ~ date_agg ~ "'") }},
-    {{ "'" ~ default_start_date ~ "'" }}
-)
+{% set sql_statement %}
+    select coalesce({{ from_date }}, {{ "'" ~ safety_date ~ "'" }})
+    from {{ this }}
+{%- endset -%}
+
+{%- set result = dbt_utils.get_single_value(sql_statement) %}
+
+{{ dbt.dateadd(datepart=datepart, interval=-interval, from_date_or_timestamp="cast('" ~ result ~ "' as date)") }}

 {% endmacro %}
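
For intuition, here is a sketch of how the rewritten default implementation behaves on an incremental run (the relation name and returned date below are hypothetical):

```sql
-- Step 1: dbt_utils.get_single_value runs the introspective query while the model renders, e.g.:
--   select coalesce(max(sync_start_day), '2010-01-01') from "db"."schema"."fivetran_platform__audit_table"
-- Step 2: supposing that query returned 2024-05-07, the macro inlines a plain literal bound;
-- on Postgres, dbt.dateadd would render it roughly as:
cast('2024-05-07' as date) + ((interval '1 day') * (-7))
```

Fetching the value once up front replaces both the old subquery in the default dispatch and the BigQuery-specific `statement` call that was removed, so every warehouse now gets the same simple literal comparison in the incremental `where` clause.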
39 changes: 28 additions & 11 deletions models/fivetran_platform__audit_table.sql
@@ -10,21 +10,39 @@
     file_format='delta' if is_databricks_sql_warehouse(target) else 'parquet'
 ) }}

-with sync_log as (
+with base as (

-    select
-        *,
-        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name
+    select *
     from {{ ref('stg_fivetran_platform__log') }}
     where event_subtype in ('sync_start', 'sync_end', 'write_to_table_start', 'write_to_table_end', 'records_modified')

     {% if is_incremental() %}

     and cast(created_at as date) > {{ fivetran_log.fivetran_log_lookback(from_date='max(sync_start_day)', interval=7) }}

     {% endif %}
 ),

+sync_log as (
+    select
+        *,
+        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
+        cast(null as {{ dbt.type_string() }}) as schema_name,
+        cast(null as {{ dbt.type_string() }}) as operation_type,
+        cast(null as {{ dbt.type_bigint() }}) as row_count
+    from base
+    where event_subtype in ('sync_start', 'sync_end', 'write_to_table_start', 'write_to_table_end')
+
+    union all
+
+    select
+        *,
+        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
+        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }} as schema_name,
+        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
+        cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_bigint() }}) as row_count
+    from base
+    where event_subtype = 'records_modified'
+),
+
 connector as (

     select *
@@ -80,13 +98,12 @@ records_modified_log as (
     select
         connector_id,
         created_at,
-        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
-        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }} as schema_name,
-        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
-        cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_bigint() }}) as row_count
+        table_name,
+        schema_name,
+        operation_type,
+        row_count
     from sync_log
     where event_subtype = 'records_modified'

 ),

 sum_records_modified as (
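
The net effect, and the source of the changelog's runtime improvement, is that `message_data` is now JSON-parsed once per event in `sync_log`, with the heavier `schema`/`operationType`/`count` extraction applied only to `records_modified` rows; downstream CTEs then reuse those columns instead of re-parsing the raw log. A simplified sketch of the reshaped flow (illustrative, not the model's full compiled SQL):

```sql
-- Before: records_modified rows had message_data parsed in sync_log (table)
-- and parsed again in records_modified_log (table, schema, operationType, count).
-- After: parsing happens once, branched by event type, and downstream CTEs just select:
select connector_id, created_at, table_name, schema_name, operation_type, row_count
from sync_log
where event_subtype = 'records_modified'
```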
2 changes: 1 addition & 1 deletion models/fivetran_platform__audit_user_activity.sql
@@ -2,7 +2,7 @@ with logs as (

     select
         *,
-        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path='actor') }} as actor_email
+        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['actor']) }} as actor_email
     from {{ ref('stg_fivetran_platform__log') }}
     where lower(message_data) like '%actor%'
 ),
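
Why this one-character-looking change matters: the macro's dispatches build the JSON path by looping `{%- for s in string_path -%}`, and Jinja iterates a bare string character by character. A sketch of the likely difference in the compiled Postgres output:

```sql
-- string_path='actor'   builds the path '{a,c,t,o,r}' (one element per character):
--   message_data::jsonb #>> '{a,c,t,o,r}'   -- looks for nested keys a->c->t->o->r, always null
-- string_path=['actor'] builds the path '{actor}':
message_data::jsonb #>> '{actor}'            -- extracts the actor email as intended
```

An always-null `actor_email` would explain the v1.5.0 regression noted in the CHANGELOG, where `fivetran_platform__audit_user_activity` could end up with 0 rows.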
2 changes: 1 addition & 1 deletion models/staging/stg_fivetran_platform__log.sql
@@ -34,4 +34,4 @@ final as (
 )

 select *
-from final
\ No newline at end of file
+from final