Skip to content

Commit

Permalink
Fix should update cluster information to the remote store first (#237)
Browse files Browse the repository at this point in the history
Previously, cluster migration updated the local cluster state before the
remote store, so the local cluster could stop tracking the migration even
when the write to the remote store failed.
  • Loading branch information
git-hulk authored Dec 23, 2024
1 parent 01c319b commit f848508
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 18 deletions.
45 changes: 27 additions & 18 deletions controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,18 @@ func (c *ClusterChecker) probeLoop() {
}
}

func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster *store.Cluster) {
// updateCluster atomically swaps in the checker's cached cluster snapshot
// under clusterMu, so concurrent readers never observe a partial update.
func (c *ClusterChecker) updateCluster(cluster *store.Cluster) {
	c.clusterMu.Lock()
	defer c.clusterMu.Unlock()
	c.cluster = cluster
}

func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedCluster *store.Cluster) {
log := logger.Get().With(
zap.String("namespace", c.namespace),
zap.String("cluster", c.clusterName))

for i, shard := range cluster.Shards {
for i, shard := range clonedCluster.Shards {
if !shard.IsMigrating() {
continue
}
Expand All @@ -283,38 +289,41 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster *
}
if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot {
log.Error("Mismatch migrate slot", zap.Int("slot", shard.MigratingSlot))
return
}
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(cluster.Shards) {
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
return
}
targetMasterNode := cluster.Shards[shard.TargetShardIndex].GetMasterNode()
targetMasterNode := clonedCluster.Shards[shard.TargetShardIndex].GetMasterNode()

switch sourceNodeClusterInfo.MigratingState {
case "none", "start":
continue
case "fail":
c.clusterMu.Lock()
cluster.Shards[i].ClearMigrateState()
c.clusterMu.Unlock()
if err := c.clusterStore.SetCluster(ctx, c.namespace, cluster); err != nil {
log.Error("Failed to clear the migrate state", zap.Error(err))
clonedCluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
return
}
c.updateCluster(clonedCluster)
log.Warn("Failed to migrate the slot", zap.Int("slot", shard.MigratingSlot))
case "success":
err := cluster.SetSlot(ctx, shard.MigratingSlot, targetMasterNode.ID())
err := clonedCluster.SetSlot(ctx, shard.MigratingSlot, targetMasterNode.ID())
if err != nil {
log.Error("Failed to set the slot", zap.Error(err))
return
}
cluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(cluster.Shards[i].SlotRanges, shard.MigratingSlot)
cluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
cluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
cluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, cluster); err != nil {
clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot)
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
clonedCluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
} else {
log.Info("Migrate the slot successfully", zap.Int("slot", shard.MigratingSlot))
}
c.updateCluster(clonedCluster)
default:
log.Error("Unknown migrating state", zap.String("state", sourceNodeClusterInfo.MigratingState))
}
Expand All @@ -332,12 +341,12 @@ func (c *ClusterChecker) migrationLoop() {
return
case <-ticker.C:
c.clusterMu.Lock()
cluster := c.cluster
clonedCluster := c.cluster.Clone()
c.clusterMu.Unlock()
if cluster == nil {
if clonedCluster == nil {
continue
}
c.tryUpdateMigrationStatus(c.ctx, cluster)
c.tryUpdateMigrationStatus(c.ctx, clonedCluster)
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) {
return cluster, nil
}

// Clone returns a deep copy of the cluster: the name and version are copied,
// and every shard is cloned via Shard.Clone so mutations on the copy do not
// leak back into the original.
// NOTE(review): only Name, Version, and Shards are copied — confirm Cluster
// has no other fields that need cloning.
func (cluster *Cluster) Clone() *Cluster {
	clone := &Cluster{
		Name: cluster.Name,
		// Pre-size to the known shard count to avoid repeated growth copies.
		Shards: make([]*Shard, 0, len(cluster.Shards)),
	}
	clone.Version.Store(cluster.Version.Load())
	for _, shard := range cluster.Shards {
		clone.Shards = append(clone.Shards, shard.Clone())
	}
	return clone
}

// SetPassword will set the password for all nodes in the cluster.
func (cluster *Cluster) SetPassword(password string) {
for i := 0; i < len(cluster.Shards); i++ {
Expand Down
11 changes: 11 additions & 0 deletions store/cluster_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ func NewShard() *Shard {
}
}

// Clone returns a copy of the shard with its own backing arrays for
// SlotRanges and Nodes, so the copy can be mutated independently.
func (shard *Shard) Clone() *Shard {
	clone := NewShard()
	clone.TargetShardIndex = shard.TargetShardIndex
	clone.MigratingSlot = shard.MigratingSlot

	// Copy the slices into fresh backing storage; a plain assignment would
	// alias the original's arrays.
	slotRanges := make([]SlotRange, len(shard.SlotRanges))
	copy(slotRanges, shard.SlotRanges)
	clone.SlotRanges = slotRanges

	nodes := make([]Node, len(shard.Nodes))
	copy(nodes, shard.Nodes)
	clone.Nodes = nodes

	return clone
}

func (shard *Shard) ClearMigrateState() {
shard.MigratingSlot = -1
shard.TargetShardIndex = -1
Expand Down

0 comments on commit f848508

Please sign in to comment.