diff --git a/.github/workflows/long-running-azure.yaml b/.github/workflows/long-running-azure.yaml index c6a81067f1..718c742af7 100644 --- a/.github/workflows/long-running-azure.yaml +++ b/.github/workflows/long-running-azure.yaml @@ -46,6 +46,12 @@ permissions: on: # Enable manual trigger to deploy the latest changes from main. workflow_dispatch: + inputs: + skip-build: + description: 'Skip build (true/false)' + required: false + default: 'true' + type: string schedule: # Run every 2 hours - cron: "0 */2 * * *" @@ -115,20 +121,29 @@ jobs: path: ./dist/cache key: radius-test-latest- - name: Skip build if build is still valid - if: github.event_name != 'pull_request' && github.event_name != 'workflow_dispatch' + if: github.event_name != 'pull_request' id: skip-build run: | # check if the last build time to see if we need to build again + SKIP_BUILD="false" if [ -f ./dist/cache/.lastbuildtime ]; then lastbuild=$(cat ./dist/cache/.lastbuildtime) current_time=$(date +%s) if [ $((current_time-lastbuild)) -lt ${{ env.VALID_RADIUS_BUILD_WINDOW }} ]; then - echo "Skipping build as the last build is still valid." - echo "SKIP_BUILD=true" >> $GITHUB_OUTPUT + echo "Last build is still within valid window" + SKIP_BUILD="true" fi fi + + # Check override in workflow_dispatch mode + if [ "${{ github.event_name }}" = "workflow_dispatch" ] && [ "${{ github.event.inputs.skip-build }}" = "false" ]; then + echo "Manual run with skip-build=false, forcing build" + SKIP_BUILD="false" + fi + + echo "SKIP_BUILD=${SKIP_BUILD}" >> $GITHUB_OUTPUT - name: Set up checkout target (scheduled) - if: steps.skip-build.outputs.SKIP_BUILD != 'true' && github.event_name == 'schedule' + if: steps.skip-build.outputs.SKIP_BUILD != 'true' && (github.event_name == 'schedule' || github.event_name == 'workflow_dispatch') run: | echo "CHECKOUT_REPO=${{ github.repository }}" >> $GITHUB_ENV echo "CHECKOUT_REF=refs/heads/main" >> $GITHUB_ENV @@ -436,8 +451,8 @@ jobs: exit 1 fi - # Poll logs for up to iterations, 30 seconds each (upto 3 minutes total) - for i in {1..6}; do + # Poll logs for up to 20 iterations, 30 seconds each (up to 10 minutes total) + for i in {1..20}; do kubectl logs "$POD_NAME" -n radius-system | tee registermanifest_logs.txt > /dev/null # Exit on error @@ -459,7 +474,7 @@ jobs: # Final check to ensure success message was found if ! grep -q "Successfully registered manifests" registermanifest_logs.txt; then - echo "Manifests not registered after 3 minutes." + echo "Manifests not registered after 10 minutes." exit 1 fi - name: Create a list of resources not to be deleted diff --git a/pkg/cli/manifest/registermanifest.go b/pkg/cli/manifest/registermanifest.go index 7ba5fdbe83..909cb69def 100644 --- a/pkg/cli/manifest/registermanifest.go +++ b/pkg/cli/manifest/registermanifest.go @@ -18,15 +18,23 @@ package manifest import ( "context" + "errors" "fmt" "os" "path/filepath" + "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" v1 "github.com/radius-project/radius/pkg/armrpc/api/v1" "github.com/radius-project/radius/pkg/to" "github.com/radius-project/radius/pkg/ucp/api/v20231001preview" ) +const ( + initialBackoff = 2 * time.Second + maxRetries = 5 +) + // RegisterFile registers a manifest file func RegisterFile(ctx context.Context, clientFactory *v20231001preview.ClientFactory, planeName string, filePath string, logger func(format string, args ...any)) error { if filePath == "" { @@ -51,15 +59,22 @@ func RegisterFile(ctx context.Context, clientFactory *v20231001preview.ClientFac } logIfEnabled(logger, "Creating resource provider %s at location %s", resourceProvider.Name, locationName) - resourceProviderPoller, err := clientFactory.NewResourceProvidersClient().BeginCreateOrUpdate(ctx, planeName, resourceProvider.Name, v20231001preview.ResourceProviderResource{ - Location: to.Ptr(locationName), - Properties: &v20231001preview.ResourceProviderProperties{}, - }, nil) - if err != nil { - return err - } - - _, err = resourceProviderPoller.PollUntilDone(ctx, nil) + err = retryOperation(ctx, func() error { + resourceProviderPoller, err := clientFactory.NewResourceProvidersClient().BeginCreateOrUpdate( + ctx, planeName, resourceProvider.Name, + v20231001preview.ResourceProviderResource{ + Location: to.Ptr(locationName), + Properties: &v20231001preview.ResourceProviderProperties{}, + }, nil) + if err != nil { + return err + } + _, err = resourceProviderPoller.PollUntilDone(ctx, nil) + if err != nil { + return err // also retried if error indicates a 409 conflict + } + return nil + }, logger) if err != nil { return err } @@ -74,17 +89,22 @@ func RegisterFile(ctx context.Context, clientFactory *v20231001preview.ClientFac for resourceTypeName, resourceType := range resourceProvider.Types { logIfEnabled(logger, "Creating resource type %s/%s", resourceProvider.Name, resourceTypeName) - resourceTypePoller, err := clientFactory.NewResourceTypesClient().BeginCreateOrUpdate(ctx, planeName, resourceProvider.Name, resourceTypeName, v20231001preview.ResourceTypeResource{ - Properties: &v20231001preview.ResourceTypeProperties{ - Capabilities: to.SliceOfPtrs(resourceType.Capabilities...), - DefaultAPIVersion: resourceType.DefaultAPIVersion, - }, - }, nil) - if err != nil { - return err - } - - _, err = resourceTypePoller.PollUntilDone(ctx, nil) + err = retryOperation(ctx, func() error { + resourceTypePoller, err := clientFactory.NewResourceTypesClient().BeginCreateOrUpdate(ctx, planeName, resourceProvider.Name, resourceTypeName, v20231001preview.ResourceTypeResource{ + Properties: &v20231001preview.ResourceTypeProperties{ + Capabilities: to.SliceOfPtrs(resourceType.Capabilities...), + DefaultAPIVersion: resourceType.DefaultAPIVersion, + }, + }, nil) + if err != nil { + return err + } + _, err = resourceTypePoller.PollUntilDone(ctx, nil) + if err != nil { + return err + } + return nil + }, logger) if err != nil { return err } @@ -95,18 +115,22 @@ func RegisterFile(ctx context.Context, clientFactory *v20231001preview.ClientFac for apiVersionName := range resourceType.APIVersions { logIfEnabled(logger, "Creating API Version %s/%s@%s", resourceProvider.Name, resourceTypeName, apiVersionName) - apiVersionsPoller, err := clientFactory.NewAPIVersionsClient().BeginCreateOrUpdate(ctx, planeName, resourceProvider.Name, resourceTypeName, apiVersionName, v20231001preview.APIVersionResource{ - Properties: &v20231001preview.APIVersionProperties{}, - }, nil) + err = retryOperation(ctx, func() error { + apiVersionsPoller, err := clientFactory.NewAPIVersionsClient().BeginCreateOrUpdate(ctx, planeName, resourceProvider.Name, resourceTypeName, apiVersionName, v20231001preview.APIVersionResource{ + Properties: &v20231001preview.APIVersionProperties{}, + }, nil) + if err != nil { + return err + } + _, err = apiVersionsPoller.PollUntilDone(ctx, nil) + if err != nil { + return err + } + return nil + }, logger) if err != nil { return err } - - _, err = apiVersionsPoller.PollUntilDone(ctx, nil) - if err != nil { - return err - } - locationResourceType.APIVersions[apiVersionName] = map[string]any{} } @@ -118,12 +142,17 @@ func RegisterFile(ctx context.Context, clientFactory *v20231001preview.ClientFac } logIfEnabled(logger, "Creating location %s/%s/%s", resourceProvider.Name, locationName, address) - locationPoller, err := clientFactory.NewLocationsClient().BeginCreateOrUpdate(ctx, planeName, resourceProvider.Name, locationName, locationResource, nil) - if err != nil { - return err - } - - _, err = locationPoller.PollUntilDone(ctx, nil) + err = retryOperation(ctx, func() error { + locationPoller, err := clientFactory.NewLocationsClient().BeginCreateOrUpdate(ctx, planeName, resourceProvider.Name, locationName, locationResource, nil) + if err != nil { + return err + } + _, err = locationPoller.PollUntilDone(ctx, nil) + if err != nil { + return err + } + return nil + }, logger) if err != nil { return err } @@ -275,3 +304,43 @@ func logIfEnabled(logger func(format string, args ...any), format string, args . logger(format, args...) } } + +// retryOperation retries an operation with exponential backoff upon a 409 conflict. +// It also handles context cancellation or timeouts, returning immediately if ctx is done. +func retryOperation(ctx context.Context, operation func() error, logger func(format string, args ...any)) error { + backoff := initialBackoff + + for attempt := 1; attempt <= maxRetries; attempt++ { + err := operation() + if err != nil { + if is409ConflictError(err) { + if logger != nil { + logger("Got 409 conflict on attempt %d/%d. Retrying in %s...", attempt, maxRetries, backoff) + } + // Wait for either the context to be cancelled or the backoff duration to pass + select { + case <-ctx.Done(): + // Context cancelled or timed out + return ctx.Err() + case <-time.After(backoff): + // Increase backoff and try again + backoff *= 2 + continue + } + } + return err + } + return nil + } + return fmt.Errorf("exceeded %d retries", maxRetries) +} + +// is409ConflictError returns true if the error is a 409 Conflict error +func is409ConflictError(err error) bool { + if err == nil { + return false + } + + var respErr *azcore.ResponseError + return errors.As(err, &respErr) && respErr.StatusCode == 409 +} diff --git a/pkg/cli/manifest/registermanifest_test.go b/pkg/cli/manifest/registermanifest_test.go index 847939b8bc..b96af1eb4b 100644 --- a/pkg/cli/manifest/registermanifest_test.go +++ b/pkg/cli/manifest/registermanifest_test.go @@ -20,10 +20,13 @@ import ( "bytes" "context" "fmt" + "strings" "testing" + "time" // armpolicy "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/radius-project/radius/pkg/to" "github.com/stretchr/testify/require" ) @@ -240,3 +243,231 @@ func TestRegisterType(t *testing.T) { }) } } +func TestRetryOperation(t *testing.T) { + tests := []struct { + name string + operation func() error + attempts int + expectedError string + }{ + { + name: "success on first attempt", + operation: func() error { + // No retries needed; always succeeds. + return nil + }, + attempts: 1, + }, + { + name: "success after retry", + // Return a closure that keeps track of how many times it's invoked. + // The first call returns 409, the second returns nil. + operation: (func() func() error { + var attempt int + return func() error { + attempt++ + if attempt == 1 { + return &azcore.ResponseError{StatusCode: 409} + } + return nil + } + })(), + attempts: 2, + }, + { + name: "non-409 error", + operation: func() error { + // Will fail immediately, no retry. + return fmt.Errorf("non-409 error") + }, + attempts: 1, + expectedError: "non-409 error", + }, + { + name: "verify increasing backoff", + // Test that each retry is spaced out longer than the previous one. + operation: (func() func() error { + var lastTime time.Time + var lastDuration time.Duration + var attempt int + return func() error { + now := time.Now() + if attempt > 0 { + // Measure how long since last invocation + currentDuration := now.Sub(lastTime) + if currentDuration <= lastDuration { + return fmt.Errorf("backoff did not increase: previous %v, current %v", + lastDuration, currentDuration) + } + lastDuration = currentDuration + } + lastTime = now + + attempt++ + // Force 409 until the third attempt + if attempt < 3 { + return &azcore.ResponseError{StatusCode: 409} + } + return nil + } + })(), + attempts: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // We'll capture log output here + var logBuffer bytes.Buffer + logger := func(format string, args ...any) { + fmt.Fprintf(&logBuffer, format+"\n", args...) + } + + var actualAttempts int + + // wrappedOp is what's passed to retryOperation(). + // Each retry calls this, so we increment actualAttempts each time. + wrappedOp := func() error { + actualAttempts++ + return tt.operation() + } + + err := retryOperation(context.Background(), wrappedOp, logger) + + if tt.expectedError != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.expectedError) + } else { + require.NoError(t, err) + } + + require.Equal(t, tt.attempts, actualAttempts, "unexpected number of attempts") + + // If more than 1 attempt, we expect conflict logs. + if tt.attempts > 1 { + logContent := logBuffer.String() + require.Contains(t, logContent, "Got 409 conflict on attempt") + + lines := strings.Split(strings.TrimSpace(logContent), "\n") + // We'll see one log line per retry. E.g. if attempts=3, that means 2 retries logged. + require.Equal(t, tt.attempts-1, len(lines), "expected retry log messages don't match attempts") + } + }) + } +} + +func TestRetryOperationWithContext(t *testing.T) { + tests := []struct { + name string + operation func() error + setupCtx func() context.Context + attempts int + expectedError string + }{ + { + name: "context cancelled", + operation: func() error { + return &azcore.ResponseError{StatusCode: 409} + }, + setupCtx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + return ctx + }, + attempts: 1, + expectedError: "context canceled", + }, + { + name: "context timeout", + operation: func() error { + return &azcore.ResponseError{StatusCode: 409} + }, + setupCtx: func() context.Context { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + // Ensure cancel is called after context is done + go func() { + <-ctx.Done() + cancel() + }() + return ctx + }, + attempts: 1, + expectedError: "context deadline exceeded", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var logBuffer bytes.Buffer + logger := func(format string, args ...any) { + fmt.Fprintf(&logBuffer, format+"\n", args...) + } + + actualAttempts := 0 + wrappedOp := func() error { + actualAttempts++ + return tt.operation() + } + + ctx := tt.setupCtx() + err := retryOperation(ctx, wrappedOp, logger) + + require.Error(t, err) + require.Contains(t, err.Error(), tt.expectedError) + require.Equal(t, tt.attempts, actualAttempts, "unexpected number of attempts") + }) + } +} + +func TestIs409ConflictError(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "nil error", + err: nil, + want: false, + }, + { + name: "simple 409 conflict", + err: &azcore.ResponseError{ + StatusCode: 409, + }, + want: true, + }, + { + name: "409 error with code=Conflict", + err: &azcore.ResponseError{ + StatusCode: 409, + ErrorCode: "Conflict", + }, + want: true, + }, + { + name: "different status code (404)", + err: &azcore.ResponseError{ + StatusCode: 404, + }, + want: false, + }, + { + name: "non-ResponseError type", + err: fmt.Errorf("some other error"), + want: false, + }, + { + name: "wrapped 409 error", + err: fmt.Errorf("wrapped: %w", &azcore.ResponseError{StatusCode: 409}), + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := is409ConflictError(tt.err) + require.Equal(t, tt.want, got) + }) + } +}