Skip to content

Commit

Permalink
refactor(dask): use centralized function for Dask component names (#613)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Jan 23, 2025
1 parent 51fad95 commit f2aaa0e
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 29 deletions.
11 changes: 5 additions & 6 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
calculate_hash_of_dir,
calculate_job_input_hash,
build_unique_component_name,
get_dask_component_name,
)
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, RunStatus, Service
Expand Down Expand Up @@ -322,26 +323,24 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
name=get_dask_component_name(workflow.id_, "cluster"),
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
name=f"reana-run-dask-{workflow.id_}",
)

if DASK_AUTOSCALER_ENABLED:
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
name=get_dask_component_name(workflow.id_, "autoscaler"),
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)

delete_dask_dashboard_ingress(
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
delete_dask_dashboard_ingress(workflow.id_)

dask_service = (
Session.query(Service)
.filter_by(name=f"dask-dashboard-{workflow.id_}")
.filter_by(name=get_dask_component_name(workflow.id_, "database_model_service"))
.one_or_none()
)
workflow.services.remove(dask_service)
Expand Down
19 changes: 13 additions & 6 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
get_reana_shared_volume,
)
from reana_commons.job_utils import kubernetes_memory_to_bytes
from reana_commons.utils import get_dask_component_name

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.k8s import create_dask_dashboard_ingress
Expand All @@ -39,7 +40,7 @@ class DaskResourceManager:

def __init__(
self,
cluster_name,
workflow_id,
workflow_spec,
workflow_workspace,
user_id,
Expand All @@ -57,10 +58,9 @@ def __init__(
:param user_id: Id of the user
:type user_id: str
"""
self.cluster_name = cluster_name
self.cluster_name = get_dask_component_name(workflow_id, "cluster")
self.num_of_workers = num_of_workers
self.single_worker_memory = single_worker_memory
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.workflow_spec = workflow_spec
self.workflow_workspace = workflow_workspace
self.workflow_id = workflow_workspace.split("/")[-1]
Expand All @@ -69,7 +69,7 @@ def __init__(
self.cluster_spec = workflow_spec.get("resources", {}).get("dask", [])
self.cluster_body = self._load_dask_cluster_template()
self.cluster_image = self.cluster_spec["image"]
self.dask_scheduler_uri = f"{self.cluster_name}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786"
self.dask_scheduler_uri = get_dask_scheduler_uri(workflow_id)

self.secrets_store = UserSecretsStore.fetch(self.user_id)
self.secret_env_vars = self.secrets_store.get_env_secrets_as_k8s_spec()
Expand All @@ -79,7 +79,7 @@ def __init__(
self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID

if DASK_AUTOSCALER_ENABLED:
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.autoscaler_name = get_dask_component_name(workflow_id, "autoscaler")
self.autoscaler_body = self._load_dask_autoscaler_template()

def _load_dask_cluster_template(self):
Expand Down Expand Up @@ -115,7 +115,7 @@ def create_dask_resources(self):
self._prepare_autoscaler()
self._create_dask_autoscaler()

create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)
create_dask_dashboard_ingress(self.workflow_id)

def _prepare_cluster(self):
"""Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers."""
Expand All @@ -128,6 +128,8 @@ def _prepare_cluster(self):
# Add the name of the cluster, used in scheduler service name
self.cluster_body["metadata"] = {"name": self.cluster_name}

# self.cluster_body["spec"]["worker"]["spec"]["metadata"] = {"name": "amcik"}

self.cluster_body["spec"]["scheduler"]["service"]["selector"][
"dask.org/cluster-name"
] = self.cluster_name
Expand Down Expand Up @@ -516,3 +518,8 @@ def requires_dask(workflow):
return bool(
workflow.reana_specification["workflow"].get("resources", {}).get("dask", False)
)


def get_dask_scheduler_uri(workflow_id):
"""Get Dask scheduler uri."""
return f"reana-dask-{workflow_id}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786"
21 changes: 13 additions & 8 deletions reana_workflow_controller/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
get_k8s_cvmfs_volumes,
get_workspace_volume,
)
from reana_commons.utils import get_dask_component_name

from reana_workflow_controller.config import ( # isort:skip
JUPYTER_INTERACTIVE_SESSION_DEFAULT_PORT,
Expand Down Expand Up @@ -404,14 +405,16 @@ def delete_k8s_ingress_object(ingress_name, namespace):
)


def create_dask_dashboard_ingress(cluster_name, workflow_id):
def create_dask_dashboard_ingress(workflow_id):
"""Create K8S Ingress object for Dask dashboard."""
# Define the middleware spec
middleware_spec = {
"apiVersion": "traefik.io/v1alpha1",
"kind": "Middleware",
"metadata": {
"name": f"replacepath-{workflow_id}",
"name": get_dask_component_name(
workflow_id, "dashboard_ingress_middleware"
),
"namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE,
},
"spec": {
Expand All @@ -426,10 +429,10 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
api_version="networking.k8s.io/v1",
kind="Ingress",
metadata=client.V1ObjectMeta(
name=f"dask-dashboard-ingress-{cluster_name}",
name=get_dask_component_name(workflow_id, "dashboard_ingress"),
annotations={
**REANA_INGRESS_ANNOTATIONS,
"traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-replacepath-{workflow_id}@kubernetescrd",
"traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-{get_dask_component_name(workflow_id, 'dashboard_ingress_middleware')}@kubernetescrd",
},
),
spec=client.V1IngressSpec(
Expand All @@ -443,7 +446,9 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
path_type="Prefix",
backend=client.V1IngressBackend(
service=client.V1IngressServiceBackend(
name=f"{cluster_name}-scheduler",
name=get_dask_component_name(
workflow_id, "dashboard_service"
),
port=client.V1ServiceBackendPort(number=8787),
)
),
Expand Down Expand Up @@ -471,10 +476,10 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
)


def delete_dask_dashboard_ingress(cluster_name, workflow_id):
def delete_dask_dashboard_ingress(workflow_id):
"""Delete K8S Ingress Object for Dask dashboard."""
current_k8s_networking_api_client.delete_namespaced_ingress(
name=cluster_name,
name=get_dask_component_name(workflow_id, "dashboard_ingress"),
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
body=client.V1DeleteOptions(),
)
Expand All @@ -483,7 +488,7 @@ def delete_dask_dashboard_ingress(cluster_name, workflow_id):
version="v1alpha1",
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
plural="middlewares",
name=f"replacepath-{workflow_id}",
name=get_dask_component_name(workflow_id, "dashboard_ingress_middleware"),
)


Expand Down
5 changes: 3 additions & 2 deletions reana_workflow_controller/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
from opensearchpy import OpenSearch

from reana_commons.utils import get_dask_component_name
from reana_workflow_controller.config import (
REANA_OPENSEARCH_CA_CERTS,
REANA_OPENSEARCH_HOST,
Expand Down Expand Up @@ -188,7 +189,7 @@ def fetch_dask_scheduler_logs(self, workflow_id: str) -> str | None:
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
self.dask_log_matcher: get_dask_component_name(workflow_id, "cluster"),
"kubernetes.labels.dask.org/component": "scheduler",
},
)
Expand All @@ -205,7 +206,7 @@ def fetch_dask_worker_logs(self, workflow_id: str) -> str | None:
id=None,
index=self.dask_index,
matches={
self.dask_log_matcher: f"reana-run-dask-{workflow_id}",
self.dask_log_matcher: get_dask_component_name(workflow_id, "cluster"),
"kubernetes.labels.dask.org/component": "worker",
},
)
Expand Down
8 changes: 4 additions & 4 deletions reana_workflow_controller/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from webargs import fields, validate
from webargs.flaskparser import use_args, use_kwargs
from reana_commons.config import WORKFLOW_TIME_FORMAT
from reana_commons.utils import build_unique_component_name
from reana_commons.utils import build_unique_component_name, get_dask_component_name
from reana_db.database import Session
from reana_db.models import (
RunStatus,
Expand Down Expand Up @@ -413,7 +413,7 @@ def get_workflows(args, paginate=None): # noqa
dask_service = workflow.services.first()
if dask_service and dask_service.status == ServiceStatus.created:
pod_status = check_pod_status_by_prefix(
pod_name_prefix=f"reana-run-dask-{workflow.id_}"
pod_name_prefix=get_dask_component_name(workflow.id_, "cluster")
)

if pod_status == "Running":
Expand Down Expand Up @@ -627,12 +627,12 @@ def create_workflow(): # noqa
)
if requires_dask(workflow):
dask_service = Service(
name=f"dask-dashboard-{workflow_uuid}",
name=get_dask_component_name(workflow.id_, "database_model_service"),
uri=f"https://{REANA_HOSTNAME}/{workflow_uuid}/dashboard/status",
type_=ServiceType.dask,
status=ServiceStatus.created,
owner_id=request.args["user"],
)

workflow.services.append(dask_service)

Session.add(workflow)
Expand Down
11 changes: 8 additions & 3 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@

from reana_workflow_controller.errors import REANAInteractiveSessionError

from reana_workflow_controller.dask import DaskResourceManager, requires_dask
from reana_workflow_controller.dask import (
DaskResourceManager,
requires_dask,
get_dask_scheduler_uri,
)

from reana_workflow_controller.k8s import (
build_interactive_k8s_objects,
Expand Down Expand Up @@ -374,9 +378,10 @@ def start_batch_workflow_run(

try:
# Create the dask cluster and required resources

if requires_dask(self.workflow):
DaskResourceManager(
cluster_name=f"reana-run-dask-{self.workflow.id_}",
workflow_id=self.workflow.id_,
workflow_spec=self.workflow.reana_specification["workflow"],
workflow_workspace=self.workflow.workspace_path,
user_id=self.workflow.owner_id,
Expand Down Expand Up @@ -759,7 +764,7 @@ def _create_job_spec(
job_controller_container.env.append(
{
"name": "DASK_SCHEDULER_URI",
"value": f"reana-run-dask-{self.workflow.id_}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786",
"value": get_dask_scheduler_uri(self.workflow.id_),
},
)

Expand Down

0 comments on commit f2aaa0e

Please sign in to comment.