Skip to content

Commit

Permalink
MM-60600: add manual trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
ifoukarakis committed Oct 23, 2024
1 parent 3c2b1d3 commit c83a068
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 4 deletions.
75 changes: 75 additions & 0 deletions airflow/dags/mattermost_dags/transformation/deferred_merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from mattermost_dags.airflow_utils import MATTERMOST_DATAWAREHOUSE_IMAGE, pod_defaults, pod_env_vars, send_alert
from mattermost_dags.kube_secrets import (
SNOWFLAKE_ACCOUNT,
SNOWFLAKE_USER,
SNOWFLAKE_PASSWORD,
SNOWFLAKE_TRANSFORM_DATABASE,
SNOWFLAKE_TRANSFORM_ROLE,
SNOWFLAKE_TRANSFORM_LARGE_WAREHOUSE,
)

# Environment variables passed to the pod. Currently just the shared defaults;
# extend this dict with DAG-specific variables if any are ever needed.
env_vars = dict(pod_env_vars)

# Default arguments applied to every task in this DAG.
default_args = {
    "depends_on_past": False,
    "owner": "airflow",
    "on_failure_callback": send_alert,  # alert the team on task failure
    "sla": timedelta(hours=8),
    "start_date": datetime(2019, 1, 1, 0, 0, 0),
}

# Rendered as the DAG's description in the Airflow UI.
doc_md = """
### Deferred merge
#### Purpose
This DAG triggers deferred merge. It merges the event delta table into the base table.
This DAG does not have a schedule. It can be triggered manually in Airflow UI.
"""


with DAG(
"manual_deferred_merge",
default_args=default_args,
catchup=False,
max_active_runs=1, # Don't allow multiple concurrent dag executions
doc_md=doc_md,
) as dag:
manual_deferred_merge = KubernetesPodOperator(
**pod_defaults,
image=MATTERMOST_DATAWAREHOUSE_IMAGE, # Uses latest build from master
task_id="deferred-merge",
name="deferred-merge",
secrets=[
SNOWFLAKE_USER,
SNOWFLAKE_PASSWORD,
SNOWFLAKE_ACCOUNT,
SNOWFLAKE_TRANSFORM_ROLE,
SNOWFLAKE_TRANSFORM_LARGE_WAREHOUSE,
SNOWFLAKE_TRANSFORM_DATABASE,
],
env_vars=env_vars,
arguments=[
"snowflake "
" -a ${SNOWFLAKE_ACCOUNT}"
" -u ${SNOWFLAKE_USER}"
" -p ${SNOWFLAKE_PASSWORD}"
" -d ${SNOWFLAKE_TRANSFORM_DATABASE}"
" -s {{ var.value.rudderstack_support_schema }}"
" -w ${SNOWFLAKE_TRANSFORM_LARGE_WAREHOUSE}"
" -r ${SNOWFLAKE_TRANSFORM_ROLE}"
" merge {{ var.value.base_events_table }} "
" {{ var.value.base_events_delta_schema }} {{ var.value.base_events_delta_table }}"
],
dag=dag,
)


manual_deferred_merge
3 changes: 0 additions & 3 deletions tests/utils/test_post_docs_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@


class TestPostDocsJob:

# This test validates output from method `format_row`

@pytest.mark.parametrize(
Expand All @@ -34,7 +33,6 @@ def test_format_row(self, config_data, input_row, output_row):
# This test validates that script runs and data is post successfully to mattermost channel

def test_post_to_channel_success(self, config_data, responses, post_data_ok, mock_snowflake_connector):

mock_snowflake_connector('utils.post_docs_data')
data = {
"text": "| Feedback | Path |\n|---------------------|---------------------|\n"
Expand All @@ -53,7 +51,6 @@ def test_post_to_channel_success(self, config_data, responses, post_data_ok, moc
# This test validates that script runs but data is not post to mattermost channel due to some error

def test_post_to_channel_error(self, config_data, responses, post_data_error, mock_snowflake_connector):

mock_snowflake_connector('utils.post_docs_data')
with pytest.raises(ValueError) as error:
post_docs_data.post_docs()
Expand Down
1 change: 0 additions & 1 deletion utils/packets/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ def load_support_packet_file(
support_packet_zip_file: str | os.PathLike,
) -> Tuple[SupportPacketMetadata, SupportPacketV1]:
with ZipFile(support_packet_zip_file, 'r') as zipfile:

if SUPPORT_PACKET_METADATA_FILE in zipfile.namelist():
# Metadata might not be present in older versions of the support packet
with zipfile.open(SUPPORT_PACKET_METADATA_FILE) as metadata_fp:
Expand Down

0 comments on commit c83a068

Please sign in to comment.