From f84850816bb4d18193854ec32a31f55e6c00cc5f Mon Sep 17 00:00:00 2001
From: hulk
Date: Mon, 23 Dec 2024 23:12:28 +0800
Subject: [PATCH] Fix: update cluster information in the remote store first
 (#237)

Currently, the cluster migration updates the local cluster state before the
remote store, so the local checker may stop tracking the migration even if
the write to the remote store fails. Update the remote store first and only
replace the local cluster state after that write succeeds.
---
 controller/cluster.go  | 45 +++++++++++++++++++++++++++++++++------------------
 store/cluster.go       | 12 ++++++++++++
 store/cluster_shard.go | 11 +++++++++++
 3 files changed, 50 insertions(+), 18 deletions(-)

diff --git a/controller/cluster.go b/controller/cluster.go
index 4f449531..2b975d21 100644
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -266,12 +266,18 @@ func (c *ClusterChecker) probeLoop() {
 	}
 }
 
-func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster *store.Cluster) {
+func (c *ClusterChecker) updateCluster(cluster *store.Cluster) {
+	c.clusterMu.Lock()
+	c.cluster = cluster
+	c.clusterMu.Unlock()
+}
+
+func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedCluster *store.Cluster) {
 	log := logger.Get().With(
 		zap.String("namespace", c.namespace),
 		zap.String("cluster", c.clusterName))
 
-	for i, shard := range cluster.Shards {
+	for i, shard := range clonedCluster.Shards {
 		if !shard.IsMigrating() {
 			continue
 		}
@@ -283,38 +289,41 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster *store.Cluster) {
 		}
 		if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot {
 			log.Error("Mismatch migrate slot", zap.Int("slot", shard.MigratingSlot))
+			return
 		}
-		if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(cluster.Shards) {
+		if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
 			log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
+			return
 		}
-		targetMasterNode := cluster.Shards[shard.TargetShardIndex].GetMasterNode()
+		targetMasterNode := clonedCluster.Shards[shard.TargetShardIndex].GetMasterNode()
 		switch sourceNodeClusterInfo.MigratingState {
 		case "none", "start":
 			continue
 		case "fail":
-			c.clusterMu.Lock()
-			cluster.Shards[i].ClearMigrateState()
-			c.clusterMu.Unlock()
-			if err := c.clusterStore.SetCluster(ctx, c.namespace, cluster); err != nil {
-				log.Error("Failed to clear the migrate state", zap.Error(err))
+			clonedCluster.Shards[i].ClearMigrateState()
+			if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
+				log.Error("Failed to update the cluster", zap.Error(err))
+				return
 			}
+			c.updateCluster(clonedCluster)
 			log.Warn("Failed to migrate the slot", zap.Int("slot", shard.MigratingSlot))
 		case "success":
-			err := cluster.SetSlot(ctx, shard.MigratingSlot, targetMasterNode.ID())
+			err := clonedCluster.SetSlot(ctx, shard.MigratingSlot, targetMasterNode.ID())
 			if err != nil {
 				log.Error("Failed to set the slot", zap.Error(err))
 				return
 			}
-			cluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(cluster.Shards[i].SlotRanges, shard.MigratingSlot)
-			cluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
-				cluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
-			cluster.Shards[i].ClearMigrateState()
-			if err := c.clusterStore.UpdateCluster(ctx, c.namespace, cluster); err != nil {
+			clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot)
+			clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
+				clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
+			clonedCluster.Shards[i].ClearMigrateState()
+			if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
 				log.Error("Failed to update the cluster", zap.Error(err))
 			} else {
 				log.Info("Migrate the slot successfully", zap.Int("slot", shard.MigratingSlot))
 			}
+			c.updateCluster(clonedCluster)
 		default:
 			log.Error("Unknown migrating state", zap.String("state", sourceNodeClusterInfo.MigratingState))
 		}
 	}
@@ -332,12 +341,12 @@ func (c *ClusterChecker) migrationLoop() {
 			return
 		case <-ticker.C:
 			c.clusterMu.Lock()
-			cluster := c.cluster
+			clonedCluster := c.cluster.Clone()
 			c.clusterMu.Unlock()
-			if cluster == nil {
+			if clonedCluster == nil {
 				continue
 			}
-			c.tryUpdateMigrationStatus(c.ctx, cluster)
+			c.tryUpdateMigrationStatus(c.ctx, clonedCluster)
 		}
 	}
 }
diff --git a/store/cluster.go b/store/cluster.go
index d4beb06d..d4713dd0 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -77,6 +77,18 @@ func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) {
 	return cluster, nil
 }
 
+func (cluster *Cluster) Clone() *Cluster {
+	clone := &Cluster{
+		Name:   cluster.Name,
+		Shards: make([]*Shard, 0),
+	}
+	clone.Version.Store(cluster.Version.Load())
+	for _, shard := range cluster.Shards {
+		clone.Shards = append(clone.Shards, shard.Clone())
+	}
+	return clone
+}
+
 // SetPassword will set the password for all nodes in the cluster.
 func (cluster *Cluster) SetPassword(password string) {
 	for i := 0; i < len(cluster.Shards); i++ {
diff --git a/store/cluster_shard.go b/store/cluster_shard.go
index e15333b2..1de4dc52 100644
--- a/store/cluster_shard.go
+++ b/store/cluster_shard.go
@@ -66,6 +66,17 @@ func NewShard() *Shard {
 	}
 }
 
+func (shard *Shard) Clone() *Shard {
+	clone := NewShard()
+	clone.SlotRanges = make([]SlotRange, len(shard.SlotRanges))
+	copy(clone.SlotRanges, shard.SlotRanges)
+	clone.TargetShardIndex = shard.TargetShardIndex
+	clone.MigratingSlot = shard.MigratingSlot
+	clone.Nodes = make([]Node, len(shard.Nodes))
+	copy(clone.Nodes, shard.Nodes)
+	return clone
+}
+
 func (shard *Shard) ClearMigrateState() {
 	shard.MigratingSlot = -1
 	shard.TargetShardIndex = -1
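
The ordering the patch enforces reduces to a small pattern: snapshot the shared cluster under the mutex (Clone), mutate the snapshot, write it to the remote store, and only then swap it into the checker's local state. The sketch below shows that pattern in isolation; Checker, RemoteStore, Cluster, and applyMigration are simplified stand-ins invented for this illustration, not the project's actual types or API.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Cluster is a simplified stand-in for store.Cluster.
type Cluster struct {
	Version int
	Slots   []int
}

// Clone returns a deep copy so the shared state is never mutated in place.
func (c *Cluster) Clone() *Cluster {
	clone := &Cluster{Version: c.Version, Slots: make([]int, len(c.Slots))}
	copy(clone.Slots, c.Slots)
	return clone
}

// RemoteStore is a stand-in for the remote cluster store.
type RemoteStore struct{ fail bool }

func (s *RemoteStore) UpdateCluster(ctx context.Context, c *Cluster) error {
	if s.fail {
		return errors.New("remote store unavailable")
	}
	return nil
}

// Checker mirrors how ClusterChecker guards its local cluster view.
type Checker struct {
	mu      sync.Mutex
	cluster *Cluster
	store   *RemoteStore
}

// applyMigration mutates a clone and writes it to the remote store first;
// the local view is replaced only after that write succeeds, so a failed
// write leaves the local state untouched.
func (c *Checker) applyMigration(ctx context.Context, mutate func(*Cluster)) error {
	c.mu.Lock()
	cloned := c.cluster.Clone()
	c.mu.Unlock()

	mutate(cloned)
	if err := c.store.UpdateCluster(ctx, cloned); err != nil {
		return err // local state stays as-is
	}

	c.mu.Lock()
	c.cluster = cloned
	c.mu.Unlock()
	return nil
}

func main() {
	c := &Checker{cluster: &Cluster{Version: 1, Slots: []int{0, 1}}, store: &RemoteStore{fail: true}}
	err := c.applyMigration(context.Background(), func(cl *Cluster) { cl.Slots = cl.Slots[1:] })
	fmt.Println(err, len(c.cluster.Slots)) // "remote store unavailable 2": local view unchanged
}

Because the remote write fails in this example, the local view keeps its original two slots, which mirrors why the checker keeps re-examining the migration on the next tick instead of silently dropping it.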