Skip to content

Commit

Permalink
Fix data race when updating the raft snapshot and compact threshold (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk authored Jan 7, 2025
1 parent 77461f2 commit dd7a8c7
Showing 1 changed file with 14 additions and 17 deletions.
31 changes: 14 additions & 17 deletions store/engine/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ type Node struct {
logger *zap.Logger
peers sync.Map

mu sync.Mutex
leader uint64
appliedIndex uint64
snapshotIndex uint64
confState raftpb.ConfState
snapshotThreshold uint64
compactThreshold uint64
snapshotThreshold atomic.Uint64
compactThreshold atomic.Uint64

wg sync.WaitGroup
shutdown chan struct{}
Expand All @@ -97,14 +96,14 @@ func New(config *Config) (*Node, error) {

logger := logger.Get().With(zap.Uint64("node_id", config.ID))
n := &Node{
config: config,
leader: raft.None,
dataStore: NewDataStore(config.DataDir),
leaderChanged: make(chan bool),
snapshotThreshold: defaultSnapshotThreshold,
compactThreshold: defaultCompactThreshold,
logger: logger,
}
config: config,
leader: raft.None,
dataStore: NewDataStore(config.DataDir),
leaderChanged: make(chan bool),
logger: logger,
}
n.snapshotThreshold.Store(defaultSnapshotThreshold)
n.compactThreshold.Store(defaultCompactThreshold)
if err := n.run(); err != nil {
return nil, err
}
Expand All @@ -127,9 +126,7 @@ func (n *Node) ListPeers() map[uint64]string {
}

func (n *Node) SetSnapshotThreshold(threshold uint64) {
n.mu.Lock()
defer n.mu.Unlock()
n.snapshotThreshold = threshold
n.snapshotThreshold.Store(threshold)
}

func (n *Node) run() error {
Expand Down Expand Up @@ -309,7 +306,7 @@ func (n *Node) runRaftMessages() error {
}

func (n *Node) triggerSnapshotIfNeed() error {
if n.appliedIndex-n.snapshotIndex <= n.snapshotThreshold {
if n.appliedIndex-n.snapshotIndex <= n.snapshotThreshold.Load() {
return nil
}
snapshotBytes, err := n.dataStore.GetDataStoreSnapshot()
Expand All @@ -325,8 +322,8 @@ func (n *Node) triggerSnapshotIfNeed() error {
}

compactIndex := uint64(1)
if n.appliedIndex > n.compactThreshold {
compactIndex = n.appliedIndex - n.compactThreshold
if n.appliedIndex > n.compactThreshold.Load() {
compactIndex = n.appliedIndex - n.compactThreshold.Load()
}
if err := n.dataStore.raftStorage.Compact(compactIndex); err != nil && !errors.Is(err, raft.ErrCompacted) {
return err
Expand Down

0 comments on commit dd7a8c7

Please sign in to comment.