Skip to content

Commit

Permalink
Fix should update cluster information to the remote store first
Browse files Browse the repository at this point in the history
Currently, the cluster migration updates the local cluster before
the remote store, so it might cause the local cluster to stop checking
the cluster migration even when it fails to update the remote store.
  • Loading branch information
git-hulk committed Dec 23, 2024
1 parent 01c319b commit 059c6c2
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
41 changes: 25 additions & 16 deletions controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,18 @@ func (c *ClusterChecker) probeLoop() {
}
}

func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster *store.Cluster) {
// updateCluster swaps in cluster as the checker's current view of the
// cluster topology. The write is guarded by clusterMu so concurrent
// readers never observe a torn update.
func (c *ClusterChecker) updateCluster(cluster *store.Cluster) {
	c.clusterMu.Lock()
	defer c.clusterMu.Unlock()
	c.cluster = cluster
}

func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedCluster *store.Cluster) {
log := logger.Get().With(
zap.String("namespace", c.namespace),
zap.String("cluster", c.clusterName))

for i, shard := range cluster.Shards {
for i, shard := range clonedCluster.Shards {
if !shard.IsMigrating() {
continue
}
Expand All @@ -283,38 +289,41 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster *
}
if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot {
log.Error("Mismatch migrate slot", zap.Int("slot", shard.MigratingSlot))
return
}
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(cluster.Shards) {
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
return
}
targetMasterNode := cluster.Shards[shard.TargetShardIndex].GetMasterNode()
targetMasterNode := clonedCluster.Shards[shard.TargetShardIndex].GetMasterNode()

switch sourceNodeClusterInfo.MigratingState {
case "none", "start":
continue
case "fail":
c.clusterMu.Lock()
cluster.Shards[i].ClearMigrateState()
c.clusterMu.Unlock()
if err := c.clusterStore.SetCluster(ctx, c.namespace, cluster); err != nil {
log.Error("Failed to clear the migrate state", zap.Error(err))
clonedCluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
return
}
c.updateCluster(clonedCluster)
log.Warn("Failed to migrate the slot", zap.Int("slot", shard.MigratingSlot))
case "success":
err := cluster.SetSlot(ctx, shard.MigratingSlot, targetMasterNode.ID())
err := clonedCluster.SetSlot(ctx, shard.MigratingSlot, targetMasterNode.ID())
if err != nil {
log.Error("Failed to set the slot", zap.Error(err))
return
}
cluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(cluster.Shards[i].SlotRanges, shard.MigratingSlot)
cluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
cluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
cluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, cluster); err != nil {
clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot)
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
clonedCluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
} else {
log.Info("Migrate the slot successfully", zap.Int("slot", shard.MigratingSlot))
}
c.updateCluster(clonedCluster)
default:
log.Error("Unknown migrating state", zap.String("state", sourceNodeClusterInfo.MigratingState))
}
Expand All @@ -337,7 +346,7 @@ func (c *ClusterChecker) migrationLoop() {
if cluster == nil {
continue
}
c.tryUpdateMigrationStatus(c.ctx, cluster)
c.tryUpdateMigrationStatus(c.ctx, cluster.Clone())
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) {
return cluster, nil
}

// Clone returns a deep copy of the cluster: the name and version counter
// are carried over and every shard is cloned individually, so callers may
// mutate the returned cluster without affecting the original.
// NOTE(review): only Name, Version, and Shards are copied here — confirm
// the Cluster struct declares no other fields that would need cloning.
func (cluster *Cluster) Clone() *Cluster {
	clone := &Cluster{
		Name: cluster.Name,
		// Pre-size to the known shard count to avoid repeated growth copies.
		Shards: make([]*Shard, 0, len(cluster.Shards)),
	}
	clone.Version.Store(cluster.Version.Load())
	for _, shard := range cluster.Shards {
		clone.Shards = append(clone.Shards, shard.Clone())
	}
	return clone
}

// SetPassword will set the password for all nodes in the cluster.
func (cluster *Cluster) SetPassword(password string) {
for i := 0; i < len(cluster.Shards); i++ {
Expand Down
11 changes: 11 additions & 0 deletions store/cluster_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ func NewShard() *Shard {
}
}

// Clone returns a copy of the shard with its own backing storage for the
// SlotRanges and Nodes slices, so edits to the copy do not leak back into
// the original shard.
// NOTE(review): Nodes are copied by value — confirm Node holds no
// reference-typed fields that would still be shared after the copy.
func (shard *Shard) Clone() *Shard {
	cloned := NewShard()
	cloned.MigratingSlot = shard.MigratingSlot
	cloned.TargetShardIndex = shard.TargetShardIndex
	cloned.SlotRanges = append(make([]SlotRange, 0, len(shard.SlotRanges)), shard.SlotRanges...)
	cloned.Nodes = append(make([]Node, 0, len(shard.Nodes)), shard.Nodes...)
	return cloned
}

func (shard *Shard) ClearMigrateState() {
shard.MigratingSlot = -1
shard.TargetShardIndex = -1
Expand Down

0 comments on commit 059c6c2

Please sign in to comment.