diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 7f59b6ba..6b9ccc5e 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -31,6 +31,7 @@ calculate_hash_of_dir, calculate_job_input_hash, build_unique_component_name, + get_dask_component_name, ) from reana_db.database import Session from reana_db.models import Job, JobCache, Workflow, RunStatus, Service @@ -322,8 +323,8 @@ def _delete_dask_cluster(workflow: Workflow) -> None: group="kubernetes.dask.org", version="v1", plural="daskclusters", + name=get_dask_component_name(workflow.id_, "cluster"), namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - name=f"reana-run-dask-{workflow.id_}", ) if DASK_AUTOSCALER_ENABLED: @@ -331,17 +332,15 @@ def _delete_dask_cluster(workflow: Workflow) -> None: group="kubernetes.dask.org", version="v1", plural="daskautoscalers", + name=get_dask_component_name(workflow.id_, "autoscaler"), namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - name=f"dask-autoscaler-reana-run-dask-{workflow.id_}", ) - delete_dask_dashboard_ingress( - f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_ - ) + delete_dask_dashboard_ingress(workflow.id_) dask_service = ( Session.query(Service) - .filter_by(name=f"dask-dashboard-{workflow.id_}") + .filter_by(name=get_dask_component_name(workflow.id_, "db_service")) .one_or_none() ) workflow.services.remove(dask_service) diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py index 0d393707..159fc782 100644 --- a/reana_workflow_controller/dask.py +++ b/reana_workflow_controller/dask.py @@ -29,6 +29,7 @@ get_reana_shared_volume, ) from reana_commons.job_utils import kubernetes_memory_to_bytes +from reana_commons.utils import get_dask_component_name from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED from reana_workflow_controller.k8s import create_dask_dashboard_ingress @@ -39,7 +40,7 @@ class DaskResourceManager: def __init__( self, - cluster_name, + workflow_id, workflow_spec, workflow_workspace, user_id, @@ -57,10 +58,9 @@ def __init__( :param user_id: Id of the user :type user_id: str """ - self.cluster_name = cluster_name + self.cluster_name = get_dask_component_name(workflow_id, "cluster") self.num_of_workers = num_of_workers self.single_worker_memory = single_worker_memory - self.autoscaler_name = f"dask-autoscaler-{cluster_name}" self.workflow_spec = workflow_spec self.workflow_workspace = workflow_workspace self.workflow_id = workflow_workspace.split("/")[-1] @@ -69,7 +69,7 @@ def __init__( self.cluster_spec = workflow_spec.get("resources", {}).get("dask", []) self.cluster_body = self._load_dask_cluster_template() self.cluster_image = self.cluster_spec["image"] - self.dask_scheduler_uri = f"{self.cluster_name}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786" + self.dask_scheduler_uri = get_dask_scheduler_uri(workflow_id) self.secrets_store = UserSecretsStore.fetch(self.user_id) self.secret_env_vars = self.secrets_store.get_env_secrets_as_k8s_spec() @@ -79,7 +79,7 @@ def __init__( self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID if DASK_AUTOSCALER_ENABLED: - self.autoscaler_name = f"dask-autoscaler-{cluster_name}" + self.autoscaler_name = get_dask_component_name(workflow_id, "autoscaler") self.autoscaler_body = self._load_dask_autoscaler_template() def _load_dask_cluster_template(self): @@ -115,7 +115,7 @@ def create_dask_resources(self): self._prepare_autoscaler() self._create_dask_autoscaler() - create_dask_dashboard_ingress(self.cluster_name, self.workflow_id) + create_dask_dashboard_ingress(self.workflow_id) def _prepare_cluster(self): """Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers.""" @@ -128,6 +128,8 @@ def _prepare_cluster(self): # Add the name of the cluster, used in scheduler service name self.cluster_body["metadata"] = {"name": self.cluster_name} + # self.cluster_body["spec"]["worker"]["spec"]["metadata"] = {"name": "amcik"} + self.cluster_body["spec"]["scheduler"]["service"]["selector"][ "dask.org/cluster-name" ] = self.cluster_name @@ -516,3 +518,8 @@ def requires_dask(workflow): return bool( workflow.reana_specification["workflow"].get("resources", {}).get("dask", False) ) + + +def get_dask_scheduler_uri(workflow_id): + """Get Dask scheduler uri.""" + return f"reana-dask-{workflow_id}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786" diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py index aab726ff..a50f10dd 100644 --- a/reana_workflow_controller/k8s.py +++ b/reana_workflow_controller/k8s.py @@ -24,6 +24,7 @@ get_k8s_cvmfs_volumes, get_workspace_volume, ) +from reana_commons.utils import get_dask_component_name from reana_workflow_controller.config import ( # isort:skip JUPYTER_INTERACTIVE_SESSION_DEFAULT_PORT, @@ -404,14 +405,16 @@ def delete_k8s_ingress_object(ingress_name, namespace): ) -def create_dask_dashboard_ingress(cluster_name, workflow_id): +def create_dask_dashboard_ingress(workflow_id): """Create K8S Ingress object for Dask dashboard.""" # Define the middleware spec middleware_spec = { "apiVersion": "traefik.io/v1alpha1", "kind": "Middleware", "metadata": { - "name": f"replacepath-{workflow_id}", + "name": get_dask_component_name( + workflow_id, "dashboard_ingress_middleware" + ), "namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE, }, "spec": { @@ -426,10 +429,10 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id): api_version="networking.k8s.io/v1", kind="Ingress", metadata=client.V1ObjectMeta( - name=f"dask-dashboard-ingress-{cluster_name}", + name=get_dask_component_name(workflow_id, "dashboard_ingress"), annotations={ **REANA_INGRESS_ANNOTATIONS, - "traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-replacepath-{workflow_id}@kubernetescrd", + "traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-{get_dask_component_name(workflow_id, 'dashboard_ingress_middleware')}@kubernetescrd", }, ), spec=client.V1IngressSpec( @@ -443,7 +446,9 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id): path_type="Prefix", backend=client.V1IngressBackend( service=client.V1IngressServiceBackend( - name=f"{cluster_name}-scheduler", + name=get_dask_component_name( + workflow_id, "dashboard_service" + ), port=client.V1ServiceBackendPort(number=8787), ) ), @@ -471,10 +476,10 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id): ) -def delete_dask_dashboard_ingress(cluster_name, workflow_id): +def delete_dask_dashboard_ingress(workflow_id): """Delete K8S Ingress Object for Dask dashboard.""" current_k8s_networking_api_client.delete_namespaced_ingress( - name=cluster_name, + name=get_dask_component_name(workflow_id, "dashboard_ingress"), namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, body=client.V1DeleteOptions(), ) @@ -483,7 +488,7 @@ def delete_dask_dashboard_ingress(cluster_name, workflow_id): version="v1alpha1", namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, plural="middlewares", - name=f"replacepath-{workflow_id}", + name=get_dask_component_name(workflow_id, "dashboard_ingress_middleware"), ) diff --git a/reana_workflow_controller/opensearch.py b/reana_workflow_controller/opensearch.py index 8acc17b3..fb509a00 100644 --- a/reana_workflow_controller/opensearch.py +++ b/reana_workflow_controller/opensearch.py @@ -11,6 +11,7 @@ import logging from opensearchpy import OpenSearch +from reana_commons.utils import get_dask_component_name from reana_workflow_controller.config import ( REANA_OPENSEARCH_CA_CERTS, REANA_OPENSEARCH_HOST, @@ -188,7 +189,7 @@ def fetch_dask_scheduler_logs(self, workflow_id: str) -> str | None: id=None, index=self.dask_index, matches={ - self.dask_log_matcher: f"reana-run-dask-{workflow_id}", + self.dask_log_matcher: get_dask_component_name(workflow_id, "cluster"), "kubernetes.labels.dask.org/component": "scheduler", }, ) @@ -205,7 +206,7 @@ def fetch_dask_worker_logs(self, workflow_id: str) -> str | None: id=None, index=self.dask_index, matches={ - self.dask_log_matcher: f"reana-run-dask-{workflow_id}", + self.dask_log_matcher: get_dask_component_name(workflow_id, "cluster"), "kubernetes.labels.dask.org/component": "worker", }, ) diff --git a/reana_workflow_controller/rest/workflows.py b/reana_workflow_controller/rest/workflows.py index f77966a5..8324df0a 100644 --- a/reana_workflow_controller/rest/workflows.py +++ b/reana_workflow_controller/rest/workflows.py @@ -22,7 +22,7 @@ from webargs import fields, validate from webargs.flaskparser import use_args, use_kwargs from reana_commons.config import WORKFLOW_TIME_FORMAT -from reana_commons.utils import build_unique_component_name +from reana_commons.utils import build_unique_component_name, get_dask_component_name from reana_db.database import Session from reana_db.models import ( RunStatus, @@ -413,7 +413,7 @@ def get_workflows(args, paginate=None): # noqa dask_service = workflow.services.first() if dask_service and dask_service.status == ServiceStatus.created: pod_status = check_pod_status_by_prefix( - pod_name_prefix=f"reana-run-dask-{workflow.id_}" + pod_name_prefix=get_dask_component_name(workflow.id_, "cluster") ) if pod_status == "Running": @@ -627,12 +627,12 @@ def create_workflow(): # noqa ) if requires_dask(workflow): dask_service = Service( - name=f"dask-dashboard-{workflow_uuid}", + name=get_dask_component_name(workflow.id_, "db_service"), uri=f"https://{REANA_HOSTNAME}/{workflow_uuid}/dashboard/status", type_=ServiceType.dask, status=ServiceStatus.created, - owner_id=request.args["user"], ) + workflow.services.append(dask_service) Session.add(workflow) diff --git a/reana_workflow_controller/templates/dask_cluster.yaml b/reana_workflow_controller/templates/dask_cluster.yaml index 12098eb9..6b97b1bd 100644 --- a/reana_workflow_controller/templates/dask_cluster.yaml +++ b/reana_workflow_controller/templates/dask_cluster.yaml @@ -9,11 +9,13 @@ spec: imagePullPolicy: "IfNotPresent" command: ["/bin/sh", "-c"] args: - - exec dask-worker --name $(DASK_WORKER_NAME) --dashboard --dashboard-address 8788 + - exec dask-worker --dashboard --dashboard-address 8788 ports: - name: http-dashboard containerPort: 8788 protocol: TCP + metadata: + generateName: dask-hello-world- scheduler: spec: containers: diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index fb98e962..bdac16a9 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -52,7 +52,11 @@ from reana_workflow_controller.errors import REANAInteractiveSessionError -from reana_workflow_controller.dask import DaskResourceManager, requires_dask +from reana_workflow_controller.dask import ( + DaskResourceManager, + requires_dask, + get_dask_scheduler_uri, +) from reana_workflow_controller.k8s import ( build_interactive_k8s_objects, @@ -374,9 +378,10 @@ def start_batch_workflow_run( try: # Create the dask cluster and required resources + if requires_dask(self.workflow): DaskResourceManager( - cluster_name=f"reana-run-dask-{self.workflow.id_}", + workflow_id=self.workflow.id_, workflow_spec=self.workflow.reana_specification["workflow"], workflow_workspace=self.workflow.workspace_path, user_id=self.workflow.owner_id, @@ -759,7 +764,7 @@ def _create_job_spec( job_controller_container.env.append( { "name": "DASK_SCHEDULER_URI", - "value": f"reana-run-dask-{self.workflow.id_}-scheduler.{REANA_RUNTIME_KUBERNETES_NAMESPACE}.svc.cluster.local:8786", + "value": get_dask_scheduler_uri(self.workflow.id_), }, )