forked from ava-labs/avalanchego
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.go
282 lines (248 loc) · 9.08 KB
/
index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package indexer
import (
"errors"
"fmt"
"sync"
"go.uber.org/zap"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/prefixdb"
"github.com/ava-labs/avalanchego/database/versiondb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
)
// MaxFetchedByRange is the maximum number of containers that can be fetched
// in a single call to GetContainerRange. Requests for more are rejected.
const MaxFetchedByRange = 1024
// Database keys and prefixes used by index.
var (
	// Key whose value is the byte representation of the next accepted index.
	nextAcceptedIndexKey = []byte{0x00}
	// Prefix of the database that maps index --> container.
	indexToContainerPrefix = []byte{0x01}
	// Prefix of the database that maps container ID --> index.
	containerToIDPrefix = []byte{0x02}
)

// Errors returned by index.
var (
	errNoneAccepted       = errors.New("no containers have been accepted")
	errNumToFetchInvalid  = fmt.Errorf("numToFetch must be in [1,%d]", MaxFetchedByRange)
	errNoContainerAtIndex = errors.New("no container at index")
)

// Compile-time check that *index implements snow.Acceptor.
var _ snow.Acceptor = (*index)(nil)
// index indexes containers in their order of acceptance.
//
// Each accepted container is assigned the next sequential index, starting at
// 0, and is stored under two mappings: index --> container and
// container ID --> index.
//
// Invariant: index is thread-safe.
// Invariant: index assumes that Accept is called, before the container is
// committed to the database of the VM, in the order they were accepted.
type index struct {
	// Source of the timestamp recorded with each accepted container
	clock mockable.Clock
	// Held by the accessor methods and by Accept
	lock sync.RWMutex
	// The index of the next accepted transaction
	nextAcceptedIndex uint64

	// When [vDB] is committed, writes to [baseDB]
	vDB    *versiondb.Database
	baseDB database.Database

	// Both [indexToContainer] and [containerToIndex] have [vDB] underneath
	// Index --> Container
	indexToContainer database.Database
	// Container ID --> Index
	containerToIndex database.Database

	log logging.Logger
}
// newIndex returns a new thread-safe index layered on top of [baseDB].
//
// The next accepted index is restored from the database if a previous run
// persisted one; otherwise indexing starts at 0.
//
// Invariant: Closes [baseDB] on close.
func newIndex(
	baseDB database.Database,
	log logging.Logger,
	clock mockable.Clock,
) (*index, error) {
	vDB := versiondb.New(baseDB)
	idx := &index{
		clock:            clock,
		baseDB:           baseDB,
		vDB:              vDB,
		indexToContainer: prefixdb.New(indexToContainerPrefix, vDB),
		containerToIndex: prefixdb.New(containerToIDPrefix, vDB),
		log:              log,
	}

	// Restore the next accepted index, if one was persisted.
	persisted, err := database.GetUInt64(idx.vDB, nextAcceptedIndexKey)
	switch {
	case err == nil:
		idx.nextAcceptedIndex = persisted
	case err == database.ErrNotFound:
		// Nothing stored: no containers were accepted in previous runs, so
		// the zero value of nextAcceptedIndex is already correct.
	default:
		return nil, fmt.Errorf("couldn't get next accepted index from database: %w", err)
	}

	idx.log.Info("created new index",
		zap.Uint64("nextAcceptedIndex", idx.nextAcceptedIndex),
	)
	return idx, nil
}
// Close closes every database held by this index: the two prefixed
// databases first, then [vDB], then [baseDB]. All four are always closed;
// their errors are combined by utils.Err.
func (i *index) Close() error {
	indexToContainerErr := i.indexToContainer.Close()
	containerToIndexErr := i.containerToIndex.Close()
	vDBErr := i.vDB.Close()
	baseDBErr := i.baseDB.Close()

	return utils.Err(
		indexToContainerErr,
		containerToIndexErr,
		vDBErr,
		baseDBErr,
	)
}
// Accept indexes [containerID] as the next accepted container.
//
// The container is stored together with the current time of the index's
// clock under [i.nextAcceptedIndex], the reverse container ID --> index
// mapping is written, and the incremented next accepted index is persisted.
// All three writes go through [i.vDB] and are committed to [i.baseDB]
// together. A container that is already indexed is skipped and nil is
// returned.
//
// Returned error should be treated as fatal; the VM should not commit [containerID]
// or any new containers as accepted.
func (i *index) Accept(ctx *snow.ConsensusContext, containerID ids.ID, containerBytes []byte) error {
	i.lock.Lock()
	defer i.lock.Unlock()

	// It may be the case that in a previous run of this node, this index committed [containerID]
	// as accepted and then the node shut down before the VM committed [containerID] as accepted.
	// In that case, when the node restarts Accept will be called with the same container.
	// Make sure we don't index the same container twice in that event.
	_, err := i.containerToIndex.Get(containerID[:])
	if err == nil {
		ctx.Log.Debug("not indexing already accepted container",
			zap.Stringer("containerID", containerID),
		)
		return nil
	}
	if err != database.ErrNotFound {
		return fmt.Errorf("couldn't get whether %s is accepted: %w", containerID, err)
	}

	ctx.Log.Debug("indexing container",
		zap.Uint64("nextAcceptedIndex", i.nextAcceptedIndex),
		zap.Stringer("containerID", containerID),
	)

	// Persist index --> Container
	nextAcceptedIndexBytes := database.PackUInt64(i.nextAcceptedIndex)
	serialized, err := Codec.Marshal(CodecVersion, Container{
		ID:        containerID,
		Bytes:     containerBytes,
		Timestamp: i.clock.Time().UnixNano(),
	})
	if err != nil {
		return fmt.Errorf("couldn't serialize container %s: %w", containerID, err)
	}
	if err := i.indexToContainer.Put(nextAcceptedIndexBytes, serialized); err != nil {
		return fmt.Errorf("couldn't put accepted container %s into index: %w", containerID, err)
	}

	// Persist container ID --> index
	if err := i.containerToIndex.Put(containerID[:], nextAcceptedIndexBytes); err != nil {
		return fmt.Errorf("couldn't map container %s to index: %w", containerID, err)
	}

	// Persist next accepted index. The error message names this step
	// explicitly so that it is not mistaken for a failure of the
	// index --> container write above.
	i.nextAcceptedIndex++
	if err := database.PutUInt64(i.vDB, nextAcceptedIndexKey, i.nextAcceptedIndex); err != nil {
		return fmt.Errorf("couldn't persist next accepted index after container %s: %w", containerID, err)
	}

	// Atomically commit [i.vDB], [i.indexToContainer], [i.containerToIndex] to [i.baseDB]
	return i.vDB.Commit()
}
// GetContainerByIndex returns the [index]th accepted container, counting
// from zero: index 0 is the first accepted container, index 1 the second,
// and so on.
// Returns an error if there is no container at the given index.
func (i *index) GetContainerByIndex(index uint64) (Container, error) {
	i.lock.RLock()
	defer i.lock.RUnlock()

	container, err := i.getContainerByIndex(index)
	return container, err
}
// getContainerByIndex returns the container stored at [index], or an error
// wrapping errNoContainerAtIndex if nothing has been accepted yet or [index]
// is past the last accepted index.
// Assumes [i.lock] is held
func (i *index) getContainerByIndex(index uint64) (Container, error) {
	if last, accepted := i.lastAcceptedIndex(); !accepted || last < index {
		return Container{}, fmt.Errorf("%w %d", errNoContainerAtIndex, index)
	}
	return i.getContainerByIndexBytes(database.PackUInt64(index))
}
// getContainerByIndexBytes reads the serialized container stored under
// [indexBytes] in [i.indexToContainer] and decodes it.
// [indexBytes] is the byte representation of the index to fetch.
// A failed read is logged and returned wrapped; a decode failure is returned
// wrapped.
// Assumes [i.lock] is held
func (i *index) getContainerByIndexBytes(indexBytes []byte) (Container, error) {
	raw, err := i.indexToContainer.Get(indexBytes)
	if err != nil {
		i.log.Error("couldn't read container from database",
			zap.Error(err),
		)
		return Container{}, fmt.Errorf("couldn't read from database: %w", err)
	}

	var decoded Container
	_, err = Codec.Unmarshal(raw, &decoded)
	if err != nil {
		return Container{}, fmt.Errorf("couldn't unmarshal container: %w", err)
	}
	return decoded, nil
}
// GetContainerRange returns the containers at indices
// [startIndex], [startIndex+1], ..., [startIndex+numToFetch-1].
// If fewer than [numToFetch] containers exist from [startIndex] onward, the
// result is truncated at the last accepted container.
//
// [numToFetch] must be in [1, MaxFetchedByRange]; otherwise an error wrapping
// errNumToFetchInvalid is returned.
// [startIndex] must be <= i.lastAcceptedIndex(); errNoneAccepted is returned
// if nothing has been accepted yet.
func (i *index) GetContainerRange(startIndex, numToFetch uint64) ([]Container, error) {
	// Check arguments for validity
	if numToFetch == 0 || numToFetch > MaxFetchedByRange {
		return nil, fmt.Errorf("%w but is %d", errNumToFetchInvalid, numToFetch)
	}

	i.lock.RLock()
	defer i.lock.RUnlock()

	lastAcceptedIndex, ok := i.lastAcceptedIndex()
	if !ok {
		return nil, errNoneAccepted
	}
	if startIndex > lastAcceptedIndex {
		return nil, fmt.Errorf("start index (%d) > last accepted index (%d)", startIndex, lastAcceptedIndex)
	}

	// Number of containers in [startIndex, lastAcceptedIndex]. This is at
	// least 1 because startIndex <= lastAcceptedIndex. Working with a count
	// rather than an end index avoids the uint64 wrap-around that
	// startIndex+numToFetch-1 can produce, and keeps every value in uint64
	// instead of narrowing through int.
	available := lastAcceptedIndex - startIndex + 1
	// [numToFetch] is limited to [MaxFetchedByRange] so [containers] is bounded in size.
	count := min(numToFetch, available)

	containers := make([]Container, 0, count)
	for offset := uint64(0); offset < count; offset++ {
		containerIndex := startIndex + offset
		container, err := i.getContainerByIndex(containerIndex)
		if err != nil {
			return nil, fmt.Errorf("couldn't get container at index %d: %w", containerIndex, err)
		}
		containers = append(containers, container)
	}
	return containers, nil
}
// GetIndex returns the index at which the container [id] was accepted.
// The lookup error is returned unchanged; database.ErrNotFound is the
// expected result when the container is not indexed as accepted.
func (i *index) GetIndex(id ids.ID) (uint64, error) {
	i.lock.RLock()
	defer i.lock.RUnlock()

	acceptedIndex, err := database.GetUInt64(i.containerToIndex, id[:])
	return acceptedIndex, err
}
// GetContainerByID returns the accepted container whose ID is [id].
// The container's index is looked up in [i.containerToIndex] and then used
// to fetch the container itself. An error from the index lookup is returned
// unwrapped.
func (i *index) GetContainerByID(id ids.ID) (Container, error) {
	i.lock.RLock()
	defer i.lock.RUnlock()

	// Resolve container ID --> index, then index --> container
	indexBytes, err := i.containerToIndex.Get(id[:])
	if err == nil {
		return i.getContainerByIndexBytes(indexBytes)
	}
	return Container{}, err
}
// GetLastAccepted returns the most recently accepted container.
// Returns errNoneAccepted if no containers have been accepted.
func (i *index) GetLastAccepted() (Container, error) {
	i.lock.RLock()
	defer i.lock.RUnlock()

	if last, accepted := i.lastAcceptedIndex(); accepted {
		return i.getContainerByIndex(last)
	}
	return Container{}, errNoneAccepted
}
// lastAcceptedIndex reports the index of the most recently accepted
// transaction.
//
// Assumes i.lock is held
// Returns:
//
//  1. The index of the most recently accepted transaction, or 0 if no
//     transactions have been accepted
//  2. Whether at least 1 transaction has been accepted
func (i *index) lastAcceptedIndex() (uint64, bool) {
	// Guard the empty case explicitly: nextAcceptedIndex - 1 would wrap
	// around to the maximum uint64 value instead of the documented 0.
	if i.nextAcceptedIndex == 0 {
		return 0, false
	}
	return i.nextAcceptedIndex - 1, true
}