forked from NethermindEth/juno
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbucket_migrator.go
143 lines (120 loc) · 3.87 KB
/
bucket_migrator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package migration
import (
"bytes"
"context"
"errors"
"os"
"os/signal"
"syscall"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils"
)
// Compile-time check that *BucketMigrator satisfies the Migration interface.
var _ Migration = (*BucketMigrator)(nil)
// BucketMigratorDoFunc is the per-entry callback of a BucketMigrator. Migrate
// calls it with the active transaction, the entry's key (b1), the entry's
// value (b2) and the network; a non-nil error aborts the migration.
type BucketMigratorDoFunc func(t db.Transaction, b1, b2 []byte, n *utils.Network) error

// BucketMigratorKeyFilter decides, from the key alone, whether an entry is
// handed to the BucketMigratorDoFunc. Returning true processes the entry,
// false skips it, and a non-nil error aborts the migration.
type BucketMigratorKeyFilter func([]byte) (bool, error)
// BucketMigrator is a Migration that walks the entries of one bucket in
// batches and applies a caller-supplied function to every entry whose key
// passes the key filter.
type BucketMigrator struct {
	// bucket whose entries are visited; iteration stops at the first key that
	// does not start with target.Key()
	target db.Bucket
	// number of entries to update before returning migration.ErrCallWithNewTransaction
	batchSize uint
	// hook invoked by Before
	before func()
	// predicate selecting which keys are passed to do
	keyFilter BucketMigratorKeyFilter
	// callback applied to every selected entry
	do BucketMigratorDoFunc
	// key to seek to when starting the migration
	startFrom []byte
}
// NewBucketMigrator builds a BucketMigrator that applies do to the entries of
// target. The defaults are: start at target.Key(), a batch size of 1,000,000
// entries, a no-op before hook, and a key filter that accepts every key. Use
// the With* methods to override them.
func NewBucketMigrator(target db.Bucket, do BucketMigratorDoFunc) *BucketMigrator {
	migrator := new(BucketMigrator)
	migrator.target = target
	migrator.do = do
	migrator.startFrom = target.Key()
	migrator.batchSize = 1_000_000
	migrator.before = func() {}
	migrator.keyFilter = func([]byte) (bool, error) { return true, nil }
	return migrator
}
// NewBucketMover returns a BucketMigrator that relocates every entry of
// source into destination: each entry is deleted under its current key and
// written back with the first key byte replaced by the destination bucket.
func NewBucketMover(source, destination db.Bucket) *BucketMigrator {
	moveEntry := func(txn db.Transaction, key, value []byte, _ *utils.Network) error {
		if delErr := txn.Delete(key); delErr != nil {
			return delErr
		}
		// The key is rewritten in place: only its leading bucket byte changes.
		key[0] = byte(destination)
		return txn.Set(key, value)
	}
	return NewBucketMigrator(source, moveEntry)
}
// WithBatchSize sets how many entries a single Migrate call processes before
// it returns ErrCallWithNewTransaction, and returns m for chaining.
//
// NOTE(review): with a batch size of 0 Migrate returns
// ErrCallWithNewTransaction at the first selected entry without processing
// it, so no progress is ever made; callers should pass a positive value.
func (m *BucketMigrator) WithBatchSize(batchSize uint) *BucketMigrator {
	m.batchSize = batchSize
	return m
}
// WithBefore sets the hook that Before runs and returns m for chaining.
// Before calls the hook unconditionally, so before must not be nil.
func (m *BucketMigrator) WithBefore(before func()) *BucketMigrator {
	m.before = before
	return m
}
// WithKeyFilter replaces the predicate that selects which keys Migrate hands
// to the do function, and returns m for chaining. Keys rejected by the filter
// are skipped and do not count towards the batch size.
func (m *BucketMigrator) WithKeyFilter(keyFilter BucketMigratorKeyFilter) *BucketMigrator {
	m.keyFilter = keyFilter
	return m
}
// Before runs the configured before hook. Its argument is ignored and the
// returned error is always nil.
func (m *BucketMigrator) Before(_ []byte) error {
	m.before()
	return nil
}
// Migrate iterates over the keys that start with the target bucket's prefix,
// beginning at m.startFrom, and calls m.do for every entry accepted by
// m.keyFilter.
//
// At most m.batchSize accepted entries are processed per call. When another
// accepted entry is found after the batch is used up, its key is stored in
// m.startFrom and ErrCallWithNewTransaction is returned, so a later call
// resumes from that entry.
//
// Interrupt handling is two-staged. The first time ctx is observed as
// cancelled (context.Canceled) a warning is logged and the method starts
// listening for os.Interrupt and SIGTERM itself; the migration keeps running.
// If one of those signals then arrives, the method aborts with a
// "migration interrupt" error. A ctx that ended for any other reason is
// ignored.
//
// The iterator is closed on every return path after it was opened, and the
// signal registration (if any) is removed on every return path. The returned
// byte slice is always nil. Errors come from creating or closing the
// iterator, the key filter, reading a value, m.do, a forced interrupt, or the
// batch boundary (ErrCallWithNewTransaction).
func (m *BucketMigrator) Migrate(ctx context.Context, txn db.Transaction, network *utils.Network, log utils.SimpleLogger) ([]byte, error) {
	remainingInBatch := m.batchSize
	iterator, err := txn.NewIterator(nil, false)
	if err != nil {
		return nil, err
	}

	var (
		firstInterrupt  = ctx.Done()
		secondInterrupt chan os.Signal // nil until the first interrupt is seen
	)
	// Migrate is called again after every ErrCallWithNewTransaction and
	// registers a new channel each time the context is already cancelled.
	// Unregister on every exit so those registrations do not pile up.
	defer func() {
		if secondInterrupt != nil {
			signal.Stop(secondInterrupt)
		}
	}()

	// m.target is not modified during the walk, so the prefix is computed once.
	prefix := m.target.Key()

	for iterator.Seek(m.startFrom); iterator.Valid(); iterator.Next() {
		select {
		case <-firstInterrupt:
			if errors.Is(ctx.Err(), context.Canceled) {
				msg := "WARNING: Migration is in progress, but you tried to interrupt it.\n" +
					"Database may be in an inconsistent state.\n" +
					"To force cancellation and potentially corrupt data, send interrupt signal again.\n" +
					"Otherwise, please allow the migration to complete."
				log.Warnw(msg)
				// ctx.Done() cannot tell us how many interrupts were sent once
				// the context is cancelled. A dedicated channel can: the
				// runtime delivers each signal to every registered channel,
				// here as well as at the upper level.
				secondInterrupt = make(chan os.Signal, 1)
				signal.Notify(secondInterrupt, os.Interrupt, syscall.SIGTERM)
				// A closed Done channel is always ready, so without this the
				// case could be picked on every iteration. Receiving from a
				// nil channel blocks, which disables the case.
				firstInterrupt = nil
			}
		case <-secondInterrupt:
			return nil, utils.RunAndWrapOnError(iterator.Close, errors.New("migration interrupt"))
		default:
			// keep going
		}

		key := iterator.Key()
		if !bytes.HasPrefix(key, prefix) {
			// Left the target bucket: nothing more to migrate.
			break
		}

		pass, err := m.keyFilter(key)
		if err != nil {
			return nil, utils.RunAndWrapOnError(iterator.Close, err)
		}
		if !pass {
			continue
		}

		if remainingInBatch == 0 {
			// Batch exhausted: remember where to resume and hand control back.
			m.startFrom = key
			return nil, utils.RunAndWrapOnError(iterator.Close, ErrCallWithNewTransaction)
		}
		remainingInBatch--

		value, err := iterator.Value()
		if err != nil {
			return nil, utils.RunAndWrapOnError(iterator.Close, err)
		}
		if err = m.do(txn, key, value, network); err != nil {
			return nil, utils.RunAndWrapOnError(iterator.Close, err)
		}
	}

	return nil, iterator.Close()
}