Skip to content

Commit

Permalink
add pod check
Browse files Browse the repository at this point in the history
  • Loading branch information
simotw committed Dec 26, 2024
1 parent 9b85e43 commit a88d325
Showing 1 changed file with 43 additions and 9 deletions.
52 changes: 43 additions & 9 deletions ray-operator/test/e2e/rayjob_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
"testing"
"time"

"github.com/onsi/gomega"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
Expand All @@ -15,15 +16,15 @@ import (

func TestRayJobRecovery(t *testing.T) {
test := With(t)
g := gomega.NewWithT(t)
g := NewWithT(t)

// Create a namespace
namespace := test.NewTestNamespace()

// Job scripts
jobsAC := newConfigMap(namespace.Name, files(test, "long_running_counter.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)

test.T().Run("RayJob should recover after pod deletion", func(_ *testing.T) {
Expand All @@ -39,12 +40,12 @@ env_vars:
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

test.T().Logf("Waiting for RayJob %s/%s to start running", rayJob.Namespace, rayJob.Name)
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(gomega.WithTransform(RayJobStatus, gomega.Equal(rayv1.JobStatusRunning)))
Should(WithTransform(RayJobStatus, Equal(rayv1.JobStatusRunning)))
test.T().Logf("Find RayJob %s/%s running", rayJob.Namespace, rayJob.Name)
// wait for the job to run a bit
test.T().Logf("Sleep RayJob %s/%s 15 seconds", rayJob.Namespace, rayJob.Name)
Expand All @@ -54,7 +55,7 @@ env_vars:
jobpods, err := test.Client().Core().CoreV1().Pods(namespace.Name).List(test.Ctx(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", rayJob.Name),
})
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(err).NotTo(HaveOccurred())

// remove the running jobpods
propagationPolicy := metav1.DeletePropagationBackground
Expand All @@ -63,13 +64,46 @@ env_vars:
err = test.Client().Core().CoreV1().Pods(namespace.Name).Delete(test.Ctx(), pod.Name, metav1.DeleteOptions{
PropagationPolicy: &propagationPolicy,
})
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(err).NotTo(HaveOccurred())
}

// Get old pod names for comparison
oldPodNames := make([]string, len(jobpods.Items))
for i, pod := range jobpods.Items {
oldPodNames[i] = pod.Name
}

test.T().Logf("Waiting for new pod to be created and running for RayJob %s/%s", namespace.Name, rayJob.Name)
g.Eventually(func() ([]corev1.Pod, error) {
pods, err := test.Client().Core().CoreV1().Pods(namespace.Name).List(
test.Ctx(),
metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", rayJob.Name),
},
)
g.Expect(err).NotTo(HaveOccurred())
return pods.Items, nil
}, TestTimeoutMedium).Should(
WithTransform(func(pods []corev1.Pod) bool {
for _, pod := range pods {
if pod.Status.Phase == corev1.PodRunning {
for _, oldPod := range jobpods.Items {
if pod.Name == oldPod.Name {
continue
}
}
test.T().Logf("Found new running pod %s/%s", pod.Namespace, pod.Name)
return true
}
}
return false
}, BeTrue()),
)

g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(gomega.WithTransform(RayJobStatus, gomega.Equal(rayv1.JobStatusSucceeded)))
Should(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded)))

g.Eventually(RayJob(test, namespace.Name, rayJob.Name), TestTimeoutMedium).
Should(gomega.WithTransform(RayJobDeploymentStatus, gomega.Equal(rayv1.JobDeploymentStatusComplete)))
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete)))
})
}

0 comments on commit a88d325

Please sign in to comment.