Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Raft demo in Go #89

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ store/
.DS_Store
.idea
/demo/rust/target
/demo/go/cmd/maelstrom-raft/maelstrom-raft
/demo/go/cmd/maelstrom-raft/vendor
maelstrom
__pycache__/
51 changes: 51 additions & 0 deletions demo/go/cmd/maelstrom-raft/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Raft Demo in Go with Maelstrom

This repository contains a demo of the Raft consensus algorithm implemented in Go, designed to run within the Maelstrom framework.

## Prerequisites

To run this demo, you will need the following installed on your system:

- Go (version 1.21.0 or later)
- Maelstrom (latest version)

## Getting Started

Follow these steps to build and run the Raft demo:

### 1. Clone the Repository

Clone the Maelstrom repository if you haven't already:

```shell
git clone https://github.com/jepsen-io/maelstrom.git
cd maelstrom
```

### 2. Build the Raft Demo

Navigate to the Raft demo directory and build the Go application:
```shell
(cd demo/go/cmd/maelstrom-raft/ && go build)
```

### 3. Run the Demo

Execute the following command to run the Raft demo using Maelstrom with the specified parameters:

```shell
./maelstrom test -w lin-kv --bin demo/go/cmd/maelstrom-raft/maelstrom-raft --node-count 3 --concurrency 4n --rate 40 --time-limit 60 --nemesis partition --nemesis-interval 10 --test-count 1
```

### Explanation of the Command

- `./maelstrom test` : Invokes the Maelstrom test runner.
- `-w lin-kv`: Specifies the workload to be linearizable key-value store.
- `--bin demo/go/cmd/maelstrom-raft/maelstrom-raft`: Points to the built Raft demo binary.
- `--node-count 3`: Sets the number of nodes in the Raft cluster to 3.
- `--concurrency 4n`: Sets the concurrency level to 4 operations per node.
- `--rate 30`: Sets the rate of operations per second.
- `--time-limit 60`: Sets the time limit for the test to 60 seconds.
- `--nemesis partition`: Introduces network partitions as the fault injection.
- `--nemesis-interval 10`: time in seconds between nemesis operations, on average.
` `--test-count 10`: Runs the test for 10 counts.
12 changes: 12 additions & 0 deletions demo/go/cmd/maelstrom-raft/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/jepsen-io/maelstrom/demo/go/cmd/maelstrom-raft

go 1.21.0

require (
github.com/jepsen-io/maelstrom/demo/go v0.0.0-20240408130303-0186f398f965
github.com/samber/lo v1.39.0
golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d
)

// Use local dependency
replace github.com/jepsen-io/maelstrom/demo/go => ../../
4 changes: 4 additions & 0 deletions demo/go/cmd/maelstrom-raft/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d h1:N0hmiNbwsSNwHBAvR3QB5w25pUwH4tK0Y/RltD1j1h4=
golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
180 changes: 180 additions & 0 deletions demo/go/cmd/maelstrom-raft/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package main

import (
"encoding/json"
"fmt"
maelstrom "github.com/jepsen-io/maelstrom/demo/go"
"log"
)

// When a node requests our vote...
func (raft *RaftNode) requestVote(msg maelstrom.Message) error {
var requestVoteMsgBody RequestVoteMsgBody
if err := json.Unmarshal(msg.Body, &requestVoteMsgBody); err != nil {
return err
}

raft.maybeStepDown(requestVoteMsgBody.Term)
grant := false

if requestVoteMsgBody.Term < raft.currentTerm {
log.Printf("candidate Term %d lower than %d not granting vote \n", requestVoteMsgBody.Term, raft.currentTerm)
} else if raft.votedFor != "" {
log.Printf("already voted for %s not granting vote \n", raft.votedFor)
} else if requestVoteMsgBody.LastLogTerm < raft.log.lastTerm() {
log.Printf("have log Entries From Term %d which is newer than remote Term %d not granting vote\n", raft.log.lastTerm(), requestVoteMsgBody.LastLogTerm)
} else if requestVoteMsgBody.LastLogTerm == raft.log.lastTerm() && requestVoteMsgBody.LastLogIndex < raft.log.size() {
log.Printf("our logs are both at Term %d but our log is %d and theirs is only %d \n", raft.log.lastTerm(), raft.log.size(), requestVoteMsgBody.LastLogIndex)
} else {
log.Printf("before raft.votedFor %s\n", raft.votedFor)
log.Printf("CandidateId: %s\n", requestVoteMsgBody.CandidateId)
log.Printf("Granting vote To %s\n", msg.Src)
grant = true
raft.votedFor = requestVoteMsgBody.CandidateId
raft.resetElectionDeadline()
log.Printf("after raft.votedFor %s\n", raft.votedFor)
}

err := raft.node.Reply(msg, map[string]interface{}{
"type": MsgTypeRequestVoteResult,
"term": raft.currentTerm,
"vote_granted": grant,
})
if err != nil {
return err
}

return nil
}

func (raft *RaftNode) appendEntries(msg maelstrom.Message) error {
var appendEntriesMsgBody AppendEntriesMsgBody
err := json.Unmarshal(msg.Body, &appendEntriesMsgBody)
if err != nil {
panic(err)
}

raft.maybeStepDown(appendEntriesMsgBody.Term)

result := map[string]interface{}{
"type": MsgTypeAppendEntriesResult,
"term": raft.currentTerm,
"success": false,
}

if appendEntriesMsgBody.Term < raft.currentTerm {
// leader is behind us
raft.node.Reply(msg, result)
return nil
}

// This leader is valid; remember them and don't try to run our own election for a bit
raft.leaderId = appendEntriesMsgBody.LeaderId
raft.resetElectionDeadline()

// Check previous entry To see if it matches
if appendEntriesMsgBody.PrevLogIndex <= 0 {
panic(fmt.Errorf("out of bounds previous log index %d \n", appendEntriesMsgBody.PrevLogIndex))
}

if appendEntriesMsgBody.PrevLogIndex > len(raft.log.Entries) || (appendEntriesMsgBody.PrevLogTerm != raft.log.get(appendEntriesMsgBody.PrevLogIndex).Term) {
// We disagree on the previous term
raft.node.Reply(msg, result)
return nil
}

// We agree on the previous log Term; truncate and append
raft.log.truncate(appendEntriesMsgBody.PrevLogIndex)
raft.log.append(appendEntriesMsgBody.Entries)

// Advance commit pointer
if raft.commitIndex < appendEntriesMsgBody.LeaderCommit {
raft.commitIndex = min(appendEntriesMsgBody.LeaderCommit, raft.log.size())
raft.advanceCommitIndex()
}

// Acknowledge
result["success"] = true
raft.node.Reply(msg, result)
return nil
}

func (raft *RaftNode) setupHandlers() error {
// Handle Client KV requests
kvRequests := func(msg maelstrom.Message, op Operation) error {
if raft.state == StateLeader {
raft.log.append([]Entry{{
Term: raft.currentTerm,
Op: &op,
Msg: msg,
}})
} else if raft.leaderId != "" {
// we're not the leader, but we can proxy to one
msg.Dest = raft.leaderId
raft.node.Send(raft.leaderId, msg.Body)
} else {
return raft.node.Reply(msg, &ErrorMsgBody{
Type: MsgTypeError,
Code: ErrCodeTemporarilyUnavailable,
Text: ErrNotLeader,
})
}

return nil
}

kvReadRequest := func(msg maelstrom.Message) error {
var readMsgBody ReadMsgBody
err := json.Unmarshal(msg.Body, &readMsgBody)
if err != nil {
panic(err)
}

return kvRequests(msg, Operation{
Type: readMsgBody.Type,
MsgId: readMsgBody.MsgId,
Key: readMsgBody.Key,
Client: readMsgBody.Client,
})
}

kvWriteRequest := func(msg maelstrom.Message) error {
var writeMsgBody WriteMsgBody
err := json.Unmarshal(msg.Body, &writeMsgBody)
if err != nil {
panic(err)
}

return kvRequests(msg, Operation{
Type: writeMsgBody.Type,
MsgId: int(writeMsgBody.MsgId),
Key: writeMsgBody.Key,
Client: writeMsgBody.Client,
Value: writeMsgBody.Value,
})
}

kvCasRequest := func(msg maelstrom.Message) error {
var casMsgBody CasMsgBody
err := json.Unmarshal(msg.Body, &casMsgBody)
if err != nil {
panic(err)
}

return kvRequests(msg, Operation{
Type: casMsgBody.Type,
MsgId: casMsgBody.MsgId,
Key: casMsgBody.Key,
Client: casMsgBody.Client,
From: casMsgBody.From,
To: casMsgBody.To,
})
}

raft.node.Handle(string(MsgTypeRead), kvReadRequest)
raft.node.Handle(string(MsgTypeWrite), kvWriteRequest)
raft.node.Handle(string(MsgTypeCas), kvCasRequest)
raft.node.Handle(string(MsgTypeRequestVote), raft.requestVote)
raft.node.Handle(string(MsgTypeAppendEntries), raft.appendEntries)
return nil
}
75 changes: 75 additions & 0 deletions demo/go/cmd/maelstrom-raft/kv_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"fmt"
"log"
"sync"
)

// KVStore A state machine providing a Key-Value store.
type KVStore struct {
state map[int]int
wu sync.Mutex
}

func (kvStore *KVStore) init() {
kvStore.state = map[int]int{}
}

func (kvStore *KVStore) apply(op Operation) any {
kvStore.wu.Lock()
defer kvStore.wu.Unlock()
// Applies an operation to the state machine, and returns a response message.
t := op.Type
k := op.Key

var body any
// Handle state transition
if t == MsgTypeRead {
if value, ok := kvStore.state[k]; ok {
body = &ReadOkMsgBody{
Type: MsgTypeReadOk,
Value: value,
}
} else {
body = &ErrorMsgBody{
Type: MsgTypeError,
Code: ErrCodeKeyDoesNotExist,
Text: ErrTxtNotFound,
}
}
} else if t == MsgTypeWrite {
kvStore.state[k] = op.Value
body = &WriteOkMsgBody{
Type: MsgTypeWriteOk,
}
} else if t == MsgTypeCas {
if value, ok := kvStore.state[k]; !ok {
body = &ErrorMsgBody{
Type: MsgTypeError,
Code: ErrCodeKeyDoesNotExist,
Text: ErrTxtNotFound,
}
} else if value != op.From {
body = &ErrorMsgBody{
Type: MsgTypeError,
Code: ErrCodePreconditionFailed,
Text: fmt.Sprintf(ErrExpectedButHad, op.From, value),
}
} else {
kvStore.state[k] = op.To
body = &CasOkMsgBody{
Type: MsgTypeCasOk,
}
}
}

log.Printf("KV:\n %v \n", kvStore.state)
return body
}

func newKVStore() *KVStore {
kvStore := KVStore{}
kvStore.init()
return &kvStore
}
61 changes: 61 additions & 0 deletions demo/go/cmd/maelstrom-raft/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"fmt"
"github.com/samber/lo"
)

type Log struct {
Entries []Entry
}

func (log *Log) init() {
// Note that we provide a default entry here, which simplifies
// some default cases involving empty logs.
log.Entries = []Entry{{
Term: 0,
}}
}

func (log *Log) get(index int) Entry {
// Return a log entry by index. Note that Raft's log is 1-indexed.
return log.Entries[index-1]
}

func (log *Log) append(entries []Entry) {
// Appends multiple entries to the log
log.Entries = append(log.Entries, entries...)
}

func (log *Log) last() Entry {
// Returns the most recent entry
return log.Entries[len(log.Entries)-1]
}

func (log *Log) lastTerm() int {
// What's the term of the last entry in the log?
return log.last().Term // TODO: if index exception return 0
}

func (log *Log) size() int {
return len(log.Entries)
}

func (log *Log) truncate(size int) {
// Truncate the log to this many entries
log.Entries = lo.Slice(log.Entries, 0, size)
}

func (log *Log) fromIndex(index int) []Entry {
if index <= 0 {
panic(fmt.Errorf("illegal index %d", index))
}

return lo.Slice(log.Entries, index-1, len(log.Entries))
}

func newLog() *Log {
log := Log{}
log.init()
return &log
}
Loading