diff --git a/controller/cluster.go b/controller/cluster.go index 4f44953..2b975d2 100644 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -266,12 +266,18 @@ func (c *ClusterChecker) probeLoop() { } } -func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster *store.Cluster) { +func (c *ClusterChecker) updateCluster(cluster *store.Cluster) { + c.clusterMu.Lock() + c.cluster = cluster + c.clusterMu.Unlock() +} + +func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedCluster *store.Cluster) { log := logger.Get().With( zap.String("namespace", c.namespace), zap.String("cluster", c.clusterName)) - for i, shard := range cluster.Shards { + for i, shard := range clonedCluster.Shards { if !shard.IsMigrating() { continue } @@ -283,38 +289,41 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster * } if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot { log.Error("Mismatch migrate slot", zap.Int("slot", shard.MigratingSlot)) + return } - if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(cluster.Shards) { + if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) { log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex)) + return } - targetMasterNode := cluster.Shards[shard.TargetShardIndex].GetMasterNode() + targetMasterNode := clonedCluster.Shards[shard.TargetShardIndex].GetMasterNode() switch sourceNodeClusterInfo.MigratingState { case "none", "start": continue case "fail": - c.clusterMu.Lock() - cluster.Shards[i].ClearMigrateState() - c.clusterMu.Unlock() - if err := c.clusterStore.SetCluster(ctx, c.namespace, cluster); err != nil { - log.Error("Failed to clear the migrate state", zap.Error(err)) + clonedCluster.Shards[i].ClearMigrateState() + if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { + log.Error("Failed to update the cluster", zap.Error(err)) + return } + c.updateCluster(clonedCluster) log.Warn("Failed to migrate the slot", zap.Int("slot", shard.MigratingSlot)) case "success": - err := cluster.SetSlot(ctx, shard.MigratingSlot, targetMasterNode.ID()) + err := clonedCluster.SetSlot(ctx, shard.MigratingSlot, targetMasterNode.ID()) if err != nil { log.Error("Failed to set the slot", zap.Error(err)) return } - cluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(cluster.Shards[i].SlotRanges, shard.MigratingSlot) - cluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges( - cluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot) - cluster.Shards[i].ClearMigrateState() - if err := c.clusterStore.UpdateCluster(ctx, c.namespace, cluster); err != nil { + clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot) + clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges( + clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot) + clonedCluster.Shards[i].ClearMigrateState() + if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { log.Error("Failed to update the cluster", zap.Error(err)) } else { log.Info("Migrate the slot successfully", zap.Int("slot", shard.MigratingSlot)) } + c.updateCluster(clonedCluster) default: log.Error("Unknown migrating state", zap.String("state", sourceNodeClusterInfo.MigratingState)) } @@ -332,12 +341,12 @@ func (c *ClusterChecker) migrationLoop() { return case <-ticker.C: c.clusterMu.Lock() - cluster := c.cluster + clonedCluster := c.cluster.Clone() c.clusterMu.Unlock() - if cluster == nil { + if clonedCluster == nil { continue } - c.tryUpdateMigrationStatus(c.ctx, cluster) + c.tryUpdateMigrationStatus(c.ctx, clonedCluster) } } } diff --git a/store/cluster.go b/store/cluster.go index d4beb06..d4713dd 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -77,6 +77,18 @@ func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) { return cluster, nil } +func (cluster *Cluster) Clone() *Cluster { + clone := &Cluster{ + Name: cluster.Name, + Shards: make([]*Shard, 0), + } + clone.Version.Store(cluster.Version.Load()) + for _, shard := range cluster.Shards { + clone.Shards = append(clone.Shards, shard.Clone()) + } + return clone +} + // SetPassword will set the password for all nodes in the cluster. func (cluster *Cluster) SetPassword(password string) { for i := 0; i < len(cluster.Shards); i++ { diff --git a/store/cluster_shard.go b/store/cluster_shard.go index e15333b..1de4dc5 100644 --- a/store/cluster_shard.go +++ b/store/cluster_shard.go @@ -66,6 +66,17 @@ func NewShard() *Shard { } } +func (shard *Shard) Clone() *Shard { + clone := NewShard() + clone.SlotRanges = make([]SlotRange, len(shard.SlotRanges)) + copy(clone.SlotRanges, shard.SlotRanges) + clone.TargetShardIndex = shard.TargetShardIndex + clone.MigratingSlot = shard.MigratingSlot + clone.Nodes = make([]Node, len(shard.Nodes)) + copy(clone.Nodes, shard.Nodes) + return clone +} + func (shard *Shard) ClearMigrateState() { shard.MigratingSlot = -1 shard.TargetShardIndex = -1