Improve handling of watch expired errors for attached pods
We are currently seeing issues on a regular basis where a pod succeeds,
but Theatre reports that something went wrong.

This updates the code to handle the watch expired error specifically,
and to retry up to 2 more times with a backoff between attempts.
mbarrin-incident committed Dec 12, 2024
1 parent 2956e6f commit 69c42db
Showing 1 changed file with 77 additions and 56 deletions.
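Before the diff itself, a minimal, self-contained sketch of the pattern the commit message describes: watch the pod, treat a *metav1.Status event with reason Expired as a signal to clear the resource version and re-establish the watch, and give up after a fixed number of attempts. The package and function names here (podwatch, watchPodUntilSuccess) are illustrative only; this is not Theatre's code, and the real change is in the diff below.

// Minimal sketch (not Theatre's code) of retrying a pod watch when the API
// server reports the watch as expired.
package podwatch

import (
    "context"
    "errors"
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

func watchPodUntilSuccess(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
    listOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + name}

    const maxAttempts = 3
    for attempt := 0; attempt < maxAttempts; attempt++ {
        w, err := client.CoreV1().Pods(namespace).Watch(ctx, listOptions)
        if err != nil {
            return fmt.Errorf("error watching pod: %w", err)
        }

        expired := false
        for event := range w.ResultChan() {
            // The API server signals an expired watch with a *metav1.Status event
            // whose Reason is "Expired" (HTTP 410 Gone).
            if status, ok := event.Object.(*metav1.Status); ok {
                if status.Reason == metav1.StatusReasonExpired {
                    // Clear the resource version so the next watch starts from the
                    // most recent state instead of the compacted-away version.
                    listOptions.ResourceVersion = ""
                    expired = true
                    break
                }
                w.Stop()
                return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
            }

            if pod, ok := event.Object.(*corev1.Pod); ok && pod.Status.Phase == corev1.PodSucceeded {
                w.Stop()
                return nil
            }
        }
        w.Stop()

        if !expired {
            // The channel closed without either success or an expiry signal.
            return errors.New("watch channel closed before the pod succeeded")
        }

        // Brief pause before re-establishing the watch.
        time.Sleep(500 * time.Millisecond)
    }

    return fmt.Errorf("watch expired %d times", maxAttempts)
}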
pkg/workloads/console/runner/runner.go (133 changes: 77 additions & 56 deletions)
@@ -251,72 +251,93 @@ func (c *Runner) waitForSuccess(ctx context.Context, csl *workloadsv1alpha1.Cons
     }
 
     listOptions := metav1.SingleObject(pod.ObjectMeta)
-    w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
-    if err != nil {
-        return fmt.Errorf("error watching pod: %w", err)
-    }
-
-    // We need to fetch the pod again now we have a watcher to avoid a race
-    // where the pod completed before we were listening for watch events
-    pod, _, err = c.GetAttachablePod(ctx, csl)
-    if err != nil {
-        // If we can't find the pod, then we should assume it finished successfully. Otherwise
-        // we might race against the operator to access a pod it wants to delete, and cause
-        // our runner to exit with error when all is fine.
-        //
-        // TODO: It may be better to recheck the console and look in its status?
-        if apierrors.IsNotFound(err) {
-            return nil
-        }
-
-        return fmt.Errorf("error retrieving pod: %w", err)
-    }
-
-    if succeeded(pod) {
-        return nil
-    }
-
-    if !isRunning(pod) {
-        return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
-    }
-
-    status := w.ResultChan()
-    defer w.Stop()
-
-    for {
-        select {
-        case event, ok := <-status:
-            // If our channel is closed, exit with error, as we'll otherwise assume
-            // we were successful when we never reached this state.
-            if !ok {
-                return errors.New("watch channel closed")
-            }
-
-            // We can receive *metav1.Status events in the situation where there's an error, in
-            // which case we should exit early.
-            if status, ok := event.Object.(*metav1.Status); ok {
-                return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
-            }
-
-            // We should be safe now, as a watcher should return either Status or the type we
-            // asked it for. But we've been wrong before, and it wasn't easy to figure out what
-            // happened when we didn't print the type of the event.
-            pod, ok := event.Object.(*corev1.Pod)
-            if !ok {
-                return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
-                    reflect.TypeOf(event.Object))
-            }
-
-            if succeeded(pod) {
-                return nil
-            }
-            if !isRunning(pod) {
-                return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
-            }
-        case <-ctx.Done():
-            return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
-        }
-    }
+    // Attempt to watch the pod for a successful completion, up to 3 times.
+    // This loop will only iterate if the watch returns the expired error.
+    maxAttempts := 3
+    for i := 0; i < maxAttempts; i++ {
+        w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
+        if err != nil {
+            return fmt.Errorf("error watching pod: %w", err)
+        }
+
+        // We need to fetch the pod again now we have a watcher to avoid a race
+        // where the pod completed before we were listening for watch events
+        pod, _, err = c.GetAttachablePod(ctx, csl)
+        if err != nil {
+            // If we can't find the pod, then we should assume it finished successfully. Otherwise
+            // we might race against the operator to access a pod it wants to delete, and cause
+            // our runner to exit with error when all is fine.
+            //
+            // TODO: It may be better to recheck the console and look in its status?
+            if apierrors.IsNotFound(err) {
+                return nil
+            }
+
+            return fmt.Errorf("error retrieving pod: %w", err)
+        }
+
+        if succeeded(pod) {
+            return nil
+        }
+
+        if !isRunning(pod) {
+            return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+        }
+
+        status := w.ResultChan()
+        defer w.Stop()
+
+        // If we get a watch expired error, we break out of this labelled loop and
+        // fall through to the sleep at the bottom of the outer attempts loop.
+    WATCHEXPIRED:
+        for {
+            select {
+            case event, ok := <-status:
+                // If our channel is closed, exit with error, as we'll otherwise assume
+                // we were successful when we never reached this state.
+                if !ok {
+                    return errors.New("watch channel closed")
+                }
+
+                // We can receive *metav1.Status events in the situation where there's an error, in
+                // which case we should exit early, unless it is a watch expired error, in which case we try again.
+                if status, ok := event.Object.(*metav1.Status); ok {
+                    // Handle the watch having expired
+                    if status.Reason == metav1.StatusReasonExpired {
+                        // If we want to create a watch again, we need to set the resource version
+                        // to the empty string, otherwise we risk getting another expired watch.
+                        listOptions.ResourceVersion = ""
+                        break WATCHEXPIRED
+                    }
+                    return fmt.Errorf("received failure from Kubernetes: %s: %s", status.Reason, status.Message)
+                }
+
+                // We should be safe now, as a watcher should return either Status or the type we
+                // asked it for. But we've been wrong before, and it wasn't easy to figure out what
+                // happened when we didn't print the type of the event.
+                pod, ok = event.Object.(*corev1.Pod)
+                if !ok {
+                    return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
+                        reflect.TypeOf(event.Object))
+                }
+
+                if succeeded(pod) {
+                    return nil
+                }
+                if !isRunning(pod) {
+                    return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+                }
+            case <-ctx.Done():
+                return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
+            }
+        }
+
+        // Sleep for a bit before trying again
+        time.Sleep(time.Duration(500 * time.Millisecond))
+        w.Stop()
+    }
+    // This error will only be raised after we have used all attempts to get a successful watch for the pod
+    return fmt.Errorf("received watch expired %d times", maxAttempts)
 }
 
 type GetOptions struct {
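The condition the new branch keys on is a *metav1.Status delivered as a watch event with Reason "Expired", the watch equivalent of HTTP 410 Gone. The snippet below is a self-contained illustration of that event shape using apimachinery's fake watcher; it is not Theatre's code or test suite, and isWatchExpired is a hypothetical helper.

// Illustration of the expired-watch event shape using a fake watcher.
package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
)

// isWatchExpired reports whether a watch event carries the Expired status
// that the retry branch above checks for.
func isWatchExpired(event watch.Event) bool {
    status, ok := event.Object.(*metav1.Status)
    return ok && status.Reason == metav1.StatusReasonExpired
}

func main() {
    // Simulate what the API server sends when the watched resourceVersion has
    // been compacted away: an Error event whose Object is a *metav1.Status.
    fw := watch.NewFake()
    go fw.Error(&metav1.Status{Reason: metav1.StatusReasonExpired})

    event := <-fw.ResultChan()
    fmt.Println(isWatchExpired(event)) // prints: true
}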
