diff --git a/server/mvcc/key_index.go b/server/mvcc/key_index.go index d38b0933d92..dfb5dcbee26 100644 --- a/server/mvcc/key_index.go +++ b/server/mvcc/key_index.go @@ -119,6 +119,15 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int6 keysGauge.Inc() } +// restoreTombstone restores a key whose only revision so far is a tombstone at +// {main, sub}. The creating revision is unknown (it was already compacted), so it is +// left empty; an empty generation is appended and the keysGauge increment done by restore is undone. +func (ki *keyIndex) restoreTombstone(lg *zap.Logger, main, sub int64) { + ki.restore(lg, revision{}, revision{main, sub}, 1) + ki.generations = append(ki.generations, generation{}) + keysGauge.Dec() +} + // tombstone puts a revision, pointing to a tombstone, to the keyIndex. // It also creates a new empty generation in the keyIndex. // It returns ErrRevisionNotFound when tombstone on an empty generation. diff --git a/server/mvcc/key_index_test.go b/server/mvcc/key_index_test.go index 814e252fd4a..693514266f9 100644 --- a/server/mvcc/key_index_test.go +++ b/server/mvcc/key_index_test.go @@ -19,9 +19,51 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) +func TestRestoreTombstone(t *testing.T) { + lg := zaptest.NewLogger(t) + + // restore from tombstone + // + // key: "foo" + // modified: 16 + // "created": 16 + // generations: + // {empty} + // {{16, 0}(t)[0]} + // + ki := &keyIndex{key: []byte("foo")} + ki.restoreTombstone(lg, 16, 0) + + // get should return ErrRevisionNotFound at and after the tombstone revision + for retAt := 16; retAt <= 20; retAt++ { + _, _, _, err := ki.get(lg, int64(retAt)) + require.ErrorIs(t, err, ErrRevisionNotFound) + } + + // doCompact should keep that tombstone + availables := map[revision]struct{}{} + ki.doCompact(16, availables) + require.Len(t, availables, 1) + _, ok := availables[revision{main: 16}] + require.True(t, ok) + + // should be able to put new revisions + ki.put(lg, 17, 0) + ki.put(lg, 18, 0)
+ revs := ki.since(lg, 16) + require.Equal(t, []revision{{16, 0}, {17, 0}, {18, 0}}, revs) + + // compaction past the tombstone revision should remove the restored tombstone + ki.compact(lg, 17, map[revision]struct{}{}) + require.Len(t, ki.generations, 1) + require.Equal(t, []revision{{17, 0}, {18, 0}}, ki.generations[0].revs) +} + func TestKeyIndexGet(t *testing.T) { // key: "foo" // rev: 16 diff --git a/server/mvcc/kv_test.go b/server/mvcc/kv_test.go index 524a1346ebd..7ccd719fb07 100644 --- a/server/mvcc/kv_test.go +++ b/server/mvcc/kv_test.go @@ -31,7 +31,9 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) // Functional tests for features implemented in v3 store. It treats v3 store @@ -620,6 +622,8 @@ func TestKVHash(t *testing.T) { } func TestKVRestore(t *testing.T) { + compactBatchLimit := 5 + tests := []func(kv KV){ func(kv KV) { kv.Put([]byte("foo"), []byte("bar0"), 1) @@ -637,10 +641,23 @@ func TestKVRestore(t *testing.T) { kv.Put([]byte("foo"), []byte("bar1"), 2) kv.Compact(traceutil.TODO(), 1) }, + func(kv KV) { // after restore, key foo1 only has a tombstone revision + kv.Put([]byte("foo1"), []byte("bar1"), 0) + kv.Put([]byte("foo2"), []byte("bar2"), 0) + kv.Put([]byte("foo3"), []byte("bar3"), 0) + kv.Put([]byte("foo4"), []byte("bar4"), 0) + kv.Put([]byte("foo5"), []byte("bar5"), 0) + _, delAtRev := kv.DeleteRange([]byte("foo1"), nil) + assert.Equal(t, int64(7), delAtRev) + + // compact at the delete revision; after compaction and restore, key foo1 only has a tombstone revision + ch, _ := kv.Compact(traceutil.TODO(), delAtRev) + <-ch + }, } for i, tt := range tests { b, tmpPath := betesting.NewDefaultTmpBackend(t) - s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) tt(s) var kvss [][]mvccpb.KeyValue for k := int64(0); k < 10; k++ { @@ -651,8
+668,8 @@ func TestKVRestore(t *testing.T) { keysBefore := readGaugeInt(keysGauge) s.Close() - // ns should recover the the previous state from backend. - ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}) + // ns should recover the previous state from backend. + ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore { t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore) diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index fb81af6ec74..f0450537db3 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -484,8 +484,12 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int continue } ki.put(lg, rev.main, rev.sub) - } else if !isTombstone(rkv.key) { - ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) + } else { + if isTombstone(rkv.key) { + ki.restoreTombstone(lg, rev.main, rev.sub) + } else { + ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) + } idx.Insert(ki) kiCache[rkv.kstr] = ki } diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index d09347f5bdd..3fcb5bf81b4 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -482,3 +482,82 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombsto } } } + +// TestResumeCompactionOnTombstone verifies that a deletion event is preserved +// when etcd restarts and resumes compaction on a key that only has a tombstone revision.
+func TestResumeCompactionOnTombstone(t *testing.T) { + e2e.BeforeTest(t) + + ctx := context.Background() + compactBatchLimit := 5 + + cfg := e2e.EtcdProcessClusterConfig{ + GoFailEnabled: true, + ClusterSize: 1, + IsClientAutoTLS: true, + ClientTLS: e2e.ClientTLS, + CompactionBatchLimit: compactBatchLimit, + WatchProcessNotifyInterval: 100 * time.Millisecond, + } + clus, err := e2e.NewEtcdProcessCluster(t, &cfg) + require.NoError(t, err) + defer clus.Close() + + c1 := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS) + defer c1.Close() + + keyPrefix := "/key-" + for i := 0; i < compactBatchLimit; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + value := fmt.Sprintf("%d", i) + + t.Logf("PUT key=%s, val=%s", key, value) + _, err = c1.KV.Put(ctx, key, value) + require.NoError(t, err) + } + + firstKey := keyPrefix + "0" + t.Logf("DELETE key=%s", firstKey) + deleteResp, err := c1.KV.Delete(ctx, firstKey) + require.NoError(t, err) + + var deleteEvent *clientv3.Event + select { + case watchResp := <-c1.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision)): + require.Len(t, watchResp.Events, 1) + + require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type) + deletedKey := string(watchResp.Events[0].Kv.Key) + require.Equal(t, firstKey, deletedKey) + + deleteEvent = watchResp.Events[0] + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out getting watch response") + } + + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactBeforeSetFinishedCompact", `panic`)) + + t.Logf("COMPACT rev=%d", deleteResp.Header.Revision) + _, err = c1.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical()) + require.Error(t, err) + + require.Error(t, clus.Procs[0].Stop()) + // NOTE: The proc panicked, so its exit code is 2. Restart() refuses to start + // a new etcd proc while the last exit code is 2, so IsRunning() is called + // first to clean up that status.
+ require.False(t, clus.Procs[0].IsRunning()) + require.NoError(t, clus.Restart()) + + c2 := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS) + defer c2.Close() + + watchChan := c2.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision)) + select { + case watchResp := <-watchChan: + require.Equal(t, []*clientv3.Event{deleteEvent}, watchResp.Events) + case <-time.After(100 * time.Millisecond): + // We only care about the first response; the timeout is an + // escape hatch in case the watch response is delayed. + t.Fatal("timed out getting watch response") + } +}