From aca4ed7225f8b4151a0b77fb66681bc01c9b0242 Mon Sep 17 00:00:00 2001 From: Ioannis Foukarakis Date: Mon, 1 Apr 2024 19:15:34 +0300 Subject: [PATCH] MM-56793: remove old ingestion code --- .../mattermost_dags/extract/push_proxy.py | 69 ------------------- 1 file changed, 69 deletions(-) delete mode 100644 airflow/dags/mattermost_dags/extract/push_proxy.py diff --git a/airflow/dags/mattermost_dags/extract/push_proxy.py b/airflow/dags/mattermost_dags/extract/push_proxy.py deleted file mode 100644 index 9045214a5..000000000 --- a/airflow/dags/mattermost_dags/extract/push_proxy.py +++ /dev/null @@ -1,69 +0,0 @@ -from datetime import datetime, timedelta - -from mattermost_dags.airflow_utils import MATTERMOST_DATAWAREHOUSE_IMAGE, pod_defaults, send_alert -from mattermost_dags.kube_secrets import ( - AWS_ACCOUNT_ID, - SNOWFLAKE_ACCOUNT, - SNOWFLAKE_LOAD_DATABASE, - SNOWFLAKE_LOAD_PASSWORD, - SNOWFLAKE_LOAD_USER, - SNOWFLAKE_LOAD_WAREHOUSE, -) - -from airflow import DAG -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator - -# Default arguments for the DAG -default_args = { - "depends_on_past": False, - "on_failure_callback": send_alert, - "owner": "airflow", - "retries": 0, - "retry_delay": timedelta(minutes=1), - "start_date": datetime(2019, 1, 1), -} - -# Set the command for the container -# Note the {{{{ }}}} is because we format this string but want the resulting string to just have {{ execution_date... }} -CMD_TEMPLATE = """ - python -m extract.s3_extract.push_proxy_job {} {{{{ execution_date.strftime("%Y/%m/%d") }}}} -""" - -# Create the DAG -dag = DAG( - "push_proxy", - default_args=default_args, - schedule="0 3 * * *", - catchup=False, - max_active_runs=1, # Don't allow multiple concurrent dag executions -) - - -def get_push_proxy_job(log_type, cmd): - return KubernetesPodOperator( - **pod_defaults, - image=MATTERMOST_DATAWAREHOUSE_IMAGE, # Uses latest build from master - task_id=f"push-proxy-{log_type}", - name=f"push-proxy-{log_type}", - secrets=[ - AWS_ACCOUNT_ID, - SNOWFLAKE_LOAD_USER, - SNOWFLAKE_LOAD_PASSWORD, - SNOWFLAKE_ACCOUNT, - SNOWFLAKE_LOAD_DATABASE, - SNOWFLAKE_LOAD_WAREHOUSE, - ], - env_vars={}, - arguments=[cmd], - dag=dag, - ) - - -job = None -for log_type in ["US", "TEST", "DE"]: - new_job = get_push_proxy_job(log_type.lower(), CMD_TEMPLATE.format(log_type)) - - if job is not None: - job >> new_job - - job = new_job