Skip to content

Commit

Permalink
Merge pull request #99 from seung-lab/split_synaptor_queue
Browse files Browse the repository at this point in the history
Split synaptor queue

Split the synaptor queue into multiple queues, one for each worker group. This prevents the wrong types of workers from stealing other workers' tasks and causing the DAG to go out of sync.
  • Loading branch information
ranlu authored Jul 2, 2024
2 parents fc4beb3 + 15d663e commit 626eee0
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 10 deletions.
2 changes: 2 additions & 0 deletions dags/cluster_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def estimate_optimal_number_of_workers(cluster, cluster_info):
else:
if cluster == "gpu":
num_tasks = check_queue(queue="chunkflow")
elif cluster.startswith("synaptor"):
num_tasks = check_queue(queue=f"{cluster}-tasks")
else:
num_tasks = check_queue(queue=cluster)
num_workers = estimate_worker_instances(num_tasks, cluster_info)
Expand Down
20 changes: 13 additions & 7 deletions dags/synaptor_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from airflow import DAG
from airflow.models import Variable, BaseOperator

from helper_ops import scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op
from helper_ops import placeholder_op, scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op
from param_default import synaptor_param_default, default_synaptor_image
from synaptor_ops import manager_op, drain_op, self_destruct_op
from synaptor_ops import synaptor_op, wait_op, generate_op, nglink_op
Expand Down Expand Up @@ -92,10 +92,10 @@ def __init__(self, name):

def fill_dag(dag: DAG, tasklist: list[Task], collect_metrics: bool = True) -> DAG:
"""Fills a synaptor DAG from a list of Tasks."""
drain = drain_op(dag)
drain_tasks = [drain_op(dag, task_queue_name=f"synaptor-{t}-tasks") for t in ["cpu", "gpu", "seggraph"]]
init_cloudvols = manager_op(dag, "init_cloudvols", image=SYNAPTOR_IMAGE)

drain >> init_cloudvols
start_nfs_server >> drain_tasks >> init_cloudvols

curr_operator = init_cloudvols
if WORKFLOW_PARAMS.get("workspacetype", "File") == "Database":
Expand All @@ -105,7 +105,7 @@ def fill_dag(dag: DAG, tasklist: list[Task], collect_metrics: bool = True) -> DA

if collect_metrics:
metrics = collect_metrics_op(dag)
metrics >> drain
metrics >> drain_tasks

curr_cluster = ""
for task in tasklist:
Expand Down Expand Up @@ -167,6 +167,7 @@ def change_cluster_if_required(
i,
image=SYNAPTOR_IMAGE,
op_queue_name=next_task.cluster_key,
task_queue_name=f"{next_task.cluster_key}-tasks",
tag=new_tag,
use_gpus=next_task.use_gpus
)
Expand Down Expand Up @@ -233,12 +234,17 @@ def add_task(
elif task.cluster_key == "manager":
generate = manager_op(dag, task.name, image=SYNAPTOR_IMAGE)
else:
generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, tag=tag)
generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, tag=tag, task_queue_name=f"{task.cluster_key}-tasks")

if tag:
wait = wait_op(dag, f"{task.name}_{tag}")
wait_task_name = f"{task.name}_{tag}"
else:
wait = wait_op(dag, task.name)
wait_task_name = task.name

if task.cluster_key == "manager":
wait = placeholder_op(dag, wait_task_name)
else:
wait = wait_op(dag, wait_task_name, task_queue_name=f"{task.cluster_key}-tasks")

prev_operator >> generate >> wait

Expand Down
6 changes: 3 additions & 3 deletions dags/synaptor_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def drain_op(
"""Drains leftover messages from the RabbitMQ."""

return PythonOperator(
task_id="drain_messages",
task_id=f"drain_messages_{task_queue_name}",
python_callable=drain_messages,
priority_weight=100_000,
op_args=(airflow_broker_url, task_queue_name),
Expand Down Expand Up @@ -317,12 +317,12 @@ def synaptor_op(
)


def wait_op(dag: DAG, taskname: str) -> PythonOperator:
def wait_op(dag: DAG, taskname: str, task_queue_name: Optional[str] = TASK_QUEUE_NAME) -> PythonOperator:
"""Waits for a task to finish."""
return PythonOperator(
task_id=f"wait_for_queue_{taskname}",
python_callable=check_queue,
op_args=(TASK_QUEUE_NAME,),
op_args=(task_queue_name,),
priority_weight=100_000,
weight_rule=WeightRule.ABSOLUTE,
on_success_callback=task_done_alert,
Expand Down
3 changes: 3 additions & 0 deletions slackbot/cancel_run_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def cancel_run(msg):
drain_messages(broker_url, "custom-gpu")
drain_messages(broker_url, "chunkflow")
drain_messages(broker_url, "synaptor")
drain_messages(broker_url, "synaptor-cpu-tasks")
drain_messages(broker_url, "synaptor-gpu-tasks")
drain_messages(broker_url, "synaptor-seggraph-tasks")
drain_messages(broker_url, "deepem-gpu")
time.sleep(10)

Expand Down

0 comments on commit 626eee0

Please sign in to comment.