Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Split synaptor queue #99

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dags/cluster_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def estimate_optimal_number_of_workers(cluster, cluster_info):
else:
if cluster == "gpu":
num_tasks = check_queue(queue="chunkflow")
elif cluster.startswith("synaptor"):
num_tasks = check_queue(queue=f"{cluster}-tasks")
else:
num_tasks = check_queue(queue=cluster)
num_workers = estimate_worker_instances(num_tasks, cluster_info)
Expand Down
20 changes: 13 additions & 7 deletions dags/synaptor_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from airflow import DAG
from airflow.models import Variable, BaseOperator

from helper_ops import scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op
from helper_ops import placeholder_op, scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op
from param_default import synaptor_param_default, default_synaptor_image
from synaptor_ops import manager_op, drain_op, self_destruct_op
from synaptor_ops import synaptor_op, wait_op, generate_op, nglink_op
Expand Down Expand Up @@ -92,10 +92,10 @@ def __init__(self, name):

def fill_dag(dag: DAG, tasklist: list[Task], collect_metrics: bool = True) -> DAG:
"""Fills a synaptor DAG from a list of Tasks."""
drain = drain_op(dag)
drain_tasks = [drain_op(dag, task_queue_name=f"synaptor-{t}-tasks") for t in ["cpu", "gpu", "seggraph"]]
init_cloudvols = manager_op(dag, "init_cloudvols", image=SYNAPTOR_IMAGE)

drain >> init_cloudvols
start_nfs_server >> drain_tasks >> init_cloudvols

curr_operator = init_cloudvols
if WORKFLOW_PARAMS.get("workspacetype", "File") == "Database":
Expand All @@ -105,7 +105,7 @@ def fill_dag(dag: DAG, tasklist: list[Task], collect_metrics: bool = True) -> DA

if collect_metrics:
metrics = collect_metrics_op(dag)
metrics >> drain
metrics >> drain_tasks

curr_cluster = ""
for task in tasklist:
Expand Down Expand Up @@ -167,6 +167,7 @@ def change_cluster_if_required(
i,
image=SYNAPTOR_IMAGE,
op_queue_name=next_task.cluster_key,
task_queue_name=f"{next_task.cluster_key}-tasks",
tag=new_tag,
use_gpus=next_task.use_gpus
)
Expand Down Expand Up @@ -233,12 +234,17 @@ def add_task(
elif task.cluster_key == "manager":
generate = manager_op(dag, task.name, image=SYNAPTOR_IMAGE)
else:
generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, tag=tag)
generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, tag=tag, task_queue_name=f"{task.cluster_key}-tasks")

if tag:
wait = wait_op(dag, f"{task.name}_{tag}")
wait_task_name = f"{task.name}_{tag}"
else:
wait = wait_op(dag, task.name)
wait_task_name = task.name

if task.cluster_key == "manager":
wait = placeholder_op(dag, wait_task_name)
else:
wait = wait_op(dag, wait_task_name, task_queue_name=f"{task.cluster_key}-tasks")

prev_operator >> generate >> wait

Expand Down
6 changes: 3 additions & 3 deletions dags/synaptor_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def drain_op(
"""Drains leftover messages from the RabbitMQ."""

return PythonOperator(
task_id="drain_messages",
task_id=f"drain_messages_{task_queue_name}",
python_callable=drain_messages,
priority_weight=100_000,
op_args=(airflow_broker_url, task_queue_name),
Expand Down Expand Up @@ -317,12 +317,12 @@ def synaptor_op(
)


def wait_op(dag: DAG, taskname: str) -> PythonOperator:
def wait_op(dag: DAG, taskname: str, task_queue_name: Optional[str] = TASK_QUEUE_NAME) -> PythonOperator:
"""Waits for a task to finish."""
return PythonOperator(
task_id=f"wait_for_queue_{taskname}",
python_callable=check_queue,
op_args=(TASK_QUEUE_NAME,),
op_args=(task_queue_name,),
priority_weight=100_000,
weight_rule=WeightRule.ABSOLUTE,
on_success_callback=task_done_alert,
Expand Down
3 changes: 3 additions & 0 deletions slackbot/cancel_run_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def cancel_run(msg):
drain_messages(broker_url, "custom-gpu")
drain_messages(broker_url, "chunkflow")
drain_messages(broker_url, "synaptor")
drain_messages(broker_url, "synaptor-cpu-tasks")
drain_messages(broker_url, "synaptor-gpu-tasks")
drain_messages(broker_url, "synaptor-seggraph-tasks")
drain_messages(broker_url, "deepem-gpu")
time.sleep(10)

Expand Down