From ba0988659758dd9363c7994262c092fcebda60d5 Mon Sep 17 00:00:00 2001
From: Matthew Barrington
Date: Wed, 11 Dec 2024 17:48:44 +0000
Subject: [PATCH] runner: retry pod watch when it expires

waitForSuccess previously treated any *metav1.Status event from the watch,
including an expired watch, as a fatal error. Retry the watch a bounded
number of times, resuming from the resource version reported by the expiry
status, before giving up.

---
 pkg/workloads/console/runner/runner.go | 143 +++++++++++++++++---------
 1 file changed, 89 insertions(+), 54 deletions(-)

diff --git a/pkg/workloads/console/runner/runner.go b/pkg/workloads/console/runner/runner.go
index dd01a88f..179a6103 100644
--- a/pkg/workloads/console/runner/runner.go
+++ b/pkg/workloads/console/runner/runner.go
@@ -17,6 +17,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
@@ -251,70 +252,104 @@ func (c *Runner) waitForSuccess(ctx context.Context, csl *workloadsv1alpha1.Cons
 	}
 
 	listOptions := metav1.SingleObject(pod.ObjectMeta)
-	w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
-	if err != nil {
-		return fmt.Errorf("error watching pod: %w", err)
-	}
-	// We need to fetch the pod again now we have a watcher to avoid a race
-	// where the pod completed before we were listening for watch events
-	pod, _, err = c.GetAttachablePod(ctx, csl)
-	if err != nil {
-		// If we can't find the pod, then we should assume it finished successfully. Otherwise
-		// we might race against the operator to access a pod it wants to delete, and cause
-		// our runner to exit with error when all is fine.
-		//
-		// TODO: It may be better to recheck the console and look in its status?
-		if apierrors.IsNotFound(err) {
-			return nil
+	var rv string
+
+	// Attempt to watch the pod through to successful completion, up to maxAttempts times.
+	// The loop only repeats when a watch expires before the pod completes.
+	maxAttempts := 3
+	for i := 0; i < maxAttempts; i++ {
+		// If we have a resource version recorded, resume the watch from it. These
+		// options mirror what metav1.SingleObject builds, but let us supply our own ResourceVersion.
+		if rv != "" {
+			listOptions = metav1.ListOptions{
+				FieldSelector:   fields.OneTermEqualSelector("metadata.name", pod.ObjectMeta.Name).String(),
+				ResourceVersion: rv,
+			}
+			// Reset rv so the expiry check at the bottom of the event loop only
+			// fires if this attempt's watch expires as well.
+			rv = ""
 		}
-		return fmt.Errorf("error retrieving pod: %w", err)
-	}
-
-	if succeeded(pod) {
-		return nil
-	}
-
-	if !isRunning(pod) {
-		return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
-	}
-
-	status := w.ResultChan()
-	defer w.Stop()
+		w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
+		if err != nil {
+			return fmt.Errorf("error watching pod: %w", err)
+		}
 
-	for {
-		select {
-		case event, ok := <-status:
-			// If our channel is closed, exit with error, as we'll otherwise assume
-			// we were successful when we never reached this state.
-			if !ok {
-				return errors.New("watch channel closed")
+		// We need to fetch the pod again now we have a watcher to avoid a race
+		// where the pod completed before we were listening for watch events
+		pod, _, err = c.GetAttachablePod(ctx, csl)
+		if err != nil {
+			// If we can't find the pod, then we should assume it finished successfully. Otherwise
+			// we might race against the operator to access a pod it wants to delete, and cause
+			// our runner to exit with error when all is fine.
+			//
+			// TODO: It may be better to recheck the console and look in its status?
+			if apierrors.IsNotFound(err) {
+				return nil
 			}
 
-			// We can receive *metav1.Status events in the situation where there's an error, in
-			// which case we should exit early.
-			if status, ok := event.Object.(*metav1.Status); ok {
-				return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
-			}
+			return fmt.Errorf("error retrieving pod: %w", err)
+		}
 
-			// We should be safe now, as a watcher should return either Status or the type we
-			// asked it for. But we've been wrong before, and it wasn't easy to figure out what
-			// happened when we didn't print the type of the event.
-			pod, ok := event.Object.(*corev1.Pod)
-			if !ok {
-				return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
-					reflect.TypeOf(event.Object))
-			}
+		if succeeded(pod) {
+			return nil
+		}
 
-			if succeeded(pod) {
-				return nil
+		if !isRunning(pod) {
+			return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+		}
+
+		status := w.ResultChan()
+		defer w.Stop()
+
+		for {
+			select {
+			case event, ok := <-status:
+				// If our channel is closed, exit with error, as we'll otherwise assume
+				// we were successful when we never reached this state.
+				if !ok {
+					return errors.New("watch channel closed")
+				}
+
+				// We can receive *metav1.Status events in the situation where there's an error, in
+				// which case we should exit early.
+				if status, ok := event.Object.(*metav1.Status); ok {
+					// Handle the case where the watch ends with a NotFound status, which
+					// we see when the watch expires and the pod is already gone. Rather
+					// than treating this as a failure, record the resource version from
+					// the status so the next attempt resumes where this one left off.
+					if status.Reason == metav1.StatusReasonNotFound {
+						rv = status.ResourceVersion
+						break
+					}
+					return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
+				}
+
+				// We should be safe now, as a watcher should return either Status or the type we
+				// asked it for. But we've been wrong before, and it wasn't easy to figure out what
+				// happened when we didn't print the type of the event.
+				pod, ok := event.Object.(*corev1.Pod)
+				if !ok {
+					return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
+						reflect.TypeOf(event.Object))
+				}
+
+				if succeeded(pod) {
+					return nil
+				}
+				if !isRunning(pod) {
+					return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+				}
+			case <-ctx.Done():
+				return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
 			}
-			if !isRunning(pod) {
-				return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+
+			if rv != "" {
+				break
 			}
-		case <-ctx.Done():
-			return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
 		}
 	}
+
+	return fmt.Errorf("watch expired %d times", maxAttempts)
 }
 
 type GetOptions struct {
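
Reviewer note (not part of the patch): the retry loop above implements a
"resume from the last resource version" pattern. Below is a minimal sketch of
the same pattern in isolation, assuming only a plain kubernetes.Interface
client; the function name watchPodUntilSucceeded, its parameters (including
maxAttempts), and the error messages are illustrative, not code from this
repository. One deliberate difference: the sketch keys on
metav1.StatusReasonExpired, the canonical reason for an expired watch, whereas
the patch matches StatusReasonNotFound as observed in its environment.

	package sketch

	import (
		"context"
		"fmt"

		corev1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/apimachinery/pkg/fields"
		"k8s.io/client-go/kubernetes"
	)

	// watchPodUntilSucceeded watches a single pod by name and returns nil once
	// it reaches the Succeeded phase, resuming the watch from the last reported
	// resource version whenever the server ends it with an expiry status.
	func watchPodUntilSucceeded(ctx context.Context, cs kubernetes.Interface, ns, name string, maxAttempts int) error {
		rv := "" // resource version to resume from; empty on the first attempt

		for i := 0; i < maxAttempts; i++ {
			opts := metav1.ListOptions{
				// The same field selector metav1.SingleObject would build.
				FieldSelector:   fields.OneTermEqualSelector("metadata.name", name).String(),
				ResourceVersion: rv,
			}

			w, err := cs.CoreV1().Pods(ns).Watch(ctx, opts)
			if err != nil {
				return fmt.Errorf("error watching pod: %w", err)
			}

			expired := false
			for event := range w.ResultChan() {
				// The server reports watch failures as *metav1.Status events.
				// An expired watch is resumable: the embedded ListMeta carries
				// the resource version to pick up from, which is the same
				// assumption the patch relies on.
				if status, ok := event.Object.(*metav1.Status); ok {
					if status.Reason == metav1.StatusReasonExpired {
						rv = status.ResourceVersion
						expired = true
						break
					}
					w.Stop()
					return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
				}

				if pod, ok := event.Object.(*corev1.Pod); ok && pod.Status.Phase == corev1.PodSucceeded {
					w.Stop()
					return nil
				}
			}
			w.Stop()

			if !expired {
				// Channel closed without an expiry status: the context was
				// cancelled or the connection was lost.
				return fmt.Errorf("watch on pod %s/%s ended before it succeeded", ns, name)
			}
		}

		return fmt.Errorf("watch expired %d times", maxAttempts)
	}

Bounding the retries keeps a flapping watch from looping forever. client-go
also ships a RetryWatcher in k8s.io/client-go/tools/watch that packages this
resume logic, which could replace the hand-rolled loop if taking that
dependency is acceptable.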