Skip to content

Commit

Permalink
Merge pull request #619 from ploubser/balancer_maint
Browse files Browse the repository at this point in the history
(maint) Clean up output and test for broken leader name
  • Loading branch information
ripienaar authored Jan 23, 2025
2 parents d4b9b1a + 713e7e2 commit 5623676
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,22 @@ func (b *Balancer) createClusterMappings(e balanceEntity, clusterMap map[string]
leadername := info.Leader
active := clusterMap[clustername]

_, ok = active.peers[leadername]
if !ok {
tmp := peer{
name: leadername,
entities: []balanceEntity{},
offset: 0,
// This can happen if there very outdated or broken streams in the cluster. Very edge case
// but if we don't check for it really messes up the balancer math.
if leadername == "" {
b.log.Warnf("Found entity without leader %s", e.Name())
} else {
_, ok = active.peers[leadername]
if !ok {
tmp := peer{
name: leadername,
entities: []balanceEntity{},
offset: 0,
}
active.peers[leadername] = &tmp
}
active.peers[leadername] = &tmp
active.peers[leadername].entities = append(active.peers[leadername].entities, e)
}
active.peers[leadername].entities = append(active.peers[leadername].entities, e)

for _, replica := range info.Replicas {
_, ok = active.peers[replica.Name]
Expand Down Expand Up @@ -204,7 +210,6 @@ func (b *Balancer) calcLeaderOffset(p map[string]*peer, distribution int) {

func (b Balancer) logClusterStats(clusterMap map[string]*cluster) {
for k, v := range clusterMap {
b.log.Infof("")
b.log.Infof("Found cluster %s with a balanced distribution of %d", k, v.balancedDistribution)
b.log.Infof("Cluster %s has %d available peers", k, len(v.peers))
for _, server := range v.peers {
Expand Down Expand Up @@ -239,9 +244,9 @@ func (b *Balancer) BalanceStreams(streams []*jsm.Stream) (int, error) {
b.calcLeaderOffset(v.peers, v.balancedDistribution)
}

b.logClusterStats(clusterMap)
// Balance each cluster
for k, v := range clusterMap {
b.logClusterStats(clusterMap)
b.log.Debugf("balancing streams on cluster %s", k)
b, err := b.balance(v.peers, v.balancedDistribution, k, "stream")
if err != nil {
Expand Down Expand Up @@ -278,9 +283,9 @@ func (b *Balancer) BalanceConsumers(consumers []*jsm.Consumer) (int, error) {
b.calcLeaderOffset(v.peers, v.balancedDistribution)
}

b.logClusterStats(clusterMap)
// Balance each cluster
for k, v := range clusterMap {
b.logClusterStats(clusterMap)
b.log.Debugf("balancing consumers on cluster %s", k)
b, err := b.balance(v.peers, v.balancedDistribution, k, "consumer")
if err != nil {
Expand Down

0 comments on commit 5623676

Please sign in to comment.