From 6426637f8573c563cfa37cefba835de6487dcac0 Mon Sep 17 00:00:00 2001 From: Ran Lu Date: Tue, 2 Jul 2024 13:14:09 -0400 Subject: [PATCH 1/3] Split the synaptor queue The tasks for different workers are now sent to synaptor-{cpu, gpu, seggraph}-tasks queues. This prevents workers of the wrong type from stealing the tasks and causing the DAG to go out of sync. --- dags/synaptor_dags.py | 20 +++++++++++++------- dags/synaptor_ops.py | 6 +++--- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/dags/synaptor_dags.py b/dags/synaptor_dags.py index 7d84617d..54e6ea5a 100644 --- a/dags/synaptor_dags.py +++ b/dags/synaptor_dags.py @@ -6,7 +6,7 @@ from airflow import DAG from airflow.models import Variable, BaseOperator -from helper_ops import scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op +from helper_ops import placeholder_op, scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op from param_default import synaptor_param_default, default_synaptor_image from synaptor_ops import manager_op, drain_op, self_destruct_op from synaptor_ops import synaptor_op, wait_op, generate_op, nglink_op @@ -92,10 +92,10 @@ def __init__(self, name): def fill_dag(dag: DAG, tasklist: list[Task], collect_metrics: bool = True) -> DAG: """Fills a synaptor DAG from a list of Tasks.""" - drain = drain_op(dag) + drain_tasks = [drain_op(dag, task_queue_name=f"synaptor-{t}-tasks") for t in ["cpu", "gpu", "seggraph"]] init_cloudvols = manager_op(dag, "init_cloudvols", image=SYNAPTOR_IMAGE) - drain >> init_cloudvols + start_nfs_server >> drain_tasks >> init_cloudvols curr_operator = init_cloudvols if WORKFLOW_PARAMS.get("workspacetype", "File") == "Database": @@ -105,7 +105,7 @@ def fill_dag(dag: DAG, tasklist: list[Task], collect_metrics: bool = True) -> DA if collect_metrics: metrics = collect_metrics_op(dag) - metrics >> drain + metrics >> drain_tasks curr_cluster = "" for task in tasklist: @@ -167,6 +167,7 @@ def change_cluster_if_required( i, image=SYNAPTOR_IMAGE, 
op_queue_name=next_task.cluster_key, + task_queue_name=f"{next_task.cluster_key}-tasks", tag=new_tag, use_gpus=next_task.use_gpus ) @@ -233,12 +234,17 @@ def add_task( elif task.cluster_key == "manager": generate = manager_op(dag, task.name, image=SYNAPTOR_IMAGE) else: - generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, tag=tag) + generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, tag=tag, task_queue_name=f"{task.cluster_key}-tasks") if tag: - wait = wait_op(dag, f"{task.name}_{tag}") + wait_task_name = f"{task.name}_{tag}" else: - wait = wait_op(dag, task.name) + wait_task_name = task.name + + if task.cluster_key == "manager": + wait = placeholder_op(dag, wait_task_name) + else: + wait = wait_op(dag, wait_task_name, task_queue_name=f"{task.cluster_key}-tasks") prev_operator >> generate >> wait diff --git a/dags/synaptor_ops.py b/dags/synaptor_ops.py index 61604cc6..0896e1c2 100644 --- a/dags/synaptor_ops.py +++ b/dags/synaptor_ops.py @@ -193,7 +193,7 @@ def drain_op( """Drains leftover messages from the RabbitMQ.""" return PythonOperator( - task_id="drain_messages", + task_id=f"drain_messages_{task_queue_name}", python_callable=drain_messages, priority_weight=100_000, op_args=(airflow_broker_url, task_queue_name), @@ -317,12 +317,12 @@ def synaptor_op( ) -def wait_op(dag: DAG, taskname: str) -> PythonOperator: +def wait_op(dag: DAG, taskname: str, task_queue_name: Optional[str] = TASK_QUEUE_NAME) -> PythonOperator: """Waits for a task to finish.""" return PythonOperator( task_id=f"wait_for_queue_{taskname}", python_callable=check_queue, - op_args=(TASK_QUEUE_NAME,), + op_args=(task_queue_name,), priority_weight=100_000, weight_rule=WeightRule.ABSOLUTE, on_success_callback=task_done_alert, From 1814272471ac80c1721a6de11cae703eac879cc1 Mon Sep 17 00:00:00 2001 From: Ran Lu Date: Tue, 2 Jul 2024 13:17:04 -0400 Subject: [PATCH 2/3] Estimate the number of synaptor workers based on the new queues --- dags/cluster_dag.py | 2 ++ 1 file changed, 2 
insertions(+) diff --git a/dags/cluster_dag.py b/dags/cluster_dag.py index 1152a894..33341830 100644 --- a/dags/cluster_dag.py +++ b/dags/cluster_dag.py @@ -61,6 +61,8 @@ def estimate_optimal_number_of_workers(cluster, cluster_info): else: if cluster == "gpu": num_tasks = check_queue(queue="chunkflow") + elif cluster.startswith("synaptor"): + num_tasks = check_queue(queue=f"{cluster}-tasks") else: num_tasks = check_queue(queue=cluster) num_workers = estimate_worker_instances(num_tasks, cluster_info) From 15d663e50458b4480a23fda0ad36098e1aad75c6 Mon Sep 17 00:00:00 2001 From: Ran Lu Date: Tue, 2 Jul 2024 13:19:20 -0400 Subject: [PATCH 3/3] Drain new synaptor queues when user cancels --- slackbot/cancel_run_commands.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/slackbot/cancel_run_commands.py b/slackbot/cancel_run_commands.py index 609a78e0..eadbfc72 100644 --- a/slackbot/cancel_run_commands.py +++ b/slackbot/cancel_run_commands.py @@ -64,6 +64,9 @@ def cancel_run(msg): drain_messages(broker_url, "custom-gpu") drain_messages(broker_url, "chunkflow") drain_messages(broker_url, "synaptor") + drain_messages(broker_url, "synaptor-cpu-tasks") + drain_messages(broker_url, "synaptor-gpu-tasks") + drain_messages(broker_url, "synaptor-seggraph-tasks") drain_messages(broker_url, "deepem-gpu") time.sleep(10)