diff --git a/airflow/dags/mattermost_dags/transformation/deferred_merge.py b/airflow/dags/mattermost_dags/transformation/deferred_merge.py
new file mode 100644
index 000000000..8e8225231
--- /dev/null
+++ b/airflow/dags/mattermost_dags/transformation/deferred_merge.py
@@ -0,0 +1,75 @@
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+
+from mattermost_dags.airflow_utils import MATTERMOST_DATAWAREHOUSE_IMAGE, pod_defaults, pod_env_vars, send_alert
+from mattermost_dags.kube_secrets import (
+    SNOWFLAKE_ACCOUNT,
+    SNOWFLAKE_USER,
+    SNOWFLAKE_PASSWORD,
+    SNOWFLAKE_TRANSFORM_DATABASE,
+    SNOWFLAKE_TRANSFORM_ROLE,
+    SNOWFLAKE_TRANSFORM_LARGE_WAREHOUSE,
+)
+
+# Load the env vars into a dict and set Secrets
+env_vars = {**pod_env_vars, **{}}
+
+# Default arguments for the DAG
+default_args = {
+    "depends_on_past": False,
+    "owner": "airflow",
+    "on_failure_callback": send_alert,
+    "sla": timedelta(hours=8),
+    "start_date": datetime(2019, 1, 1, 0, 0, 0),
+}
+
+doc_md = """
+### Deferred merge
+
+#### Purpose
+This DAG triggers a deferred merge. It merges the event delta table into the base table.
+
+This DAG does not have a schedule. It can be triggered manually in the Airflow UI.
+"""
+
+
+with DAG(
+    "manual_deferred_merge",
+    default_args=default_args,
+    catchup=False,
+    max_active_runs=1,  # Don't allow multiple concurrent dag executions
+    doc_md=doc_md,
+) as dag:
+    manual_deferred_merge = KubernetesPodOperator(
+        **pod_defaults,
+        image=MATTERMOST_DATAWAREHOUSE_IMAGE,  # Uses latest build from master
+        task_id="deferred-merge",
+        name="deferred-merge",
+        secrets=[
+            SNOWFLAKE_USER,
+            SNOWFLAKE_PASSWORD,
+            SNOWFLAKE_ACCOUNT,
+            SNOWFLAKE_TRANSFORM_ROLE,
+            SNOWFLAKE_TRANSFORM_LARGE_WAREHOUSE,
+            SNOWFLAKE_TRANSFORM_DATABASE,
+        ],
+        env_vars=env_vars,
+        arguments=[
+            "snowflake "
+            " -a ${SNOWFLAKE_ACCOUNT}"
+            " -u ${SNOWFLAKE_USER}"
+            " -p ${SNOWFLAKE_PASSWORD}"
+            " -d ${SNOWFLAKE_TRANSFORM_DATABASE}"
+            " -s {{ var.value.rudderstack_support_schema }}"
+            " -w ${SNOWFLAKE_TRANSFORM_LARGE_WAREHOUSE}"
+            " -r ${SNOWFLAKE_TRANSFORM_ROLE}"
+            " merge {{ var.value.base_events_table }} "
+            " {{ var.value.base_events_delta_schema }} {{ var.value.base_events_delta_table }}"
+        ],
+        dag=dag,
+    )
+
+
+manual_deferred_merge
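Note on the new DAG: the Snowflake credentials come from Kubernetes secrets, while the schema and table names come from Airflow Variables (rudderstack_support_schema, base_events_table, base_events_delta_schema, base_events_delta_table) templated into the container arguments, so they are resolved at task run time rather than at DAG parse time. A minimal parse-level sanity check could look like the sketch below; it is not part of this change, and the test name and setup are illustrative only, assuming the test run points Airflow at the airflow/dags folder.

    # Sketch only (not part of this PR): verify the new DAG parses and exposes its task.
    from airflow.models import DagBag


    def test_manual_deferred_merge_dag_loads():
        dag_bag = DagBag(include_examples=False)

        assert dag_bag.import_errors == {}
        dag = dag_bag.get_dag("manual_deferred_merge")
        assert dag is not None
        assert "deferred-merge" in dag.task_ids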
diff --git a/tests/utils/test_post_docs_data.py b/tests/utils/test_post_docs_data.py
index 8314b0274..7918ec060 100644
--- a/tests/utils/test_post_docs_data.py
+++ b/tests/utils/test_post_docs_data.py
@@ -11,7 +11,6 @@
 
 
 class TestPostDocsJob:
-
     # This test validates output from method `format_row`
     @pytest.mark.parametrize(
@@ -34,7 +33,6 @@ def test_format_row(self, config_data, input_row, output_row):
 
     # This test validates that script runs and data is post successfully to mattermost channel
     def test_post_to_channel_success(self, config_data, responses, post_data_ok, mock_snowflake_connector):
-
        mock_snowflake_connector('utils.post_docs_data')
        data = {
            "text": "| Feedback | Path |\n|---------------------|---------------------|\n"
@@ -53,7 +51,6 @@ def test_post_to_channel_success(self, config_data, responses, post_data_ok, mock_snowflake_connector):
 
     # This test validates that script runs but data is not post to mattermost channel due to some error
     def test_post_to_channel_error(self, config_data, responses, post_data_error, mock_snowflake_connector):
-
        mock_snowflake_connector('utils.post_docs_data')
        with pytest.raises(ValueError) as error:
            post_docs_data.post_docs()
diff --git a/utils/packets/loaders.py b/utils/packets/loaders.py
index e9ec33368..d532b2714 100644
--- a/utils/packets/loaders.py
+++ b/utils/packets/loaders.py
@@ -98,7 +98,6 @@ def load_support_packet_file(
     support_packet_zip_file: str | os.PathLike,
 ) -> Tuple[SupportPacketMetadata, SupportPacketV1]:
     with ZipFile(support_packet_zip_file, 'r') as zipfile:
-
         if SUPPORT_PACKET_METADATA_FILE in zipfile.namelist():
             # Metadata might not be present in older versions of the support packet
             with zipfile.open(SUPPORT_PACKET_METADATA_FILE) as metadata_fp:
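For reference, load_support_packet_file (shown in the hunk context above) takes a path to a support packet zip and, per its signature, returns a (SupportPacketMetadata, SupportPacketV1) tuple; the hunk's comment notes that older packets may not include the metadata file. A hedged usage sketch, with a hypothetical file path:

    # Illustrative only; the path is made up and the return handling follows the
    # Tuple[SupportPacketMetadata, SupportPacketV1] signature shown in the diff context.
    from utils.packets.loaders import load_support_packet_file

    metadata, support_packet = load_support_packet_file("downloads/mattermost_support_packet.zip")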