Skip to content

Commit

Permalink
Minor refactor to scale-up orchestrator for more re-usability
Browse files Browse the repository at this point in the history
  • Loading branch information
kawych committed Jan 10, 2025
1 parent 5cd491a commit 6f51229
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ type AsyncNodeGroupInitializer struct {
atomicScaleUp bool
}

func newAsyncNodeGroupInitializer(
// NewAsyncNodeGroupInitializer creates a new AsyncNodeGroupInitializer instance.
func NewAsyncNodeGroupInitializer(
nodeGroup cloudprovider.NodeGroup,
nodeInfo *framework.NodeInfo,
scaleUpExecutor *scaleUpExecutor,
Expand Down
63 changes: 1 addition & 62 deletions cluster-autoscaler/core/scaleup/orchestrator/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package orchestrator

import (
"fmt"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -138,7 +136,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
failedNodeGroups[i] = result.info.Group
scaleUpErrors[i] = result.err
}
return combineConcurrentScaleUpErrors(scaleUpErrors), failedNodeGroups
return errors.CombineConcurrentScaleUpErrors(scaleUpErrors), failedNodeGroups
}
return nil, nil
}
Expand Down Expand Up @@ -188,65 +186,6 @@ func (e *scaleUpExecutor) executeScaleUp(
return nil
}

func combineConcurrentScaleUpErrors(errs []errors.AutoscalerError) errors.AutoscalerError {
if len(errs) == 0 {
return nil
}
if len(errs) == 1 {
return errs[0]
}
uniqueMessages := make(map[string]bool)
uniqueTypes := make(map[errors.AutoscalerErrorType]bool)
for _, err := range errs {
uniqueTypes[err.Type()] = true
uniqueMessages[err.Error()] = true
}
if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 {
return errs[0]
}
// sort to stabilize the results and easier log aggregation
sort.Slice(errs, func(i, j int) bool {
errA := errs[i]
errB := errs[j]
if errA.Type() == errB.Type() {
return errs[i].Error() < errs[j].Error()
}
return errA.Type() < errB.Type()
})
firstErr := errs[0]
printErrorTypes := len(uniqueTypes) > 1
message := formatMessageFromConcurrentErrors(errs, printErrorTypes)
return errors.NewAutoscalerError(firstErr.Type(), message)
}

func formatMessageFromConcurrentErrors(errs []errors.AutoscalerError, printErrorTypes bool) string {
firstErr := errs[0]
var builder strings.Builder
builder.WriteString(firstErr.Error())
builder.WriteString(" ...and other concurrent errors: [")
formattedErrs := map[errors.AutoscalerError]bool{
firstErr: true,
}
for _, err := range errs {
if _, has := formattedErrs[err]; has {
continue
}
formattedErrs[err] = true
var message string
if printErrorTypes {
message = fmt.Sprintf("[%s] %s", err.Type(), err.Error())
} else {
message = err.Error()
}
if len(formattedErrs) > 2 {
builder.WriteString(", ")
}
builder.WriteString(fmt.Sprintf("%q", message))
}
builder.WriteString("]")
return builder.String()
}

// Checks if all groups are scaled only once.
// Scaling one group multiple times concurrently may cause problems.
func checkUniqueNodeGroups(scaleUpInfos []nodegroupset.ScaleUpInfo) errors.AutoscalerError {
Expand Down
7 changes: 4 additions & 3 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ func (o *ScaleUpOrchestrator) ScaleUp(
return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
var scaleUpStatus *status.ScaleUpStatus
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, allOrNothing)
oldId := bestOption.NodeGroup.Id()
initializer := NewAsyncNodeGroupInitializer(bestOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing)
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, initializer)
if aErr != nil {
return scaleUpStatus, aErr
}
Expand Down Expand Up @@ -501,15 +503,14 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup(
schedulablePodGroups map[string][]estimator.PodEquivalenceGroup,
podEquivalenceGroups []*equivalence.PodGroup,
daemonSets []*appsv1.DaemonSet,
allOrNothing bool,
initializer nodegroups.AsyncNodeGroupInitializer,
) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) {
createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)

oldId := initialOption.NodeGroup.Id()
var createNodeGroupResult nodegroups.CreateNodeGroupResult
var aErr errors.AutoscalerError
if o.autoscalingContext.AsyncNodeGroupsEnabled {
initializer := newAsyncNodeGroupInitializer(initialOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing)
createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroupAsync(o.autoscalingContext, initialOption.NodeGroup, initializer)
} else {
createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, initialOption.NodeGroup)
Expand Down
62 changes: 62 additions & 0 deletions cluster-autoscaler/utils/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package errors

import (
"fmt"
"sort"
"strings"
)

// AutoscalerErrorType describes a high-level category of a given error
Expand Down Expand Up @@ -131,3 +133,63 @@ func (e autoscalerErrorImpl) Type() AutoscalerErrorType {
func (e autoscalerErrorImpl) AddPrefix(msg string, args ...interface{}) AutoscalerError {
return autoscalerErrorImpl{errorType: e.errorType, wrappedErr: e, msg: fmt.Sprintf(msg, args...)}
}

// CombineConcurrentScaleUpErrors returns combined scale-up error to report after multiple concurrent scale-ups might haver failed.
func CombineConcurrentScaleUpErrors(errs []AutoscalerError) AutoscalerError {
if len(errs) == 0 {
return nil
}
if len(errs) == 1 {
return errs[0]
}
uniqueMessages := make(map[string]bool)
uniqueTypes := make(map[AutoscalerErrorType]bool)
for _, err := range errs {
uniqueTypes[err.Type()] = true
uniqueMessages[err.Error()] = true
}
if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 {
return errs[0]
}
// sort to stabilize the results and easier log aggregation
sort.Slice(errs, func(i, j int) bool {
errA := errs[i]
errB := errs[j]
if errA.Type() == errB.Type() {
return errs[i].Error() < errs[j].Error()
}
return errA.Type() < errB.Type()
})
firstErr := errs[0]
printErrorTypes := len(uniqueTypes) > 1
message := formatMessageFromConcurrentErrors(errs, printErrorTypes)
return NewAutoscalerError(firstErr.Type(), message)
}

func formatMessageFromConcurrentErrors(errs []AutoscalerError, printErrorTypes bool) string {
firstErr := errs[0]
var builder strings.Builder
builder.WriteString(firstErr.Error())
builder.WriteString(" ...and other concurrent errors: [")
formattedErrs := map[AutoscalerError]bool{
firstErr: true,
}
for _, err := range errs {
if _, has := formattedErrs[err]; has {
continue
}
formattedErrs[err] = true
var message string
if printErrorTypes {
message = fmt.Sprintf("[%s] %s", err.Type(), err.Error())
} else {
message = err.Error()
}
if len(formattedErrs) > 2 {
builder.WriteString(", ")
}
builder.WriteString(fmt.Sprintf("%q", message))
}
builder.WriteString("]")
return builder.String()
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,113 +14,111 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator
package errors

import (
"testing"

"k8s.io/autoscaler/cluster-autoscaler/utils/errors"

"github.com/stretchr/testify/assert"
)

func TestCombinedConcurrentScaleUpErrors(t *testing.T) {
cloudProviderErr := errors.NewAutoscalerError(errors.CloudProviderError, "provider error")
internalErr := errors.NewAutoscalerError(errors.InternalError, "internal error")
cloudProviderErr := NewAutoscalerError(CloudProviderError, "provider error")
internalErr := NewAutoscalerError(InternalError, "internal error")
testCases := []struct {
desc string
errors []errors.AutoscalerError
expectedErr errors.AutoscalerError
errors []AutoscalerError
expectedErr AutoscalerError
}{
{
desc: "no errors",
errors: []errors.AutoscalerError{},
errors: []AutoscalerError{},
expectedErr: nil,
},
{
desc: "single error",
errors: []errors.AutoscalerError{internalErr},
errors: []AutoscalerError{internalErr},
expectedErr: internalErr,
},
{
desc: "two duplicated errors",
errors: []errors.AutoscalerError{
errors: []AutoscalerError{
internalErr,
internalErr,
},
expectedErr: internalErr,
},
{
desc: "two different errors",
errors: []errors.AutoscalerError{
errors: []AutoscalerError{
cloudProviderErr,
internalErr,
},
expectedErr: errors.NewAutoscalerError(
errors.CloudProviderError,
expectedErr: NewAutoscalerError(
CloudProviderError,
"provider error ...and other concurrent errors: [\"[internalError] internal error\"]",
),
},
{
desc: "two different errors - reverse alphabetical order",
errors: []errors.AutoscalerError{
errors: []AutoscalerError{
internalErr,
cloudProviderErr,
},
expectedErr: errors.NewAutoscalerError(
errors.CloudProviderError,
expectedErr: NewAutoscalerError(
CloudProviderError,
"provider error ...and other concurrent errors: [\"[internalError] internal error\"]",
),
},
{
desc: "errors with the same type and different messages",
errors: []errors.AutoscalerError{
errors.NewAutoscalerError(errors.InternalError, "A"),
errors.NewAutoscalerError(errors.InternalError, "B"),
errors.NewAutoscalerError(errors.InternalError, "C"),
errors: []AutoscalerError{
NewAutoscalerError(InternalError, "A"),
NewAutoscalerError(InternalError, "B"),
NewAutoscalerError(InternalError, "C"),
},
expectedErr: errors.NewAutoscalerError(
errors.InternalError,
expectedErr: NewAutoscalerError(
InternalError,
"A ...and other concurrent errors: [\"B\", \"C\"]"),
},
{
desc: "errors with the same type and some duplicated messages",
errors: []errors.AutoscalerError{
errors.NewAutoscalerError(errors.InternalError, "A"),
errors.NewAutoscalerError(errors.InternalError, "B"),
errors.NewAutoscalerError(errors.InternalError, "A"),
errors: []AutoscalerError{
NewAutoscalerError(InternalError, "A"),
NewAutoscalerError(InternalError, "B"),
NewAutoscalerError(InternalError, "A"),
},
expectedErr: errors.NewAutoscalerError(
errors.InternalError,
expectedErr: NewAutoscalerError(
InternalError,
"A ...and other concurrent errors: [\"B\"]"),
},
{
desc: "some duplicated errors",
errors: []errors.AutoscalerError{
errors.NewAutoscalerError(errors.CloudProviderError, "A"),
errors.NewAutoscalerError(errors.CloudProviderError, "A"),
errors.NewAutoscalerError(errors.CloudProviderError, "B"),
errors.NewAutoscalerError(errors.InternalError, "A"),
errors: []AutoscalerError{
NewAutoscalerError(CloudProviderError, "A"),
NewAutoscalerError(CloudProviderError, "A"),
NewAutoscalerError(CloudProviderError, "B"),
NewAutoscalerError(InternalError, "A"),
},
expectedErr: errors.NewAutoscalerError(
errors.CloudProviderError,
expectedErr: NewAutoscalerError(
CloudProviderError,
"A ...and other concurrent errors: [\"[cloudProviderError] B\", \"[internalError] A\"]"),
},
{
desc: "different errors with quotes in messages",
errors: []errors.AutoscalerError{
errors.NewAutoscalerError(errors.InternalError, "\"first\""),
errors.NewAutoscalerError(errors.InternalError, "\"second\""),
errors: []AutoscalerError{
NewAutoscalerError(InternalError, "\"first\""),
NewAutoscalerError(InternalError, "\"second\""),
},
expectedErr: errors.NewAutoscalerError(
errors.InternalError,
expectedErr: NewAutoscalerError(
InternalError,
"\"first\" ...and other concurrent errors: [\"\\\"second\\\"\"]"),
},
}

for _, testCase := range testCases {
t.Run(testCase.desc, func(t *testing.T) {
combinedErr := combineConcurrentScaleUpErrors(testCase.errors)
combinedErr := CombineConcurrentScaleUpErrors(testCase.errors)
assert.Equal(t, testCase.expectedErr, combinedErr)
})
}
Expand Down

0 comments on commit 6f51229

Please sign in to comment.