diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go index 3ec550c83e0..9b7daab4d41 100644 --- a/ray-operator/apis/ray/v1/rayjob_types.go +++ b/ray-operator/apis/ray/v1/rayjob_types.go @@ -67,7 +67,7 @@ type DeletionPolicy string const ( DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster" DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers" - DeleteSelfDeltionPolicy DeletionPolicy = "DeleteSelf" + DeleteSelfDeletionPolicy DeletionPolicy = "DeleteSelf" DeleteNoneDeletionPolicy DeletionPolicy = "None" ) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index a1f4c9a5b40..db5adef490c 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -377,7 +377,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) case rayv1.DeleteWorkersDeletionPolicy: logger.Info("Scaling all worker replicas to 0", "RayCluster", rayJobInstance.Status.RayClusterName) _, err = r.scaleWorkerReplicasToZero(ctx, rayJobInstance) - case rayv1.DeleteSelfDeltionPolicy: + case rayv1.DeleteSelfDeletionPolicy: logger.Info("Deleting RayJob") err = r.Client.Delete(ctx, rayJobInstance) default: diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go index 856d8e8a7f3..2e57b85fb71 100644 --- a/ray-operator/controllers/ray/rayjob_controller_test.go +++ b/ray-operator/controllers/ray/rayjob_controller_test.go @@ -30,6 +30,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + "github.com/ray-project/kuberay/ray-operator/pkg/features" "github.com/ray-project/kuberay/ray-operator/test/support" batchv1 "k8s.io/api/batch/v1" @@ -835,4 +836,389 @@ var _ = Context("RayJob with different submission modes", func() { }) }) }) + + Describe("RayJob with DeletionPolicy=DeleteCluster", Ordered, func() { + features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true) + + ctx := context.Background() + namespace := "default" + rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deletecluster", namespace) + deletionPolicy := rayv1.DeleteClusterDeletionPolicy + rayJob.Spec.DeletionPolicy = &deletionPolicy + rayJob.Spec.ShutdownAfterJobFinishes = false + rayCluster := &rayv1.RayCluster{} + + It("Verify RayJob spec", func() { + Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteClusterDeletionPolicy)) + }) + + It("Create a RayJob custom resource", func() { + err := k8sClient.Create(ctx, rayJob) + Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob") + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob), + time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name) + }) + + It("RayJobs's JobDeploymentStatus transitions from New to Initializing.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set. + Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty()) + Expect(rayJob.Status.JobId).NotTo(BeEmpty()) + Expect(rayJob.Status.StartTime).NotTo(BeNil()) + }) + + It("In Initializing state, the RayCluster should eventually be created.", func() { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Check whether RayCluster is consistent with RayJob's RayClusterSpec. + Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)) + Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion)) + + // TODO (kevin85421): Check the RayCluster labels and annotations. + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name)) + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD))) + }) + + It("Make RayCluster.Status.State to be rayv1.Ready", func() { + // The RayCluster is not 'Ready' yet because Pods are not running and ready. + Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288 + + updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + + // The RayCluster.Status.State should be Ready. + Eventually( + getClusterState(ctx, namespace, rayCluster.Name), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready)) + }) + + It("RayJobs's JobDeploymentStatus transitions from Initializing to Running.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Running state, the RayJob's Status.DashboardURL must be set. + Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty()) + + // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + }) + + It("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() { + // Update fake dashboard client to return job info with "Succeeded" status. + getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required + return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil + } + fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo) + defer fakeRayDashboardClient.GetJobInfoMock.Store(nil) + + // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed. + Consistently( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // Update the submitter Kubernetes Job to Complete. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + + // Update the submitter Kubernetes Job to Complete. + conditions := []batchv1.JobCondition{ + {Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + } + job.Status.Conditions = conditions + Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed()) + + // RayJob transitions to Complete. + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusComplete), "jobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + }) + + It("If DeletionPolicy=DeleteCluster, RayCluster should be deleted, but not the submitter Job.", func() { + Eventually( + func() bool { + return apierrors.IsNotFound(getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster)()) + }, + time.Second*3, time.Millisecond*500).Should(BeTrue()) + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + Consistently( + getResourceFunc(ctx, namespacedName, job), + time.Second*3, time.Millisecond*500).Should(BeNil()) + }) + }) + + Describe("RayJob with DeletionPolicy=DeleteWorkers", Ordered, func() { + features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true) + + ctx := context.Background() + namespace := "default" + rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deleteworkers", namespace) + deletionPolicy := rayv1.DeleteWorkersDeletionPolicy + rayJob.Spec.DeletionPolicy = &deletionPolicy + rayJob.Spec.ShutdownAfterJobFinishes = false + rayCluster := &rayv1.RayCluster{} + + It("Verify RayJob spec", func() { + Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteWorkersDeletionPolicy)) + }) + + It("Create a RayJob custom resource", func() { + err := k8sClient.Create(ctx, rayJob) + Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob") + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob), + time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name) + }) + + It("RayJobs's JobDeploymentStatus transitions from New to Initializing.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set. + Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty()) + Expect(rayJob.Status.JobId).NotTo(BeEmpty()) + Expect(rayJob.Status.StartTime).NotTo(BeNil()) + }) + + It("In Initializing state, the RayCluster should eventually be created.", func() { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Check whether RayCluster is consistent with RayJob's RayClusterSpec. + Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)) + Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion)) + + // TODO (kevin85421): Check the RayCluster labels and annotations. + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name)) + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD))) + }) + + It("Make RayCluster.Status.State to be rayv1.Ready", func() { + // The RayCluster is not 'Ready' yet because Pods are not running and ready. + Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288 + + updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + + // The RayCluster.Status.State should be Ready. + Eventually( + getClusterState(ctx, namespace, rayCluster.Name), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready)) + }) + + It("RayJobs's JobDeploymentStatus transitions from Initializing to Running.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Running state, the RayJob's Status.DashboardURL must be set. + Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty()) + + // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + }) + + It("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() { + // Update fake dashboard client to return job info with "Succeeded" status. + getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required + return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil + } + fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo) + defer fakeRayDashboardClient.GetJobInfoMock.Store(nil) + + // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed. + Consistently( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // Update the submitter Kubernetes Job to Complete. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + + // Update the submitter Kubernetes Job to Complete. + conditions := []batchv1.JobCondition{ + {Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + } + job.Status.Conditions = conditions + Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed()) + + // RayJob transitions to Complete. + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusComplete), "jobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + }) + + It("If DeletionPolicy=DeleteWorkers, all workers should be deleted, but not the Head pod and submitter Job", func() { + // RayCluster exists + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Worker replicas set to 0 + Expect(*rayCluster.Spec.WorkerGroupSpecs[0].MinReplicas).To(Equal(int32(0))) + Expect(*rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(int32(0))) + + // 0 worker Pods exist + workerPods := corev1.PodList{} + workerLabels := common.RayClusterWorkerPodsAssociationOptions(rayCluster).ToListOptions() + Eventually( + listResourceFunc(ctx, &workerPods, workerLabels...), + time.Second*3, time.Millisecond*500).Should(Equal(0), "expected 0 workers") + + // Head Pod is still running + headPods := corev1.PodList{} + headLabels := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions() + Eventually( + listResourceFunc(ctx, &headPods, headLabels...), + time.Second*3, time.Millisecond*500).Should(Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items) + + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + Consistently( + getResourceFunc(ctx, namespacedName, job), + time.Second*3, time.Millisecond*500).Should(BeNil()) + }) + }) + + Describe("RayJob with DeletionPolicy=DeleteSelf", Ordered, func() { + features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true) + + ctx := context.Background() + namespace := "default" + rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deleteself", namespace) + deletionPolicy := rayv1.DeleteSelfDeletionPolicy + rayJob.Spec.DeletionPolicy = &deletionPolicy + rayJob.Spec.ShutdownAfterJobFinishes = false + rayCluster := &rayv1.RayCluster{} + + It("Verify RayJob spec", func() { + Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteSelfDeletionPolicy)) + }) + + It("Create a RayJob custom resource", func() { + err := k8sClient.Create(ctx, rayJob) + Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob") + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob), + time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name) + }) + + It("RayJobs's JobDeploymentStatus transitions from New to Initializing.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set. + Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty()) + Expect(rayJob.Status.JobId).NotTo(BeEmpty()) + Expect(rayJob.Status.StartTime).NotTo(BeNil()) + }) + + It("In Initializing state, the RayCluster should eventually be created.", func() { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Check whether RayCluster is consistent with RayJob's RayClusterSpec. + Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)) + Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion)) + + // TODO (kevin85421): Check the RayCluster labels and annotations. + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name)) + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD))) + }) + + It("Make RayCluster.Status.State to be rayv1.Ready", func() { + // The RayCluster is not 'Ready' yet because Pods are not running and ready. + Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288 + + updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + + // The RayCluster.Status.State should be Ready. + Eventually( + getClusterState(ctx, namespace, rayCluster.Name), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready)) + }) + + It("RayJobs's JobDeploymentStatus transitions from Initializing to Running.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Running state, the RayJob's Status.DashboardURL must be set. + Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty()) + + // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + }) + + It("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() { + // Update fake dashboard client to return job info with "Succeeded" status. + getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required + return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil + } + fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo) + defer fakeRayDashboardClient.GetJobInfoMock.Store(nil) + + // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed. + Consistently( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // Update the submitter Kubernetes Job to Complete. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + + // Update the submitter Kubernetes Job to Complete. + conditions := []batchv1.JobCondition{ + {Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + } + job.Status.Conditions = conditions + Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed()) + }) + + It("If DeletionPolicy=DeleteSelf, all resources, including the RayJob, should be deleted", func() { + Eventually( + func() bool { + return apierrors.IsNotFound(getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob)()) + }, time.Second*3, time.Millisecond*500).Should(BeTrue()) + + Eventually( + func() bool { + return apierrors.IsNotFound(getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayCluster)()) + }, time.Second*3, time.Millisecond*500).Should(BeTrue()) + + job := &batchv1.Job{} + Eventually( + func() bool { + return apierrors.IsNotFound(getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, job)()) + }, time.Second*3, time.Millisecond*500).Should(BeTrue()) + }) + }) + })