Skip to content

Commit

Permalink
another simple win checking replication factor on topics
Browse files Browse the repository at this point in the history
  • Loading branch information
alejandroEsc committed Apr 2, 2024
1 parent c978f89 commit ca0dac1
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions src/go/k8s/internal/controller/redpanda/redpanda_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"maps"
"reflect"
"strconv"
"time"

helmv2beta1 "github.com/fluxcd/helm-controller/api/v2beta1"
Expand Down Expand Up @@ -61,6 +62,11 @@ const (

revisionPath = "/revision"
componentLabelValue = "redpanda-statefulset"

minimumTopicReplicas = 3

// these constants can be removed after versions older that v22.3.1 are no longer supported
defaultTopicReplicationKey = "default_topic_replications"
)

var errWaitForReleaseDeletion = errors.New("wait for helm release deletion")
Expand Down Expand Up @@ -936,5 +942,40 @@ func validateHelmReleaseReplicaCount(rp *v1alpha1.Redpanda, hr *helmv2beta2.Helm
return fmt.Errorf("requested replicas of %d is less than %d neeed to maintain quorum", requestedReplicas, minForQuorum)
}

// If quorum may be preserved, then find out about topic replication and ensure we do not go below default
specConfigs := clusterSpec.Config
doCheckDefMinTopicReplicas := true
if clusterSpec.Config != nil {
clusterInfo := specConfigs.Cluster
if clusterInfo != nil {
clusterMap := make(map[string]string)
errUnmar := json.Unmarshal(clusterInfo.Raw, clusterMap)
if errUnmar != nil {
return fmt.Errorf("cannot unmarshal cluster config data %w", errUnmar)
}

minReplicationFactor, ok := clusterMap[defaultTopicReplicationKey]
if ok {
doCheckDefMinTopicReplicas = false
minReplicationFactorInt, errConvert := strconv.Atoi(minReplicationFactor)
if errConvert != nil {
return fmt.Errorf("cannot unmarshal cluster config data %w", errConvert)
}

if requestedReplicas < minReplicationFactorInt {
// nolint:goerr113 // error is not wrapping existing error
return fmt.Errorf("requested replicas of %d is less than replication factor %s", requestedReplicas, minReplicationFactorInt)
}
}
}
}

if doCheckDefMinTopicReplicas {
if requestedReplicas < minimumTopicReplicas {
// nolint:goerr113 // error is not wrapping existing error
return fmt.Errorf("requested replicas of %d is less than replication factor %s", requestedReplicas, minimumTopicReplicas)
}
}

return nil
}

0 comments on commit ca0dac1

Please sign in to comment.