-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmerge_utils.go
164 lines (137 loc) · 4.55 KB
/
merge_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package golsm
import (
"container/heap"
"github.com/huandu/skiplist"
)
// heapEntry is one element of the merge heap used by the k-way merge
// functions in this file. It pairs an LSM entry with enough bookkeeping to
// fetch the next entry from the same source after this one is popped.
//
// Two kinds of source are supported and each uses a different subset of the
// fields: mergeRanges fills listIndex and idx (leaving iterator nil), while
// mergeIterators fills iterator (leaving listIndex and idx at zero).
type heapEntry struct {
// The entry being merged; the heap orders elements by entry.Timestamp.
entry *LSMEntry
listIndex int // index of the source list within the ranges passed to mergeRanges.
idx int // position of entry inside that source list.
// Iterator that produced entry; mergeIterators advances it to get the next one.
iterator *SSTableIterator
}
// mergeHeap is a min-heap of heapEntry values ordered by entry timestamp. It
// implements heap.Interface and is meant to be driven through the
// container/heap functions (heap.Init, heap.Push, heap.Pop); calling its Push
// and Pop methods directly does not maintain the heap ordering.
type mergeHeap []heapEntry

// Len reports the number of entries currently stored in the heap.
func (h mergeHeap) Len() int {
	return len(h)
}

// Less orders entries by ascending timestamp, so the entry with the smallest
// timestamp sits at the top of the heap. Every stored heapEntry must have a
// non-nil entry, otherwise this dereference panics.
func (h mergeHeap) Less(i, j int) bool {
	return h[i].entry.Timestamp < h[j].entry.Timestamp
}

// Swap exchanges the entries at positions i and j.
func (h mergeHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push appends x to the end of the underlying slice. x must be a heapEntry;
// any other dynamic type makes the type assertion panic. heap.Push calls this
// method and then restores the heap ordering.
func (h *mergeHeap) Push(x interface{}) {
	*h = append(*h, x.(heapEntry))
}

// Pop removes and returns the last element of the underlying slice as a
// heapEntry. On its own this is NOT the minimum: heap.Pop moves the minimum
// entry to the last position before invoking this method, which is what makes
// heap.Pop return the smallest timestamp. The heap must be non-empty.
func (h *mergeHeap) Pop() interface{} {
	old := *h
	last := len(old) - 1
	x := old[last]
	// Zero the vacated slot so the backing array no longer keeps the popped
	// entry and iterator pointers reachable after the slice is shortened.
	old[last] = heapEntry{}
	*h = old[:last]
	return x
}
// mergeRanges combines several, possibly overlapping, lists of LSM entries
// into a single list of key/value pairs containing each key at most once.
//
// When the same key occurs more than once, the occurrence with the largest
// timestamp wins; on equal timestamps the occurrence processed first is kept.
// Keys whose winning entry is a delete command are left out of the result.
// The result is emitted in the iteration order of the skip list, which is
// keyed by entry key. It is nil when nothing survives.
func mergeRanges(ranges [][]*LSMEntry) []KVPair {
	pending := &mergeHeap{}
	heap.Init(pending)

	// newest holds, per key, the heapEntry with the largest timestamp observed
	// so far.
	newest := skiplist.New(skiplist.String)

	// Seed the heap with the head of every non-empty source list.
	for source, entries := range ranges {
		if len(entries) == 0 {
			continue
		}
		heap.Push(pending, heapEntry{entry: entries[0], listIndex: source, idx: 0})
	}

	for pending.Len() > 0 {
		// The heap yields the pending entry with the smallest timestamp.
		cur := heap.Pop(pending).(heapEntry)
		key := cur.entry.Key

		// Record cur unless an entry with the same or a newer timestamp is
		// already stored for this key.
		prev := newest.Get(key)
		if prev == nil || prev.Value.(heapEntry).entry.Timestamp < cur.entry.Timestamp {
			newest.Set(key, cur)
		}

		// Refill the heap with the successor of cur from the same source list.
		list := ranges[cur.listIndex]
		if next := cur.idx + 1; next < len(list) {
			heap.Push(pending, heapEntry{entry: list[next], listIndex: cur.listIndex, idx: next})
		}
	}

	// Walk the surviving entries and drop the keys whose latest command is a
	// delete.
	var results []KVPair
	for el := newest.Front(); el != nil; el = el.Next() {
		winner := el.Value.(heapEntry).entry
		if winner.Command == Command_DELETE {
			continue
		}
		results = append(results, KVPair{Key: winner.Key, Value: winner.Value})
	}
	return results
}
// mergeIterators drains a set of SSTable iterators, whose key ranges may
// overlap, and returns the surviving LSM entries with each key appearing at
// most once.
//
// When the same key occurs more than once, the occurrence with the largest
// timestamp wins; on equal timestamps the occurrence processed first is kept.
// Keys whose winning entry is a delete command are left out of the result.
// Nil iterators are skipped. The result is emitted in the iteration order of
// the skip list, which is keyed by entry key. It is nil when nothing survives.
//
// NOTE(review): every non-nil iterator is assumed to already expose a non-nil
// Value on entry; a nil Value would panic in the heap comparison — confirm
// against SSTableIterator.
func mergeIterators(iterators []*SSTableIterator) []*LSMEntry {
	pending := &mergeHeap{}
	heap.Init(pending)

	// newest holds, per key, the heapEntry with the largest timestamp observed
	// so far.
	newest := skiplist.New(skiplist.String)

	// Seed the heap with the current value of every usable iterator.
	for _, it := range iterators {
		if it != nil {
			heap.Push(pending, heapEntry{entry: it.Value, iterator: it})
		}
	}

	for pending.Len() > 0 {
		// The heap yields the pending entry with the smallest timestamp.
		cur := heap.Pop(pending).(heapEntry)
		key := cur.entry.Key

		// Record cur unless an entry with the same or a newer timestamp is
		// already stored for this key.
		prev := newest.Get(key)
		if prev == nil || prev.Value.(heapEntry).entry.Timestamp < cur.entry.Timestamp {
			newest.Set(key, cur)
		}

		// Advance the iterator that produced cur and, if it reports another
		// element, queue that element.
		// NOTE(review): presumably Next returns nil once the iterator is
		// exhausted — verify against SSTableIterator.
		if cur.iterator.Next() != nil {
			heap.Push(pending, heapEntry{entry: cur.iterator.Value, iterator: cur.iterator})
		}
	}

	// Walk the surviving entries and drop the keys whose latest command is a
	// delete.
	var results []*LSMEntry
	for el := newest.Front(); el != nil; el = el.Next() {
		if winner := el.Value.(heapEntry).entry; winner.Command != Command_DELETE {
			results = append(results, winner)
		}
	}
	return results
}