Skip to content

Commit

Permalink
Fix found bugs when testing raft
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Dec 17, 2024
1 parent dd231a5 commit f422e43
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 24 deletions.
1 change: 1 addition & 0 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (c *Controller) resume(ctx context.Context) error {
}
for _, cluster := range clusters {
c.addCluster(ns, cluster)
logger.Get().Debug("Resume the cluster", zap.String("namespace", ns), zap.String("cluster", cluster))
}
}
return nil
Expand Down
44 changes: 41 additions & 3 deletions server/api/raft.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package api

import (
"errors"
"fmt"
"strings"

"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/server/helper"
"github.com/apache/kvrocks-controller/store/engine/raft"
"go.uber.org/zap"

"github.com/gin-gonic/gin"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -40,8 +61,10 @@ func (r *MemberRequest) validate() error {

// ListPeers writes an OK response describing the raft membership of the node
// stored in the gin context under consts.ContextKeyRaftNode.
//
// NOTE(review): this listing is a diff with its +/- markers stripped, so it
// shows both the earlier response (peers only) and the later response
// (leader plus peers). Read literally it would call helper.ResponseOK twice;
// confirm that only the second call exists in the real file.
func (handler *RaftHandler) ListPeers(c *gin.Context) {
// The type-assertion result is discarded: if the stored value is not a
// *raft.Node, raftNode is nil and the method calls below run on a nil pointer.
raftNode, _ := c.MustGet(consts.ContextKeyRaftNode).(*raft.Node)
peers := raftNode.ListPeers()
helper.ResponseOK(c, gin.H{"peers": peers})
// Response carrying both the current raft leader and the peer list.
helper.ResponseOK(c, gin.H{
"leader": raftNode.GetRaftLead(),
"peers": raftNode.ListPeers(),
})
}

func (handler *RaftHandler) UpdatePeer(c *gin.Context) {
Expand All @@ -56,10 +79,25 @@ func (handler *RaftHandler) UpdatePeer(c *gin.Context) {
}

raftNode, _ := c.MustGet(consts.ContextKeyRaftNode).(*raft.Node)
peers := raftNode.ListPeers()

var err error
// Validate the membership change against the current peer set before
// proposing it to raft. Every rejected request must return right after the
// error response is written, so no proposal is made and no second response
// is sent.
if req.Operation == OperationAdd {
	// Refuse to register an address that is already used by a peer.
	for _, peer := range peers {
		if peer == req.Peer {
			helper.ResponseError(c, fmt.Errorf("peer '%s' already exists", req.Peer))
			return
		}
	}
	err = raftNode.AddPeer(c, req.ID, req.Peer)
} else {
	if _, ok := peers[req.ID]; !ok {
		helper.ResponseBadRequest(c, errors.New("peer not exists"))
		// Without this return the handler would go on to propose the
		// removal of an unknown ID and write a second response.
		return
	}
	// Removing the only remaining member would leave the cluster empty.
	if len(peers) == 1 {
		helper.ResponseBadRequest(c, errors.New("can't remove the last peer"))
		return
	}
	err = raftNode.RemovePeer(c, req.ID)
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func RedirectIfNotLeader(c *gin.Context) {
}

_, isRaftMode := storage.GetEngine().(*raft.Node)
// Raft engine will forward the request to the leader nod under the hood,
// Raft engine will forward the request to the leader node under the hood,
// so we don't need to do the redirect.
if !storage.IsLeader() && !isRaftMode {
if !c.GetBool(consts.HeaderIsRedirect) {
Expand Down
19 changes: 16 additions & 3 deletions store/engine/raft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,23 @@

package raft

import "errors"
import (
"errors"
"strings"
)

// ClusterStateNew is the Config.ClusterState value for a cluster that is
// being created for the first time.
const ClusterStateNew = "new"

// ClusterStateExisting is the Config.ClusterState value for a node that joins
// a cluster which already exists.
const ClusterStateExisting = "existing"

type Config struct {
// ID is the identity of the local raft. ID cannot be 0.
ID uint64 `yaml:"id"`
// DataDir is the directory to store the raft data which includes snapshot and WALs.
DataDir string `yaml:"data_dir"`
// Join should be set to true if the node is joining an existing cluster.
Join bool `yaml:"join"`
// ClusterState is the state of the cluster, can be one of "new" and "existing".
ClusterState string `yaml:"cluster_state"`
// Peers is the list of raft peers.
Peers []string `yaml:"peers"`
// HeartbeatSeconds is the interval to send heartbeat message. Default is 2 seconds.
Expand All @@ -47,10 +55,15 @@ func (c *Config) validate() error {
if c.ID > uint64(len(c.Peers)) {
return errors.New("ID cannot be greater than the number of peers")
}
clusterState := strings.ToLower(c.ClusterState)
if clusterState != ClusterStateNew && clusterState != ClusterStateExisting {
return errors.New("cluster state must be one of [new, existing]")
}
return nil
}

func (c *Config) init() {
c.ClusterState = ClusterStateNew
if c.DataDir == "" {
c.DataDir = "."
}
Expand Down
7 changes: 7 additions & 0 deletions store/engine/raft/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

func TestConfig_Validate(t *testing.T) {
c := &Config{}
c.init()

// missing ID
require.ErrorContains(t, c.validate(), "ID cannot be 0")
Expand All @@ -40,6 +41,12 @@ func TestConfig_Validate(t *testing.T) {
// ID greater than the number of peers
c.ID = 2
require.ErrorContains(t, c.validate(), "ID cannot be greater than the number of peers")

c.ID = 1
c.ClusterState = "invalid"
require.ErrorContains(t, c.validate(), "cluster state must be one of [new, existing]")
c.ClusterState = ClusterStateNew
require.NoError(t, c.validate())
}

func TestConfig_Init(t *testing.T) {
Expand Down
38 changes: 30 additions & 8 deletions store/engine/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (n *Node) run() error {
n.snapshotIndex = snapshot.Metadata.Index
n.confState = snapshot.Metadata.ConfState

if n.config.Join || walExists {
if n.config.ClusterState == ClusterStateExisting || walExists {
n.raftNode = raft.RestartNode(raftConfig)
} else {
n.raftNode = raft.StartNode(raftConfig, peers)
Expand All @@ -175,6 +175,7 @@ func (n *Node) run() error {
if err := n.runTransport(); err != nil {
return err
}
n.watchLeaderChange()
return n.runRaftMessages()
}

Expand Down Expand Up @@ -225,10 +226,33 @@ func (n *Node) runTransport() error {
return nil
}

// watchLeaderChange starts a background goroutine that polls the raft leader
// once per second. When the observed leader differs from the one recorded in
// n.leader, it updates n.leader, sends a notification on n.leaderChanged and
// logs the new leader ID.
//
// The goroutine is registered with n.wg and exits as soon as n.shutdown
// becomes readable.
func (n *Node) watchLeaderChange() {
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()

		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-n.shutdown:
				return
			case <-ticker.C:
				lead := n.GetRaftLead()
				if lead == n.leader {
					continue
				}
				n.leader = lead
				// The notification must not outlive shutdown: if nobody is
				// receiving from leaderChanged, a bare send would block
				// forever and anything waiting on n.wg would hang with it.
				select {
				case n.leaderChanged <- true:
				case <-n.shutdown:
					return
				}
				n.logger.Info("Found leader changed", zap.Uint64("leader", lead))
			}
		}
	}()
}

func (n *Node) runRaftMessages() error {
n.wg.Add(1)
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
ticker := time.NewTicker(time.Second)
defer func() {
ticker.Stop()
n.wg.Done()
Expand All @@ -253,9 +277,7 @@ func (n *Node) runRaftMessages() error {
if err := n.applySnapshot(rd.Snapshot); err != nil {
n.logger.Error("Failed to apply snapshot", zap.Error(err))
}
if len(rd.Entries) > 0 {
_ = n.dataStore.raftStorage.Append(rd.Entries)
}
_ = n.dataStore.raftStorage.Append(rd.Entries)

for _, msg := range rd.Messages {
if msg.Type == raftpb.MsgApp {
Expand Down Expand Up @@ -466,14 +488,14 @@ func (n *Node) applyEntry(entry raftpb.Entry) error {
n.peers.Store(cc.NodeID, string(cc.Context))
}
case raftpb.ConfChangeRemoveNode:
n.peers.Delete(cc.NodeID)
if cc.NodeID == n.config.ID {
n.logger.Info("I have been removed from the cluster, will shutdown")
n.Close()
n.logger.Info("Node removed from the cluster")
return nil
}
n.logger.Info("Remove the peer", zap.Uint64("node_id", cc.NodeID))
n.transport.RemovePeer(types.ID(cc.NodeID))
n.peers.Delete(cc.NodeID)
n.logger.Info("Remove the peer", zap.Uint64("node_id", cc.NodeID))
case raftpb.ConfChangeUpdateNode:
n.transport.UpdatePeer(types.ID(cc.NodeID), []string{string(cc.Context)})
if _, ok := n.peers.Load(cc.NodeID); ok {
Expand Down
5 changes: 5 additions & 0 deletions store/engine/raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func NewTestCluster(n int) *TestCluster {
HeartbeatSeconds: 1,
ElectionSeconds: 2,
})
// drain leader change events
go func() {
for range nodes[i].LeaderChange() {
}
}()
}
return &TestCluster{nodes: nodes}
}
Expand Down
18 changes: 12 additions & 6 deletions store/engine/raft/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,19 @@ func (ds *DataStore) List(prefix string) []engine.Entry {
ds.mu.RLock()
defer ds.mu.RUnlock()
entries := make([]engine.Entry, 0)
for k := range ds.kvs {
if strings.HasPrefix(k, prefix) {
entries = append(entries, engine.Entry{
Key: strings.TrimLeft(strings.TrimPrefix(k, prefix), "/"),
Value: ds.kvs[k],
})
// Collect the direct children of prefix: keys that start with prefix, are not
// the prefix itself, and have no further '/' once the prefix and its
// separator slashes are removed.
for key, value := range ds.kvs {
	if !strings.HasPrefix(key, prefix) || key == prefix {
		continue
	}
	// Strip exactly the prefix and let TrimLeft remove the separator(s).
	// Skipping one extra byte unconditionally would eat the first character
	// of the name whenever it is not '/' (e.g. when prefix ends in '/').
	trimmedKey := strings.TrimLeft(key[len(prefix):], "/")
	// A remaining '/' means the key is nested more than one level deep.
	if strings.ContainsRune(trimmedKey, '/') {
		continue
	}

	entries = append(entries, engine.Entry{
		Key: trimmedKey,
		// The map is indexed by the full key; looking up the trimmed key
		// would return the wrong (usually missing) value.
		Value: value,
	})
}
slices.SortFunc(entries, func(i, j engine.Entry) int {
return strings.Compare(i.Key, j.Key)
Expand Down
6 changes: 3 additions & 3 deletions store/engine/raft/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ func TestDataStore(t *testing.T) {
entries = store.List("ba")
require.Len(t, entries, 4)

entries = store.List("bar-2")
require.Len(t, entries, 1)
entries = store.List("bar")
require.Len(t, entries, 2)

entries = store.List("foo")
entries = store.List("fo")
require.Len(t, entries, 1)

store.Delete("bar-2")
Expand Down

0 comments on commit f422e43

Please sign in to comment.