Update runner.go so it can handle expired error from watches
mbarrin-incident committed Dec 11, 2024
1 parent 2956e6f commit bbae076
Showing 1 changed file with 84 additions and 54 deletions.
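The change wraps the pod watch in a bounded retry loop: when the API server reports the watch stream as expired, the runner records the resource version carried in the returned Status and starts a fresh watch from that point instead of failing. Below is a minimal, self-contained sketch of that pattern; watchPodUntilDone, maxAttempts and the use of metav1.StatusReasonExpired are illustrative assumptions (the diff itself keys on metav1.StatusReasonNotFound), not code from this repository.

package watchretry

import (
	"context"
	"errors"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
)

// watchPodUntilDone watches a single pod and restarts the watch when the API
// server reports it as expired, resuming from the resource version carried in
// the returned Status. Hypothetical helper, not part of the repository.
func watchPodUntilDone(ctx context.Context, cs kubernetes.Interface, namespace, name string) error {
	const maxAttempts = 3

	opts := metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
	}

	for attempt := 0; attempt < maxAttempts; attempt++ {
		w, err := cs.CoreV1().Pods(namespace).Watch(ctx, opts)
		if err != nil {
			return fmt.Errorf("error watching pod: %w", err)
		}

		expired := false
		for event := range w.ResultChan() {
			// Errors, including watch expiry, arrive as *metav1.Status events.
			if status, ok := event.Object.(*metav1.Status); ok {
				if status.Reason == metav1.StatusReasonExpired {
					// Remember where to resume, then start a fresh watch.
					opts.ResourceVersion = status.ResourceVersion
					expired = true
					break
				}
				w.Stop()
				return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
			}

			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				continue
			}
			switch pod.Status.Phase {
			case corev1.PodSucceeded:
				w.Stop()
				return nil
			case corev1.PodFailed:
				w.Stop()
				return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
			}
		}
		w.Stop()

		if !expired {
			// The channel closed (or the context was cancelled) before the pod completed.
			return errors.New("watch channel closed before pod completed")
		}
	}
	return fmt.Errorf("watch expired %d times", maxAttempts)
}

The real runner additionally re-fetches the pod after opening each watch to close the race described in the comments below, and routes the success and failure checks through its succeeded and isRunning helpers.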
pkg/workloads/console/runner/runner.go (138 changes: 84 additions & 54 deletions)
@@ -17,6 +17,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@@ -251,72 +252,101 @@ func (c *Runner) waitForSuccess(ctx context.Context, csl *workloadsv1alpha1.Cons
}

listOptions := metav1.SingleObject(pod.ObjectMeta)
w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
if err != nil {
return fmt.Errorf("error watching pod: %w", err)
}

// We need to fetch the pod again now we have a watcher to avoid a race
// where the pod completed before we were listening for watch events
pod, _, err = c.GetAttachablePod(ctx, csl)
if err != nil {
// If we can't find the pod, then we should assume it finished successfully. Otherwise
// we might race against the operator to access a pod it wants to delete, and cause
// our runner to exit with error when all is fine.
//
// TODO: It may be better to recheck the console and look in its status?
if apierrors.IsNotFound(err) {
return nil
var rv string

// Attempt to watch the pod for a successful completion, up to 3 times.
// This loop only iterates again if the watch returns an expired error.
maxAttempts := 3
for i := 0; i < maxAttempts; i++ {
// If we have a resource version set, then use it for the watch.
// This struct is the same as the one built by SingleObject, but allows a custom ResourceVersion.
if rv != "" {
listOptions = metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ObjectMeta.Name).String(),
ResourceVersion: rv,
}
}

return fmt.Errorf("error retrieving pod: %w", err)
}

if succeeded(pod) {
return nil
}

if !isRunning(pod) {
return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
}

status := w.ResultChan()
defer w.Stop()
w, err := c.clientset.CoreV1().Pods(pod.Namespace).Watch(ctx, listOptions)
if err != nil {
return fmt.Errorf("error watching pod: %w", err)
}

for {
select {
case event, ok := <-status:
// If our channel is closed, exit with error, as we'll otherwise assume
// we were successful when we never reached this state.
if !ok {
return errors.New("watch channel closed")
// We need to fetch the pod again now we have a watcher to avoid a race
// where the pod completed before we were listening for watch events
pod, _, err = c.GetAttachablePod(ctx, csl)
if err != nil {
// If we can't find the pod, then we should assume it finished successfully. Otherwise
// we might race against the operator to access a pod it wants to delete, and cause
// our runner to exit with error when all is fine.
//
// TODO: It may be better to recheck the console and look in its status?
if apierrors.IsNotFound(err) {
return nil
}

// We can receive *metav1.Status events in the situation where there's an error, in
// which case we should exit early.
if status, ok := event.Object.(*metav1.Status); ok {
return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
}
return fmt.Errorf("error retrieving pod: %w", err)
}

// We should be safe now, as a watcher should return either Status or the type we
// asked it for. But we've been wrong before, and it wasn't easy to figure out what
// happened when we didn't print the type of the event.
pod, ok := event.Object.(*corev1.Pod)
if !ok {
return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
reflect.TypeOf(event.Object))
}
if succeeded(pod) {
return nil
}

if succeeded(pod) {
return nil
if !isRunning(pod) {
return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
}

status := w.ResultChan()
defer w.Stop()

for {
select {
case event, ok := <-status:
// If our channel is closed, exit with error, as we'll otherwise assume
// we were successful when we never reached this state.
if !ok {
return errors.New("watch channel closed")
}

// We can receive *metav1.Status events in the situation where there's an error, in
// which case we should exit early.
if status, ok := event.Object.(*metav1.Status); ok {
// Handle the case where the watch has expired and the pod is reported as no longer found.
// Rather than assuming it finished successfully, restart the watch from where it left off.
if status.Reason == metav1.StatusReasonNotFound {
// The status object carries a resource version; store it so that the next
// iteration of the outer loop resumes the watch from where we left off.
rv = status.ResourceVersion
break
}
return fmt.Errorf("received failure from Kubernetes: %s", status.Reason)
}

// We should be safe now, as a watcher should return either Status or the type we
// asked it for. But we've been wrong before, and it wasn't easy to figure out what
// happened when we didn't print the type of the event.
pod, ok := event.Object.(*corev1.Pod)
if !ok {
return fmt.Errorf("received an event that didn't reference a pod, which is unexpected: %v",
reflect.TypeOf(event.Object))
}

if succeeded(pod) {
return nil
}
if !isRunning(pod) {
return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
}
case <-ctx.Done():
return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
}
if !isRunning(pod) {
return fmt.Errorf("pod in unsuccessful state %s: %s", pod.Status.Phase, pod.Status.Message)
if rv != "" {
break
}
case <-ctx.Done():
return fmt.Errorf("pod's last phase was: %v: %w", pod.Status.Phase, ctx.Err())
}
}
return fmt.Errorf("received watch expired %d times", maxAttempts)
}

type GetOptions struct {

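As an aside on the listOptions handling in the loop above: metav1.SingleObject builds essentially the same options as the hand-rolled struct, a field selector on the object's name plus a resource version, the difference being that it takes the version from the pod's ObjectMeta rather than from the expired watch's Status. A small sketch of that equivalence follows; singleObjectWithVersion and the literal values are illustrative only.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
)

// singleObjectWithVersion mirrors what the diff builds by hand: the same
// name-based selector that metav1.SingleObject produces, but with a caller
// supplied resource version (for example, the one reported by an expired watch).
func singleObjectWithVersion(meta metav1.ObjectMeta, rv string) metav1.ListOptions {
	opts := metav1.SingleObject(meta) // selector on metadata.name, ResourceVersion taken from meta
	if rv != "" {
		opts.ResourceVersion = rv
	}
	return opts
}

func main() {
	meta := metav1.ObjectMeta{Name: "console-pod", ResourceVersion: "100"}

	fmt.Println(singleObjectWithVersion(meta, "245"))

	// Equivalent to constructing the options directly, as the diff does:
	fmt.Println(metav1.ListOptions{
		FieldSelector:   fields.OneTermEqualSelector("metadata.name", meta.Name).String(),
		ResourceVersion: "245",
	})
}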