
Commit

Merge pull request #7 from incident-io/matthewbarrington/inf-1277-fix-issues-with-theatre-pod-watches-expiring

Update runner.go so it can handle the expired error from watches
mbarrin-incident authored Dec 12, 2024
2 parents 1bf2d63 + a3961f3 commit 8571d8c
Showing 1 changed file with 72 additions and 56 deletions.
128 changes: 72 additions & 56 deletions pkg/workloads/console/runner/runner.go
@@ -251,72 +251,88 @@ func (c *Runner) waitForSuccess(ctx context.Context, csl *workloadsv1alpha1.Cons
 	}
 
 	listOptions := metav1.SingleObject(pod.ObjectMeta)
-	w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
-	if err != nil {
-		return fmt.Errorf("error watching pod: %w", err)
-	}
-
-	// We need to fetch the pod again now we have a watcher to avoid a race
-	// where the pod completed before we were listening for watch events
-	pod, _, err = c.GetAttachablePod(ctx, csl)
-	if err != nil {
-		// If we can't find the pod, then we should assume it finished successfully. Otherwise
-		// we might race against the operator to access a pod it wants to delete, and cause
-		// our runner to exit with error when all is fine.
-		//
-		// TODO: It may be better to recheck the console and look in its status?
-		if apierrors.IsNotFound(err) {
-			return nil
-		}
-
-		return fmt.Errorf("error retrieving pod: %w", err)
-	}
-
-	if succeeded(pod) {
-		return nil
-	}
-
-	if !isRunning(pod) {
-		return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
-	}
-
-	status := w.ResultChan()
-	defer w.Stop()
-
-	for {
-		select {
-		case event, ok := <-status:
-			// If our channel is closed, exit with error, as we'll otherwise assume
-			// we were successful when we never reached this state.
-			if !ok {
-				return errors.New("watch channel closed")
-			}
-
-			// We can receive *metav1.Status events in the situation where there's an error, in
-			// which case we should exit early.
-			if status, ok := event.Object.(*metav1.Status); ok {
-				return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
-			}
-
-			// We should be safe now, as a watcher should return either Status or the type we
-			// asked it for. But we've been wrong before, and it wasn't easy to figure out what
-			// happened when we didn't print the type of the event.
-			pod, ok := event.Object.(*corev1.Pod)
-			if !ok {
-				return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
-					reflect.TypeOf(event.Object))
-			}
-
-			if succeeded(pod) {
-				return nil
-			}
-			if !isRunning(pod) {
-				return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
-			}
-		case <-ctx.Done():
-			return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
-		}
-	}
+
+	// Handle an expired watch up to 3 times.
+	// This loop only iterates on an expired watch; all other code paths return from this function.
+	maxAttempts := 3
+	for i := 0; i < maxAttempts; i++ {
+		w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
+		if err != nil {
+			return fmt.Errorf("error watching pod: %w", err)
+		}
+
+		// We need to fetch the pod again now we have a watcher to avoid a race
+		// where the pod completed before we were listening for watch events
+		pod, _, err = c.GetAttachablePod(ctx, csl)
+		if err != nil {
+			// If we can't find the pod, then we should assume it finished successfully. Otherwise
+			// we might race against the operator to access a pod it wants to delete, and cause
+			// our runner to exit with error when all is fine.
+			//
+			// TODO: It may be better to recheck the console and look in its status?
+			if apierrors.IsNotFound(err) {
+				return nil
+			}
+
+			return fmt.Errorf("error retrieving pod: %w", err)
+		}
+
+		if succeeded(pod) {
+			return nil
+		}
+
+		if !isRunning(pod) {
+			return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+		}
+
+		status := w.ResultChan()
+		defer w.Stop()
+
+		// If the watch expires, we break out of this labelled loop so the outer
+		// loop can recreate the watch and try again.
+	WATCHEXPIRED:
+		for {
+			select {
+			case event, ok := <-status:
+				// If our channel is closed, exit with error, as we'll otherwise assume
+				// we were successful when we never reached this state.
+				if !ok {
+					return errors.New("watch channel closed")
+				}
+
+				// We can receive *metav1.Status events in the situation where there's an error, in
+				// which case we should exit early, unless it is a watch expired error, in which case we try again.
+				if status, ok := event.Object.(*metav1.Status); ok {
+					if status.Reason == metav1.StatusReasonExpired {
+						// Recreating a watch from the previous ResourceVersion is not guaranteed to work, and can just return another expired watch.
+						// Setting the ResourceVersion to an empty string will cause the watch to start from the pod's current state, which should avoid the issue.
+						listOptions.ResourceVersion = ""
+						break WATCHEXPIRED
+					}
+					return fmt.Errorf("received failure from Kubernetes: %s: %s", status.Reason, status.Message)
+				}
+
+				// We should be safe now, as a watcher should return either Status or the type we
+				// asked it for. But we've been wrong before, and it wasn't easy to figure out what
+				// happened when we didn't print the type of the event.
+				pod, ok := event.Object.(*corev1.Pod)
+				if !ok {
+					return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
+						reflect.TypeOf(event.Object))
+				}
+
+				if succeeded(pod) {
+					return nil
+				}
+				if !isRunning(pod) {
+					return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
+				}
+			case <-ctx.Done():
+				return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
+			}
+		}
+	}
+
+	// This error will only be raised after we have used all attempts to get a successful watch for the pod.
+	return fmt.Errorf("received watch expired %d times", maxAttempts)
 }
 
 type GetOptions struct {
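
For readers who want the retry pattern in isolation, here is a minimal, self-contained sketch of handling an expired watch against client-go directly. The package name, waitForPodCompletion, and its parameters are illustrative only and are not part of this repository; the sketch omits the runner-specific pod refetch and the succeeded/isRunning helpers, and only shows the expiry check and ResourceVersion reset that this commit introduces.

package podwatch

import (
	"context"
	"errors"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// waitForPodCompletion (illustrative name) watches a single pod until it reaches
// PodSucceeded, retrying the watch up to maxAttempts times when the API server
// reports that the watch has expired.
func waitForPodCompletion(ctx context.Context, cs kubernetes.Interface, namespace, name string, maxAttempts int) error {
	opts := metav1.ListOptions{FieldSelector: "metadata.name=" + name}

	for i := 0; i < maxAttempts; i++ {
		w, err := cs.CoreV1().Pods(namespace).Watch(ctx, opts)
		if err != nil {
			return fmt.Errorf("error watching pod: %w", err)
		}

		expired := false
		for event := range w.ResultChan() {
			// Errors from the API server arrive as *metav1.Status events.
			if status, ok := event.Object.(*metav1.Status); ok {
				if status.Reason == metav1.StatusReasonExpired {
					// Clear the stale ResourceVersion so the next watch starts
					// from the pod's current state instead of expiring again.
					opts.ResourceVersion = ""
					expired = true
					break
				}
				w.Stop()
				return fmt.Errorf("received failure from Kubernetes: %s: %s", status.Reason, status.Message)
			}

			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				w.Stop()
				return fmt.Errorf("unexpected event object type %T", event.Object)
			}
			switch pod.Status.Phase {
			case corev1.PodSucceeded:
				w.Stop()
				return nil
			case corev1.PodFailed:
				w.Stop()
				return fmt.Errorf("pod failed: %s", pod.Status.Message)
			}
		}
		w.Stop()

		if !expired {
			// The channel closed without a terminal phase or an expiry,
			// e.g. because ctx was cancelled and the watch was torn down.
			return errors.New("watch channel closed")
		}
	}

	return fmt.Errorf("received watch expired %d times", maxAttempts)
}

As in the commit, clearing ResourceVersion restarts the watch from the most recent state of the pod rather than from the stale version that triggered the expiry.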

