forked from dgraph-io/badger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanifest.go
475 lines (425 loc) · 13.2 KB
/
manifest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"os"
"path/filepath"
"sync"
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)
// Manifest represents the contents of the MANIFEST file in a Badger store.
//
// The MANIFEST file describes the startup state of the db -- all LSM files and what level they're
// at.
//
// It consists of a sequence of ManifestChangeSet objects. Each of these is treated atomically,
// and contains a sequence of ManifestChange's (file creations/deletions) which we use to
// reconstruct the manifest at startup.
type Manifest struct {
Levels []levelManifest
Tables map[uint64]TableManifest
// Contains total number of creation and deletion changes in the manifest -- used to compute
// whether it'd be useful to rewrite the manifest.
Creations int
Deletions int
}
func createManifest() Manifest {
levels := make([]levelManifest, 0)
return Manifest{
Levels: levels,
Tables: make(map[uint64]TableManifest),
}
}
// levelManifest contains information about LSM tree levels
// in the MANIFEST file.
type levelManifest struct {
Tables map[uint64]struct{} // Set of table id's
}
// TableManifest contains information about a specific table
// in the LSM tree.
type TableManifest struct {
Level uint8
KeyID uint64
Compression options.CompressionType
}
// manifestFile holds the file pointer (and other info) about the manifest file, which is a log
// file we append to.
type manifestFile struct {
fp *os.File
directory string
// We make this configurable so that unit tests can hit rewrite() code quickly
deletionsRewriteThreshold int
// Guards appends, which includes access to the manifest field.
appendLock sync.Mutex
// Used to track the current state of the manifest, used when rewriting.
manifest Manifest
// Used to indicate if badger was opened in InMemory mode.
inMemory bool
}
const (
// ManifestFilename is the filename for the manifest file.
ManifestFilename = "MANIFEST"
manifestRewriteFilename = "MANIFEST-REWRITE"
manifestDeletionsRewriteThreshold = 10000
manifestDeletionsRatio = 10
)
// asChanges returns a sequence of changes that could be used to recreate the Manifest in its
// present state.
func (m *Manifest) asChanges() []*pb.ManifestChange {
changes := make([]*pb.ManifestChange, 0, len(m.Tables))
for id, tm := range m.Tables {
changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID, tm.Compression))
}
return changes
}
func (m *Manifest) clone() Manifest {
changeSet := pb.ManifestChangeSet{Changes: m.asChanges()}
ret := createManifest()
y.Check(applyChangeSet(&ret, &changeSet))
return ret
}
// openOrCreateManifestFile opens a Badger manifest file if it exists, or creates one if
// doesn’t exists.
func openOrCreateManifestFile(opt Options) (
ret *manifestFile, result Manifest, err error) {
if opt.InMemory {
return &manifestFile{inMemory: true}, Manifest{}, nil
}
return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, manifestDeletionsRewriteThreshold)
}
func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold int) (
*manifestFile, Manifest, error) {
path := filepath.Join(dir, ManifestFilename)
var flags uint32
if readOnly {
flags |= y.ReadOnly
}
fp, err := y.OpenExistingFile(path, flags) // We explicitly sync in addChanges, outside the lock.
if err != nil {
if !os.IsNotExist(err) {
return nil, Manifest{}, err
}
if readOnly {
return nil, Manifest{}, fmt.Errorf("no manifest found, required for read-only db")
}
m := createManifest()
fp, netCreations, err := helpRewrite(dir, &m)
if err != nil {
return nil, Manifest{}, err
}
y.AssertTrue(netCreations == 0)
mf := &manifestFile{
fp: fp,
directory: dir,
manifest: m.clone(),
deletionsRewriteThreshold: deletionsThreshold,
}
return mf, m, nil
}
manifest, truncOffset, err := ReplayManifestFile(fp)
if err != nil {
_ = fp.Close()
return nil, Manifest{}, err
}
if !readOnly {
// Truncate file so we don't have a half-written entry at the end.
if err := fp.Truncate(truncOffset); err != nil {
_ = fp.Close()
return nil, Manifest{}, err
}
}
if _, err = fp.Seek(0, io.SeekEnd); err != nil {
_ = fp.Close()
return nil, Manifest{}, err
}
mf := &manifestFile{
fp: fp,
directory: dir,
manifest: manifest.clone(),
deletionsRewriteThreshold: deletionsThreshold,
}
return mf, manifest, nil
}
func (mf *manifestFile) close() error {
if mf.inMemory {
return nil
}
return mf.fp.Close()
}
// addChanges writes a batch of changes, atomically, to the file. By "atomically" that means when
// we replay the MANIFEST file, we'll either replay all the changes or none of them. (The truth of
// this depends on the filesystem -- some might append garbage data if a system crash happens at
// the wrong time.)
func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
if mf.inMemory {
return nil
}
changes := pb.ManifestChangeSet{Changes: changesParam}
buf, err := proto.Marshal(&changes)
if err != nil {
return err
}
// Maybe we could use O_APPEND instead (on certain file systems)
mf.appendLock.Lock()
if err := applyChangeSet(&mf.manifest, &changes); err != nil {
mf.appendLock.Unlock()
return err
}
// Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
if err := mf.rewrite(); err != nil {
mf.appendLock.Unlock()
return err
}
} else {
var lenCrcBuf [8]byte
binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(buf)))
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable))
buf = append(lenCrcBuf[:], buf...)
if _, err := mf.fp.Write(buf); err != nil {
mf.appendLock.Unlock()
return err
}
}
mf.appendLock.Unlock()
return y.FileSync(mf.fp)
}
// Has to be 4 bytes. The value can never change, ever, anyway.
var magicText = [4]byte{'B', 'd', 'g', 'r'}
// The magic version number.
const magicVersion = 7
func helpRewrite(dir string, m *Manifest) (*os.File, int, error) {
rewritePath := filepath.Join(dir, manifestRewriteFilename)
// We explicitly sync.
fp, err := y.OpenTruncFile(rewritePath, false)
if err != nil {
return nil, 0, err
}
buf := make([]byte, 8)
copy(buf[0:4], magicText[:])
binary.BigEndian.PutUint32(buf[4:8], magicVersion)
netCreations := len(m.Tables)
changes := m.asChanges()
set := pb.ManifestChangeSet{Changes: changes}
changeBuf, err := proto.Marshal(&set)
if err != nil {
fp.Close()
return nil, 0, err
}
var lenCrcBuf [8]byte
binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(changeBuf)))
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(changeBuf, y.CastagnoliCrcTable))
buf = append(buf, lenCrcBuf[:]...)
buf = append(buf, changeBuf...)
if _, err := fp.Write(buf); err != nil {
fp.Close()
return nil, 0, err
}
if err := y.FileSync(fp); err != nil {
fp.Close()
return nil, 0, err
}
// In Windows the files should be closed before doing a Rename.
if err = fp.Close(); err != nil {
return nil, 0, err
}
manifestPath := filepath.Join(dir, ManifestFilename)
if err := os.Rename(rewritePath, manifestPath); err != nil {
return nil, 0, err
}
fp, err = y.OpenExistingFile(manifestPath, 0)
if err != nil {
return nil, 0, err
}
if _, err := fp.Seek(0, io.SeekEnd); err != nil {
fp.Close()
return nil, 0, err
}
if err := syncDir(dir); err != nil {
fp.Close()
return nil, 0, err
}
return fp, netCreations, nil
}
// Must be called while appendLock is held.
func (mf *manifestFile) rewrite() error {
// In Windows the files should be closed before doing a Rename.
if err := mf.fp.Close(); err != nil {
return err
}
fp, netCreations, err := helpRewrite(mf.directory, &mf.manifest)
if err != nil {
return err
}
mf.fp = fp
mf.manifest.Creations = netCreations
mf.manifest.Deletions = 0
return nil
}
type countingReader struct {
wrapped *bufio.Reader
count int64
}
func (r *countingReader) Read(p []byte) (n int, err error) {
n, err = r.wrapped.Read(p)
r.count += int64(n)
return
}
func (r *countingReader) ReadByte() (b byte, err error) {
b, err = r.wrapped.ReadByte()
if err == nil {
r.count++
}
return
}
var (
errBadMagic = errors.New("manifest has bad magic")
errBadChecksum = errors.New("manifest has checksum mismatch")
)
// ReplayManifestFile reads the manifest file and constructs two manifest objects. (We need one
// immutable copy and one mutable copy of the manifest. Easiest way is to construct two of them.)
// Also, returns the last offset after a completely read manifest entry -- the file must be
// truncated at that point before further appends are made (if there is a partial entry after
// that). In normal conditions, truncOffset is the file size.
func ReplayManifestFile(fp *os.File) (Manifest, int64, error) {
r := countingReader{wrapped: bufio.NewReader(fp)}
var magicBuf [8]byte
if _, err := io.ReadFull(&r, magicBuf[:]); err != nil {
return Manifest{}, 0, errBadMagic
}
if !bytes.Equal(magicBuf[0:4], magicText[:]) {
return Manifest{}, 0, errBadMagic
}
version := y.BytesToU32(magicBuf[4:8])
if version != magicVersion {
return Manifest{}, 0,
//nolint:lll
fmt.Errorf("manifest has unsupported version: %d (we support %d).\n"+
"Please see https://github.com/dgraph-io/badger/blob/master/README.md#i-see-manifest-has-unsupported-version-x-we-support-y-error"+
" on how to fix this.",
version, magicVersion)
}
stat, err := fp.Stat()
if err != nil {
return Manifest{}, 0, err
}
build := createManifest()
var offset int64
for {
offset = r.count
var lenCrcBuf [8]byte
_, err := io.ReadFull(&r, lenCrcBuf[:])
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return Manifest{}, 0, err
}
length := y.BytesToU32(lenCrcBuf[0:4])
// Sanity check to ensure we don't over-allocate memory.
if length > uint32(stat.Size()) {
return Manifest{}, 0, errors.Errorf(
"Buffer length: %d greater than file size: %d. Manifest file might be corrupted",
length, stat.Size())
}
var buf = make([]byte, length)
if _, err := io.ReadFull(&r, buf); err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return Manifest{}, 0, err
}
if crc32.Checksum(buf, y.CastagnoliCrcTable) != y.BytesToU32(lenCrcBuf[4:8]) {
return Manifest{}, 0, errBadChecksum
}
var changeSet pb.ManifestChangeSet
if err := proto.Unmarshal(buf, &changeSet); err != nil {
return Manifest{}, 0, err
}
if err := applyChangeSet(&build, &changeSet); err != nil {
return Manifest{}, 0, err
}
}
return build, offset, nil
}
func applyManifestChange(build *Manifest, tc *pb.ManifestChange) error {
switch tc.Op {
case pb.ManifestChange_CREATE:
if _, ok := build.Tables[tc.Id]; ok {
return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id)
}
build.Tables[tc.Id] = TableManifest{
Level: uint8(tc.Level),
KeyID: tc.KeyId,
Compression: options.CompressionType(tc.Compression),
}
for len(build.Levels) <= int(tc.Level) {
build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})})
}
build.Levels[tc.Level].Tables[tc.Id] = struct{}{}
build.Creations++
case pb.ManifestChange_DELETE:
tm, ok := build.Tables[tc.Id]
if !ok {
return fmt.Errorf("MANIFEST removes non-existing table %d", tc.Id)
}
delete(build.Levels[tm.Level].Tables, tc.Id)
delete(build.Tables, tc.Id)
build.Deletions++
default:
return fmt.Errorf("MANIFEST file has invalid manifestChange op")
}
return nil
}
// This is not a "recoverable" error -- opening the KV store fails because the MANIFEST file is
// just plain broken.
func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet) error {
for _, change := range changeSet.Changes {
if err := applyManifestChange(build, change); err != nil {
return err
}
}
return nil
}
func newCreateChange(
id uint64, level int, keyID uint64, c options.CompressionType) *pb.ManifestChange {
return &pb.ManifestChange{
Id: id,
Op: pb.ManifestChange_CREATE,
Level: uint32(level),
KeyId: keyID,
// Hard coding it, since we're supporting only AES for now.
EncryptionAlgo: pb.EncryptionAlgo_aes,
Compression: uint32(c),
}
}
func newDeleteChange(id uint64) *pb.ManifestChange {
return &pb.ManifestChange{
Id: id,
Op: pb.ManifestChange_DELETE,
}
}