diff --git a/pkg/workloads/console/runner/runner.go b/pkg/workloads/console/runner/runner.go
index dd01a88f..90bccd2c 100644
--- a/pkg/workloads/console/runner/runner.go
+++ b/pkg/workloads/console/runner/runner.go
@@ -251,72 +251,93 @@ func (c *Runner) waitForSuccess(ctx context.Context, csl *workloadsv1alpha1.Cons
 	}
 	listOptions := metav1.SingleObject(pod.ObjectMeta)
 
-	w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
-	if err != nil {
-		return fmt.Errorf("error watching pod: %w", err)
-	}
-
-	// We need to fetch the pod again now we have a watcher to avoid a race
-	// where the pod completed before we were listening for watch events
-	pod, _, err = c.GetAttachablePod(ctx, csl)
-	if err != nil {
-		// If we can't find the pod, then we should assume it finished successfully. Otherwise
-		// we might race against the operator to access a pod it wants to delete, and cause
-		// our runner to exit with error when all is fine.
-		//
-		// TODO: It may be better to recheck the console and look in its status?
-		if apierrors.IsNotFound(err) {
-			return nil
+	// Attempt to watch the pod for successful completion, up to 3 times.
+	// The loop only runs another iteration if the watch returns an expired error.
+	maxAttempts := 3
+	for i := 0; i < maxAttempts; i++ {
+		w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
+		if err != nil {
+			return fmt.Errorf("error watching pod: %w", err)
 		}
 
-		return fmt.Errorf("error retrieving pod: %w", err)
-	}
-
-	if succeeded(pod) {
-		return nil
-	}
-
-	if !isRunning(pod) {
-		return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
-	}
-
-	status := w.ResultChan()
-	defer w.Stop()
-
-	for {
-		select {
-		case event, ok := <-status:
-			// If our channel is closed, exit with error, as we'll otherwise assume
-			// we were successful when we never reached this state.
-			if !ok {
-				return errors.New("watch channel closed")
+		// We need to fetch the pod again now we have a watcher to avoid a race
+		// where the pod completed before we were listening for watch events
+		pod, _, err = c.GetAttachablePod(ctx, csl)
+		if err != nil {
+			// If we can't find the pod, then we should assume it finished successfully. Otherwise
+			// we might race against the operator to access a pod it wants to delete, and cause
+			// our runner to exit with error when all is fine.
+			//
+			// TODO: It may be better to recheck the console and look in its status?
+			if apierrors.IsNotFound(err) {
+				return nil
 			}
 
-			// We can receive *metav1.Status events in the situation where there's an error, in
-			// which case we should exit early.
-			if status, ok := event.Object.(*metav1.Status); ok {
-				return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
-			}
+			return fmt.Errorf("error retrieving pod: %w", err)
+		}
 
-			// We should be safe now, as a watcher should return either Status or the type we
-			// asked it for. But we've been wrong before, and it wasn't easy to figure out what
-			// happened when we didn't print the type of the event.
-			pod, ok := event.Object.(*corev1.Pod)
-			if !ok {
-				return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
-					reflect.TypeOf(event.Object))
-			}
+		if succeeded(pod) {
+			return nil
+		}
 
-			if succeeded(pod) {
-				return nil
-			}
-			if !isRunning(pod) {
-				return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+		if !isRunning(pod) {
+			return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+		}
+
+		status := w.ResultChan()
+		defer w.Stop()
+
+		// If the watch expires we break out to this label, falling through to the sleep below before retrying
+	WATCHEXPIRED:
+		for {
+			select {
+			case event, ok := <-status:
+				// If our channel is closed, exit with error, as we'll otherwise assume
+				// we were successful when we never reached this state.
+				if !ok {
+					return errors.New("watch channel closed")
+				}
+
+				// We can receive *metav1.Status events in the situation where there's an error, in
+				// which case we should exit early, unless it is a watch expired error, in which case we try again.
+				if status, ok := event.Object.(*metav1.Status); ok {
+					// Handle the watch having expired
+					if status.Reason == metav1.StatusReasonExpired {
+						// Before watching again we need to clear the resource version,
+						// otherwise we risk getting another expired watch straight away
+						listOptions.ResourceVersion = ""
+						break WATCHEXPIRED
+					}
+					return fmt.Errorf("received failure from Kubernetes: %s: %s", status.Reason, status.Message)
+				}
+
+				// We should be safe now, as a watcher should return either Status or the type we
+				// asked it for. But we've been wrong before, and it wasn't easy to figure out what
+				// happened when we didn't print the type of the event.
+				pod, ok = event.Object.(*corev1.Pod)
+				if !ok {
+					return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
+						reflect.TypeOf(event.Object))
+				}
+
+				if succeeded(pod) {
+					return nil
+				}
+				if !isRunning(pod) {
+					return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+				}
+			case <-ctx.Done():
+				return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
 			}
-		case <-ctx.Done():
-			return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
 		}
+
+		// Sleep for a bit before trying again
+		time.Sleep(500 * time.Millisecond)
+		w.Stop()
 	}
+	// This error is only reached once every attempt to watch the pod to completion has been used
+	return fmt.Errorf("received watch expired %d times", maxAttempts)
 }
 
 type GetOptions struct {
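Context for reviewers: `metav1.SingleObject` pins the watch's `ResourceVersion` to the version the pod had when it was first fetched, and the API server only retains a limited window of history, so a long-lived watch can come back with a `*metav1.Status` whose reason is `metav1.StatusReasonExpired` (HTTP 410 Gone). That is the case this change retries. Below is a minimal sketch of the same retry pattern in isolation, assuming a bare client-go clientset; the name `watchPodUntilDone` and the `onEvent` callback are illustrative and not part of this codebase.

```go
// Package example sketches the watch-expired retry pattern used above. The
// function name and the onEvent callback are illustrative assumptions, not
// part of the theatre codebase.
package example

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
)

// watchPodUntilDone watches a single named pod, re-establishing the watch
// whenever the API server reports an expired resource version, and gives up
// after maxAttempts expiries. onEvent decides when we are done.
func watchPodUntilDone(
	ctx context.Context,
	clientset kubernetes.Interface,
	namespace, name string,
	onEvent func(obj runtime.Object) (done bool, err error),
) error {
	// Select only the named pod, as metav1.SingleObject does in the diff.
	opts := metav1.ListOptions{FieldSelector: "metadata.name=" + name}
	const maxAttempts = 3

	for attempt := 0; attempt < maxAttempts; attempt++ {
		w, err := clientset.CoreV1().Pods(namespace).Watch(ctx, opts)
		if err != nil {
			return fmt.Errorf("error watching pod: %w", err)
		}

	events:
		for {
			select {
			case event, ok := <-w.ResultChan():
				if !ok {
					w.Stop()
					return fmt.Errorf("watch channel closed")
				}
				if status, ok := event.Object.(*metav1.Status); ok {
					if status.Reason == metav1.StatusReasonExpired {
						// Clear the resource version so the next watch starts
						// from the current state, not the compacted-away one.
						opts.ResourceVersion = ""
						break events
					}
					w.Stop()
					return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
				}
				done, err := onEvent(event.Object)
				if done || err != nil {
					w.Stop()
					return err
				}
			case <-ctx.Done():
				w.Stop()
				return ctx.Err()
			}
		}

		// The watch expired: stop the old watcher and back off briefly
		// before establishing a new one.
		w.Stop()
		time.Sleep(500 * time.Millisecond)
	}

	return fmt.Errorf("watch expired %d times", maxAttempts)
}
```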
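One design difference from the diff: the sketch stops each watcher explicitly on every exit path rather than registering a `defer w.Stop()` inside the retry loop, since deferred calls in a loop accumulate (one per attempt) and only run when the function returns.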