From 6f5122938f1efb672abcf19ea841cf192da8bbff Mon Sep 17 00:00:00 2001 From: Karol Wychowaniec Date: Mon, 30 Dec 2024 13:36:59 +0000 Subject: [PATCH] Minor refactor to scale-up orchestrator for more re-usability --- .../scaleup/orchestrator/async_initializer.go | 3 +- .../core/scaleup/orchestrator/executor.go | 63 +-------------- .../core/scaleup/orchestrator/orchestrator.go | 7 +- cluster-autoscaler/utils/errors/errors.go | 62 ++++++++++++++ .../errors/errors_test.go} | 80 +++++++++---------- 5 files changed, 108 insertions(+), 107 deletions(-) rename cluster-autoscaler/{core/scaleup/orchestrator/executor_test.go => utils/errors/errors_test.go} (52%) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/async_initializer.go b/cluster-autoscaler/core/scaleup/orchestrator/async_initializer.go index 0a681fd5a903..92e6a5235abc 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/async_initializer.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/async_initializer.go @@ -50,7 +50,8 @@ type AsyncNodeGroupInitializer struct { atomicScaleUp bool } -func newAsyncNodeGroupInitializer( +// NewAsyncNodeGroupInitializer creates a new AsyncNodeGroupInitializer instance. +func NewAsyncNodeGroupInitializer( nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, scaleUpExecutor *scaleUpExecutor, diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor.go b/cluster-autoscaler/core/scaleup/orchestrator/executor.go index 5c835e5d384d..873a34c26225 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/executor.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/executor.go @@ -18,8 +18,6 @@ package orchestrator import ( "fmt" - "sort" - "strings" "sync" "time" @@ -138,7 +136,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel( failedNodeGroups[i] = result.info.Group scaleUpErrors[i] = result.err } - return combineConcurrentScaleUpErrors(scaleUpErrors), failedNodeGroups + return errors.CombineConcurrentScaleUpErrors(scaleUpErrors), failedNodeGroups } return nil, nil } @@ -188,65 +186,6 @@ func (e *scaleUpExecutor) executeScaleUp( return nil } -func combineConcurrentScaleUpErrors(errs []errors.AutoscalerError) errors.AutoscalerError { - if len(errs) == 0 { - return nil - } - if len(errs) == 1 { - return errs[0] - } - uniqueMessages := make(map[string]bool) - uniqueTypes := make(map[errors.AutoscalerErrorType]bool) - for _, err := range errs { - uniqueTypes[err.Type()] = true - uniqueMessages[err.Error()] = true - } - if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 { - return errs[0] - } - // sort to stabilize the results and easier log aggregation - sort.Slice(errs, func(i, j int) bool { - errA := errs[i] - errB := errs[j] - if errA.Type() == errB.Type() { - return errs[i].Error() < errs[j].Error() - } - return errA.Type() < errB.Type() - }) - firstErr := errs[0] - printErrorTypes := len(uniqueTypes) > 1 - message := formatMessageFromConcurrentErrors(errs, printErrorTypes) - return errors.NewAutoscalerError(firstErr.Type(), message) -} - -func formatMessageFromConcurrentErrors(errs []errors.AutoscalerError, printErrorTypes bool) string { - firstErr := errs[0] - var builder strings.Builder - builder.WriteString(firstErr.Error()) - builder.WriteString(" ...and other concurrent errors: [") - formattedErrs := map[errors.AutoscalerError]bool{ - firstErr: true, - } - for _, err := range errs { - if _, has := formattedErrs[err]; has { - continue - } - formattedErrs[err] = true - var message string - if printErrorTypes { - message = fmt.Sprintf("[%s] %s", err.Type(), err.Error()) - } else { - message = err.Error() - } - if len(formattedErrs) > 2 { - builder.WriteString(", ") - } - builder.WriteString(fmt.Sprintf("%q", message)) - } - builder.WriteString("]") - return builder.String() -} - // Checks if all groups are scaled only once. // Scaling one group multiple times concurrently may cause problems. func checkUniqueNodeGroups(scaleUpInfos []nodegroupset.ScaleUpInfo) errors.AutoscalerError { diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index d57c2a6299b3..2ad6b989ab25 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -222,7 +222,9 @@ func (o *ScaleUpOrchestrator) ScaleUp( return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil } var scaleUpStatus *status.ScaleUpStatus - createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, allOrNothing) + oldId := bestOption.NodeGroup.Id() + initializer := NewAsyncNodeGroupInitializer(bestOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing) + createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets, initializer) if aErr != nil { return scaleUpStatus, aErr } @@ -501,7 +503,7 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup( schedulablePodGroups map[string][]estimator.PodEquivalenceGroup, podEquivalenceGroups []*equivalence.PodGroup, daemonSets []*appsv1.DaemonSet, - allOrNothing bool, + initializer nodegroups.AsyncNodeGroupInitializer, ) ([]nodegroups.CreateNodeGroupResult, *status.ScaleUpStatus, errors.AutoscalerError) { createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0) @@ -509,7 +511,6 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup( var createNodeGroupResult nodegroups.CreateNodeGroupResult var aErr errors.AutoscalerError if o.autoscalingContext.AsyncNodeGroupsEnabled { - initializer := newAsyncNodeGroupInitializer(initialOption.NodeGroup, nodeInfos[oldId], o.scaleUpExecutor, o.taintConfig, daemonSets, o.processors.ScaleUpStatusProcessor, o.autoscalingContext, allOrNothing) createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroupAsync(o.autoscalingContext, initialOption.NodeGroup, initializer) } else { createNodeGroupResult, aErr = o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, initialOption.NodeGroup) diff --git a/cluster-autoscaler/utils/errors/errors.go b/cluster-autoscaler/utils/errors/errors.go index 24bc0c8355c6..6e429009c7d6 100644 --- a/cluster-autoscaler/utils/errors/errors.go +++ b/cluster-autoscaler/utils/errors/errors.go @@ -18,6 +18,8 @@ package errors import ( "fmt" + "sort" + "strings" ) // AutoscalerErrorType describes a high-level category of a given error @@ -131,3 +133,63 @@ func (e autoscalerErrorImpl) Type() AutoscalerErrorType { func (e autoscalerErrorImpl) AddPrefix(msg string, args ...interface{}) AutoscalerError { return autoscalerErrorImpl{errorType: e.errorType, wrappedErr: e, msg: fmt.Sprintf(msg, args...)} } + +// CombineConcurrentScaleUpErrors returns combined scale-up error to report after multiple concurrent scale-ups might haver failed. +func CombineConcurrentScaleUpErrors(errs []AutoscalerError) AutoscalerError { + if len(errs) == 0 { + return nil + } + if len(errs) == 1 { + return errs[0] + } + uniqueMessages := make(map[string]bool) + uniqueTypes := make(map[AutoscalerErrorType]bool) + for _, err := range errs { + uniqueTypes[err.Type()] = true + uniqueMessages[err.Error()] = true + } + if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 { + return errs[0] + } + // sort to stabilize the results and easier log aggregation + sort.Slice(errs, func(i, j int) bool { + errA := errs[i] + errB := errs[j] + if errA.Type() == errB.Type() { + return errs[i].Error() < errs[j].Error() + } + return errA.Type() < errB.Type() + }) + firstErr := errs[0] + printErrorTypes := len(uniqueTypes) > 1 + message := formatMessageFromConcurrentErrors(errs, printErrorTypes) + return NewAutoscalerError(firstErr.Type(), message) +} + +func formatMessageFromConcurrentErrors(errs []AutoscalerError, printErrorTypes bool) string { + firstErr := errs[0] + var builder strings.Builder + builder.WriteString(firstErr.Error()) + builder.WriteString(" ...and other concurrent errors: [") + formattedErrs := map[AutoscalerError]bool{ + firstErr: true, + } + for _, err := range errs { + if _, has := formattedErrs[err]; has { + continue + } + formattedErrs[err] = true + var message string + if printErrorTypes { + message = fmt.Sprintf("[%s] %s", err.Type(), err.Error()) + } else { + message = err.Error() + } + if len(formattedErrs) > 2 { + builder.WriteString(", ") + } + builder.WriteString(fmt.Sprintf("%q", message)) + } + builder.WriteString("]") + return builder.String() +} diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor_test.go b/cluster-autoscaler/utils/errors/errors_test.go similarity index 52% rename from cluster-autoscaler/core/scaleup/orchestrator/executor_test.go rename to cluster-autoscaler/utils/errors/errors_test.go index a7ef5d60f575..019fb2303409 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/executor_test.go +++ b/cluster-autoscaler/utils/errors/errors_test.go @@ -14,37 +14,35 @@ See the License for the specific language governing permissions and limitations under the License. */ -package orchestrator +package errors import ( "testing" - "k8s.io/autoscaler/cluster-autoscaler/utils/errors" - "github.com/stretchr/testify/assert" ) func TestCombinedConcurrentScaleUpErrors(t *testing.T) { - cloudProviderErr := errors.NewAutoscalerError(errors.CloudProviderError, "provider error") - internalErr := errors.NewAutoscalerError(errors.InternalError, "internal error") + cloudProviderErr := NewAutoscalerError(CloudProviderError, "provider error") + internalErr := NewAutoscalerError(InternalError, "internal error") testCases := []struct { desc string - errors []errors.AutoscalerError - expectedErr errors.AutoscalerError + errors []AutoscalerError + expectedErr AutoscalerError }{ { desc: "no errors", - errors: []errors.AutoscalerError{}, + errors: []AutoscalerError{}, expectedErr: nil, }, { desc: "single error", - errors: []errors.AutoscalerError{internalErr}, + errors: []AutoscalerError{internalErr}, expectedErr: internalErr, }, { desc: "two duplicated errors", - errors: []errors.AutoscalerError{ + errors: []AutoscalerError{ internalErr, internalErr, }, @@ -52,75 +50,75 @@ func TestCombinedConcurrentScaleUpErrors(t *testing.T) { }, { desc: "two different errors", - errors: []errors.AutoscalerError{ + errors: []AutoscalerError{ cloudProviderErr, internalErr, }, - expectedErr: errors.NewAutoscalerError( - errors.CloudProviderError, + expectedErr: NewAutoscalerError( + CloudProviderError, "provider error ...and other concurrent errors: [\"[internalError] internal error\"]", ), }, { desc: "two different errors - reverse alphabetical order", - errors: []errors.AutoscalerError{ + errors: []AutoscalerError{ internalErr, cloudProviderErr, }, - expectedErr: errors.NewAutoscalerError( - errors.CloudProviderError, + expectedErr: NewAutoscalerError( + CloudProviderError, "provider error ...and other concurrent errors: [\"[internalError] internal error\"]", ), }, { desc: "errors with the same type and different messages", - errors: []errors.AutoscalerError{ - errors.NewAutoscalerError(errors.InternalError, "A"), - errors.NewAutoscalerError(errors.InternalError, "B"), - errors.NewAutoscalerError(errors.InternalError, "C"), + errors: []AutoscalerError{ + NewAutoscalerError(InternalError, "A"), + NewAutoscalerError(InternalError, "B"), + NewAutoscalerError(InternalError, "C"), }, - expectedErr: errors.NewAutoscalerError( - errors.InternalError, + expectedErr: NewAutoscalerError( + InternalError, "A ...and other concurrent errors: [\"B\", \"C\"]"), }, { desc: "errors with the same type and some duplicated messages", - errors: []errors.AutoscalerError{ - errors.NewAutoscalerError(errors.InternalError, "A"), - errors.NewAutoscalerError(errors.InternalError, "B"), - errors.NewAutoscalerError(errors.InternalError, "A"), + errors: []AutoscalerError{ + NewAutoscalerError(InternalError, "A"), + NewAutoscalerError(InternalError, "B"), + NewAutoscalerError(InternalError, "A"), }, - expectedErr: errors.NewAutoscalerError( - errors.InternalError, + expectedErr: NewAutoscalerError( + InternalError, "A ...and other concurrent errors: [\"B\"]"), }, { desc: "some duplicated errors", - errors: []errors.AutoscalerError{ - errors.NewAutoscalerError(errors.CloudProviderError, "A"), - errors.NewAutoscalerError(errors.CloudProviderError, "A"), - errors.NewAutoscalerError(errors.CloudProviderError, "B"), - errors.NewAutoscalerError(errors.InternalError, "A"), + errors: []AutoscalerError{ + NewAutoscalerError(CloudProviderError, "A"), + NewAutoscalerError(CloudProviderError, "A"), + NewAutoscalerError(CloudProviderError, "B"), + NewAutoscalerError(InternalError, "A"), }, - expectedErr: errors.NewAutoscalerError( - errors.CloudProviderError, + expectedErr: NewAutoscalerError( + CloudProviderError, "A ...and other concurrent errors: [\"[cloudProviderError] B\", \"[internalError] A\"]"), }, { desc: "different errors with quotes in messages", - errors: []errors.AutoscalerError{ - errors.NewAutoscalerError(errors.InternalError, "\"first\""), - errors.NewAutoscalerError(errors.InternalError, "\"second\""), + errors: []AutoscalerError{ + NewAutoscalerError(InternalError, "\"first\""), + NewAutoscalerError(InternalError, "\"second\""), }, - expectedErr: errors.NewAutoscalerError( - errors.InternalError, + expectedErr: NewAutoscalerError( + InternalError, "\"first\" ...and other concurrent errors: [\"\\\"second\\\"\"]"), }, } for _, testCase := range testCases { t.Run(testCase.desc, func(t *testing.T) { - combinedErr := combineConcurrentScaleUpErrors(testCase.errors) + combinedErr := CombineConcurrentScaleUpErrors(testCase.errors) assert.Equal(t, testCase.expectedErr, combinedErr) }) }