-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathketama.go
106 lines (87 loc) · 2.19 KB
/
ketama.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package ketama
import (
"fmt"
"hash/crc32"
"sort"
"sync"
)
// Node represents a physical node in the cluster.
type Node struct {
ID string // Node ID
Weight int // Node weight
Value interface{} // Additional value to be stored
}
// Ketama represents a ketama ketama.
type Ketama struct {
mu sync.RWMutex
sortedKeys []uint32
nodeHashMap map[uint32]Node
virtualNodes int
}
// NewKetama creates a new Ketama ketama.
func NewKetama(nodes []Node, virtualNodes int) *Ketama {
ketama := &Ketama{
nodeHashMap: make(map[uint32]Node),
virtualNodes: virtualNodes,
}
for _, node := range nodes {
ketama.addNode(node)
}
return ketama
}
// AddNode adds a node to the ketama.
func (k *Ketama) AddNode(node Node) {
k.mu.Lock()
defer k.mu.Unlock()
k.addNode(node)
}
// addNode adds a node to the ketama.
func (k *Ketama) addNode(node Node) {
for i := 0; i < k.virtualNodes; i++ {
nodeKey := generateNodeKey(node.ID, i)
k.nodeHashMap[nodeKey] = node
k.sortedKeys = append(k.sortedKeys, nodeKey)
}
sort.Slice(k.sortedKeys, func(i, j int) bool {
return k.sortedKeys[i] < k.sortedKeys[j]
})
}
// RemoveNode removes a node from the ketama.
func (k *Ketama) RemoveNode(nodeID string) {
k.mu.Lock()
defer k.mu.Unlock()
newSortedKeys := make([]uint32, 0)
for _, key := range k.sortedKeys {
node := k.nodeHashMap[key]
if node.ID != nodeID {
newSortedKeys = append(newSortedKeys, key)
}
}
k.sortedKeys = newSortedKeys
for i := 0; i < k.virtualNodes; i++ {
nodeKey := generateNodeKey(nodeID, i)
delete(k.nodeHashMap, nodeKey)
}
}
// GetNode gets the node for the given key.
func (k *Ketama) GetNode(key string) (Node, bool) {
k.mu.RLock()
defer k.mu.RUnlock()
if len(k.sortedKeys) == 0 {
return Node{}, false
}
keyHash := crc32.ChecksumIEEE([]byte(key))
idx := sort.Search(len(k.sortedKeys), func(i int) bool {
return k.sortedKeys[i] >= keyHash
})
if idx == len(k.sortedKeys) {
idx = 0
}
node := k.nodeHashMap[k.sortedKeys[idx]]
return node, true
}
// generateNodeKey generates a key for the virtual node.
func generateNodeKey(nodeID string, i int) uint32 {
key := fmt.Sprintf("%s-%d", nodeID, i)
return crc32.ChecksumIEEE([]byte(key))
}