Skip to content

Commit

Permalink
fix(trafficrouting): Fix rollback behavior for canary with trafficrou…
Browse files Browse the repository at this point in the history
…ting and .DynamicStableScale=true

Without the fix, controller always scales stable up to 100%, while dynamically scaling canary down.
If rollback or abort occurs on later steps of rollout, it can cause various issues, e.g. surge of hundreds of new pods.

This fix ensures that when a rollout is aborted and .DynamicStableScale is enabled,
the StableRS dynamically scales up to 100% as NewRS scales down based on steps in reverse order.

Signed-off-by: Armen Shakhbazian <[email protected]>
  • Loading branch information
ArenSH committed Jan 8, 2025
1 parent 097f5be commit 5be5b26
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 26 deletions.
2 changes: 1 addition & 1 deletion rollout/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *rolloutContext) reconcileCanaryStableReplicaSet() (bool, error) {
// causing us to flap and scale up the stable 100 temporarily (before scaling down to 0 later).
// Therefore, we send c.rollout.Status.Canary.Weights so that the stable scaling happens in
// a *susbsequent*, follow-up reconciliation, lagging behind the setWeight and service switch.
_, desiredStableRSReplicaCount = replicasetutil.CalculateReplicaCountsForTrafficRoutedCanary(c.rollout, c.rollout.Status.Canary.Weights)
_, desiredStableRSReplicaCount = replicasetutil.CalculateReplicaCountsForTrafficRoutedCanary(c.rollout, c.newRS, c.stableRS, c.rollout.Status.Canary.Weights)
}
scaled, _, err := c.scaleReplicaSetAndRecordEvent(c.stableRS, desiredStableRSReplicaCount)
if err != nil {
Expand Down
31 changes: 25 additions & 6 deletions rollout/trafficrouting.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,34 @@ func (c *rolloutContext) calculateDesiredWeightOnAbortOrStableRollback() int32 {
// that it is fully scaled up
return 0
}
// When using dynamic stable scaling, we must dynamically decreasing the weight to the canary
// according to the availability of the stable (whatever it can support).
desiredWeight := maxInt(0, weightutil.MaxTrafficWeight(c.rollout)-((weightutil.MaxTrafficWeight(c.rollout)*c.stableRS.Status.AvailableReplicas) / *c.rollout.Spec.Replicas))
if c.rollout.Status.Canary.Weights != nil {

maxWeight := weightutil.MaxTrafficWeight(c.rollout)
// On rollback with .DynamicStableScale, we roll back based on step weights in reverse order
// therefore we need to scale based on canary availability
desiredCanaryWeight := replicasetutil.GetDesiredCanaryWeight(c.rollout, c.newRS, c.stableRS)
// canary weight computed based on available stable replicas
expectedCanaryWeight := maxInt(0, maxWeight-((maxWeight*c.stableRS.Status.AvailableReplicas) / *c.rollout.Spec.Replicas))
if c.rollout.Status.Canary.Weights == nil {
return maxInt(expectedCanaryWeight, desiredCanaryWeight)
}

currentCanaryWeight := c.rollout.Status.Canary.Weights.Canary.Weight
if desiredCanaryWeight <= 0 {
// This ensures that if we are already at a lower weight, then we will not
// increase the weight because stable availability is flapping (e.g. pod restarts)
desiredWeight = minInt(desiredWeight, c.rollout.Status.Canary.Weights.Canary.Weight)
return minInt(expectedCanaryWeight, currentCanaryWeight)
}
return desiredWeight

// this logic __heavily__ relies on the fact that CalculateReplicaCountsForTrafficRoutedCanary.
// Controller will scale canary down only if weight is shifted to primary,
// therefore we can safely delay shifting weight to primary until enough of them are available.
currentStableReplicasWeight := (maxWeight * c.stableRS.Status.AvailableReplicas) / *c.rollout.Spec.Replicas
if desiredCanaryWeight > 0 && currentStableReplicasWeight < (maxWeight-desiredCanaryWeight) {
// Current stable is still scalingUp, keep canary weight as is
return currentCanaryWeight
}

return minInt(desiredCanaryWeight, currentCanaryWeight)
}

// trafficWeightUpdatedMessage returns a message we emit for the kubernetes event whenever we adjust traffic weights
Expand Down
75 changes: 72 additions & 3 deletions rollout/trafficrouting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,9 +967,9 @@ func TestDynamicScalingDontIncreaseWeightWhenAborted(t *testing.T) {
f.run(getKey(r1, t))
}

// TestDynamicScalingDecreaseWeightAccordingToStableAvailabilityWhenAborted verifies we decrease the weight
// to the canary depending on the availability of the stable ReplicaSet when aborting
func TestDynamicScalingDecreaseWeightAccordingToStableAvailabilityWhenAborted(t *testing.T) {
// TestDynamicScalingDecreaseWeightAccordingToStepWeightsAvailabilityWhenAborted verifies we decrease the weight
// to the canary depending on the weight in previous Step when aborting
func TestDynamicScalingDecreaseWeightAccordingToStepWeightsAvailabilityWhenAborted(t *testing.T) {
f := newFixture(t)
defer f.Close()

Expand All @@ -994,6 +994,75 @@ func TestDynamicScalingDecreaseWeightAccordingToStableAvailabilityWhenAborted(t
r1.Status.AbortedAt = &metav1.Time{Time: time.Now().Add(-1 * time.Minute)}
r2 := bumpVersion(r1)

rs1 := newReplicaSetWithStatus(r1, 5, 3)
rs2 := newReplicaSetWithStatus(r2, 4, 4)

rs1PodHash := rs1.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
rs2PodHash := rs2.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
canarySelector := map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: rs2PodHash}
stableSelector := map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: rs1PodHash}
canarySvc := newService("canary", 80, canarySelector, r1)
stableSvc := newService("stable", 80, stableSelector, r1)
r2.Status.StableRS = rs1PodHash
r2.Status.Canary.Weights = &v1alpha1.TrafficWeights{
Canary: v1alpha1.WeightDestination{
Weight: 100,
ServiceName: "canary",
PodTemplateHash: rs2PodHash,
},
Stable: v1alpha1.WeightDestination{
Weight: 0,
ServiceName: "stable",
PodTemplateHash: rs1PodHash,
},
}

f.kubeobjects = append(f.kubeobjects, rs1, rs2, canarySvc, stableSvc)
f.replicaSetLister = append(f.replicaSetLister, rs1, rs2)

f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

f.expectPatchRolloutAction(r2)
f.expectUpdateReplicaSetAction(rs1)

f.fakeTrafficRouting = newUnmockedFakeTrafficRoutingReconciler()
f.fakeTrafficRouting.On("UpdateHash", mock.Anything, mock.Anything, mock.Anything).Return(nil)
f.fakeTrafficRouting.On("SetWeight", mock.Anything, mock.Anything).Return(func(desiredWeight int32, additionalDestinations ...v1alpha1.WeightDestination) error {
// make sure SetWeight was called with correct value
assert.Equal(t, int32(50), desiredWeight)
return nil
})
f.fakeTrafficRouting.On("SetHeaderRoute", mock.Anything, mock.Anything).Return(nil)
f.fakeTrafficRouting.On("RemoveManagedRoutes", mock.Anything, mock.Anything).Return(nil)
f.fakeTrafficRouting.On("VerifyWeight", mock.Anything).Return(pointer.BoolPtr(true), nil)
f.run(getKey(r1, t))
}

// TestDynamicScalingDecreaseWeightAccordingToStableAvailabilityWhenAborted verifies we decrease the weight
// to the canary depending on the availability of the stable ReplicaSet when aborting
func TestDynamicScalingDecreaseWeightAccordingToStableAvailabilityWhenAborted(t *testing.T) {
f := newFixture(t)
defer f.Close()

steps := []v1alpha1.CanaryStep{
{
Pause: &v1alpha1.RolloutPause{},
},
}
r1 := newCanaryRollout("foo", 5, nil, steps, pointer.Int32Ptr(1), intstr.FromInt(1), intstr.FromInt(1))
r1.Spec.Strategy.Canary.DynamicStableScale = true
r1.Spec.Strategy.Canary.TrafficRouting = &v1alpha1.RolloutTrafficRouting{
SMI: &v1alpha1.SMITrafficRouting{},
}
r1.Spec.Strategy.Canary.CanaryService = "canary"
r1.Spec.Strategy.Canary.StableService = "stable"
r1.Status.ReadyReplicas = 5
r1.Status.AvailableReplicas = 5
r1.Status.Abort = true
r1.Status.AbortedAt = &metav1.Time{Time: time.Now().Add(-1 * time.Minute)}
r2 := bumpVersion(r1)

rs1 := newReplicaSetWithStatus(r1, 5, 1)
rs2 := newReplicaSetWithStatus(r2, 4, 4)

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/canary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,8 @@ func (s *CanarySuite) TestCanaryDynamicStableScale() {
AbortRollout().
MarkPodsReady("1", 2). // mark 2 stable pods as ready (3/4 stable are ready)
WaitForRevisionPodCount("2", 1).
WaitForRevisionPodCount("1", 4).
Then().
ExpectRevisionPodCount("1", 4).
// Assert that the canary service selector is still not set to stable rs because of dynamic stable scale still in progress
Assert(func(t *fixtures.Then) {
canarySvc, stableSvc := t.GetServices()
Expand Down
46 changes: 39 additions & 7 deletions utils/replicaset/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func AtDesiredReplicaCountsForCanary(ro *v1alpha1.Rollout, newRS, stableRS *apps
if ro.Spec.Strategy.Canary.TrafficRouting == nil {
desiredNewRSReplicaCount, desiredStableRSReplicaCount = CalculateReplicaCountsForBasicCanary(ro, newRS, stableRS, olderRSs)
} else {
desiredNewRSReplicaCount, desiredStableRSReplicaCount = CalculateReplicaCountsForTrafficRoutedCanary(ro, weights)
desiredNewRSReplicaCount, desiredStableRSReplicaCount = CalculateReplicaCountsForTrafficRoutedCanary(ro, newRS, stableRS, weights)
}
if !allDesiredAreAvailable(newRS, desiredNewRSReplicaCount) {
return false
Expand Down Expand Up @@ -93,7 +93,7 @@ func AtDesiredReplicaCountsForCanary(ro *v1alpha1.Rollout, newRS, stableRS *apps
// For more examples, check the CalculateReplicaCountsForBasicCanary test in canary/canary_test.go
func CalculateReplicaCountsForBasicCanary(rollout *v1alpha1.Rollout, newRS *appsv1.ReplicaSet, stableRS *appsv1.ReplicaSet, oldRSs []*appsv1.ReplicaSet) (int32, int32) {
rolloutSpecReplica := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
_, desiredWeight := GetCanaryReplicasOrWeight(rollout)
_, desiredWeight := GetCanaryReplicasOrWeight(rollout, newRS, stableRS)
maxSurge := MaxSurge(rollout)
maxWeight := weightutil.MaxTrafficWeight(rollout)

Expand Down Expand Up @@ -338,10 +338,10 @@ func CheckMinPodsPerReplicaSet(rollout *v1alpha1.Rollout, count int32) int32 {
// when using canary with traffic routing. If current traffic weights are supplied, we factor the
// those weights into the and return the higher of current traffic scale vs. desired traffic scale
// If MinPodsPerReplicaSet is defined and the number of replicas in either RS is not 0, then return at least MinPodsPerReplicaSet
func CalculateReplicaCountsForTrafficRoutedCanary(rollout *v1alpha1.Rollout, weights *v1alpha1.TrafficWeights) (int32, int32) {
func CalculateReplicaCountsForTrafficRoutedCanary(rollout *v1alpha1.Rollout, newRS, stableRS *appsv1.ReplicaSet, weights *v1alpha1.TrafficWeights) (int32, int32) {
var canaryCount, stableCount int32
rolloutSpecReplica := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
setCanaryScaleReplicas, desiredWeight := GetCanaryReplicasOrWeight(rollout)
setCanaryScaleReplicas, desiredWeight := GetCanaryReplicasOrWeight(rollout, newRS, stableRS)
maxWeight := weightutil.MaxTrafficWeight(rollout)
if setCanaryScaleReplicas != nil {
// a canary count was explicitly set
Expand Down Expand Up @@ -463,8 +463,7 @@ func GetCurrentCanaryStep(rollout *v1alpha1.Rollout) (*v1alpha1.CanaryStep, *int
return &rollout.Spec.Strategy.Canary.Steps[currentStepIndex], &currentStepIndex
}

// GetCanaryReplicasOrWeight either returns a static set of replicas or a weight percentage
func GetCanaryReplicasOrWeight(rollout *v1alpha1.Rollout) (*int32, int32) {
func GetCanaryReplicasOrWeight(rollout *v1alpha1.Rollout, newRS, stableRS *appsv1.ReplicaSet) (*int32, int32) {
if rollout.Status.PromoteFull || rollout.Status.StableRS == "" || rollout.Status.CurrentPodHash == rollout.Status.StableRS {
return nil, weightutil.MaxTrafficWeight(rollout)
}
Expand All @@ -475,7 +474,40 @@ func GetCanaryReplicasOrWeight(rollout *v1alpha1.Rollout) (*int32, int32) {
return nil, *scs.Weight
}
}
return nil, GetCurrentSetWeight(rollout)

return nil, GetDesiredCanaryWeight(rollout, newRS, stableRS)
}

// GetDesiredCanaryWeight either returns a weight percentage
func GetDesiredCanaryWeight(rollout *v1alpha1.Rollout, newRS, stableRS *appsv1.ReplicaSet) int32 {
// if rollout is aborted and .DynamicStableScale is true
// StableRS should dynamically scale up to 100%
// based on steps in reverse order. This way if abort happened in the last steps
// rollback will not create a surge of new pods by scaling StableRS to 100% immediately
if rollout.Status.Abort && rollout.Spec.Strategy.Canary.DynamicStableScale && newRS != nil && stableRS != nil {
maxWeight := weightutil.MaxTrafficWeight(rollout)
rolloutSpecReplica := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
// since caller function will compute the desired stableRS replicas number based on canary weight
// here the computation is done backwards
expectedCanaryReplicas := rolloutSpecReplica - stableRS.Status.AvailableReplicas
// max makes sure that scaling down NewRS replicas will catch up with scaling up stableRS replicas
canaryReplicas := max(expectedCanaryReplicas, newRS.Status.AvailableReplicas)
// find next step to scale down NewRS to
for i := len(rollout.Spec.Strategy.Canary.Steps) - 1; i >= 0; i-- {
step := rollout.Spec.Strategy.Canary.Steps[i]
if step.SetWeight != nil {
stepReplicas := trafficWeightToReplicas(rolloutSpecReplica, *step.SetWeight, maxWeight)
if stepReplicas < canaryReplicas {
return *step.SetWeight
}
}
}

// if nothing found, return 0
return 0
}

return GetCurrentSetWeight(rollout)
}

// GetCurrentSetWeight grabs the current setWeight used by the rollout by iterating backwards from the current step
Expand Down
Loading

0 comments on commit 5be5b26

Please sign in to comment.