Commit 83cf7bb

Merge pull request #19188 from fuweid/fix-19179

mvcc: restore tombstone index if it's first revision

2 parents bae527c + d8b4192
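
The change is easiest to read against etcd's mvcc key index: each key owns a list of generations, and a generation is a run of revisions closed by a tombstone. Before this fix, restoreIntoIndex skipped tombstone revisions outright, so a key whose only surviving revision was its tombstone (for example, after a compaction that stopped exactly at the delete revision) vanished from the restored index and its delete event could no longer be served to watchers. Below is a minimal, self-contained sketch of the structure involved; the names mirror server/storage/mvcc, but the types are stripped down for illustration:

package main

import "fmt"

// Simplified model of etcd's mvcc keyIndex: a key carries generations,
// each a run of revisions ending in a tombstone. (In etcd the Revision
// type is exported with Main/Sub fields; this sketch flattens it.)
type revision struct{ main, sub int64 }

type generation struct{ revs []revision }

type keyIndex struct {
    key         []byte
    modified    revision
    generations []generation
}

func main() {
    // What restoreTombstone rebuilds for a key "foo" deleted at revision 16
    // when the tombstone is the only revision left in the backend: a closed
    // generation holding just the tombstone, plus an empty generation so
    // future puts on the key behave normally.
    ki := keyIndex{
        key:      []byte("foo"),
        modified: revision{main: 16},
        generations: []generation{
            {revs: []revision{{main: 16}}}, // closed by the tombstone at rev 16
            {},                             // fresh empty generation
        },
    }
    fmt.Printf("restored index: %+v\n", ki)
}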

File tree: 5 files changed (+148 -4 lines)

server/storage/mvcc/key_index.go (+9)

@@ -116,6 +116,15 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified Revision, ver int64) {
 	keysGauge.Inc()
 }
 
+// restoreTombstone is used to restore a tombstone revision, which is the only
+// revision so far for a key. We don't know the creating revision (i.e. already
+// compacted) of the key, so set it empty.
+func (ki *keyIndex) restoreTombstone(lg *zap.Logger, main, sub int64) {
+	ki.restore(lg, Revision{}, Revision{main, sub}, 1)
+	ki.generations = append(ki.generations, generation{})
+	keysGauge.Dec()
+}
+
 // tombstone puts a revision, pointing to a tombstone, to the keyIndex.
 // It also creates a new empty generation in the keyIndex.
 // It returns ErrRevisionNotFound when tombstone on an empty generation.
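
A note on the keysGauge.Dec() above: restore() unconditionally increments the live-keys gauge (visible in the context lines), but a key whose only remaining revision is a tombstone is deleted rather than live, so restoreTombstone immediately corrects the gauge back down. The empty generation appended afterwards mirrors what tombstone() would have left behind, so subsequent puts on the key open a new generation as usual.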

server/storage/mvcc/key_index_test.go (+41)

@@ -20,10 +20,51 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
 )
 
+func TestRestoreTombstone(t *testing.T) {
+	lg := zaptest.NewLogger(t)
+
+	// restore from tombstone
+	//
+	// key: "foo"
+	// modified: 16
+	// "created": 16
+	// generations:
+	//    {empty}
+	//    {{16, 0}(t)[0]}
+	//
+	ki := &keyIndex{key: []byte("foo")}
+	ki.restoreTombstone(lg, 16, 0)
+
+	// get should return not found
+	for retAt := 16; retAt <= 20; retAt++ {
+		_, _, _, err := ki.get(lg, int64(retAt))
+		require.ErrorIs(t, err, ErrRevisionNotFound)
+	}
+
+	// doCompact should keep that tombstone
+	availables := map[Revision]struct{}{}
+	ki.doCompact(16, availables)
+	require.Len(t, availables, 1)
+	_, ok := availables[Revision{Main: 16}]
+	require.True(t, ok)
+
+	// should be able to put new revisions
+	ki.put(lg, 17, 0)
+	ki.put(lg, 18, 0)
+	revs := ki.since(lg, 16)
+	require.Equal(t, []Revision{{16, 0}, {17, 0}, {18, 0}}, revs)
+
+	// compaction should remove restored tombstone
+	ki.compact(lg, 17, map[Revision]struct{}{})
+	require.Len(t, ki.generations, 1)
+	require.Equal(t, []Revision{{17, 0}, {18, 0}}, ki.generations[0].revs)
+}
+
 func TestKeyIndexGet(t *testing.T) {
 	// key: "foo"
 	// modified: 16

server/storage/mvcc/kv_test.go (+18 -2)

@@ -25,6 +25,7 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
+	"github.com/stretchr/testify/assert"
 	"go.uber.org/zap/zaptest"
 
 	"go.etcd.io/etcd/api/v3/mvccpb"
@@ -657,6 +658,8 @@ func TestKVHash(t *testing.T) {
 }
 
 func TestKVRestore(t *testing.T) {
+	compactBatchLimit := 5
+
 	tests := []func(kv KV){
 		func(kv KV) {
 			kv.Put([]byte("foo"), []byte("bar0"), 1)
@@ -674,10 +677,23 @@ func TestKVRestore(t *testing.T) {
 			kv.Put([]byte("foo"), []byte("bar1"), 2)
 			kv.Compact(traceutil.TODO(), 1)
 		},
+		func(kv KV) { // after restore, foo1 key only has tombstone revision
+			kv.Put([]byte("foo1"), []byte("bar1"), 0)
+			kv.Put([]byte("foo2"), []byte("bar2"), 0)
+			kv.Put([]byte("foo3"), []byte("bar3"), 0)
+			kv.Put([]byte("foo4"), []byte("bar4"), 0)
+			kv.Put([]byte("foo5"), []byte("bar5"), 0)
+			_, delAtRev := kv.DeleteRange([]byte("foo1"), nil)
+			assert.Equal(t, int64(7), delAtRev)
+
+			// after compaction and restore, foo1 key only has tombstone revision
+			ch, _ := kv.Compact(traceutil.TODO(), delAtRev)
+			<-ch
+		},
 	}
 	for i, tt := range tests {
 		b, _ := betesting.NewDefaultTmpBackend(t)
-		s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+		s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit})
 		tt(s)
 		var kvss [][]mvccpb.KeyValue
 		for k := int64(0); k < 10; k++ {
@@ -689,7 +705,7 @@ func TestKVRestore(t *testing.T) {
 		s.Close()
 
 		// ns should recover the previous state from backend.
-		ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+		ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit})
 
 		if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
 			t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
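
In the new case, the five puts land at revisions 2 through 6 and DeleteRange writes foo1's tombstone at revision 7, which the assertion pins down. Compacting at revision 7 strips every older revision of foo1, so when the store is reopened as ns it must rebuild foo1's index entry from the tombstone alone; the keysGauge comparison then catches any miscounting on that path.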

server/storage/mvcc/kvstore.go (+6 -2)

@@ -473,8 +473,12 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
 				continue
 			}
 			ki.put(lg, rev.Main, rev.Sub)
-		} else if !isTombstone(rkv.key) {
-			ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version)
+		} else {
+			if isTombstone(rkv.key) {
+				ki.restoreTombstone(lg, rev.Main, rev.Sub)
+			} else {
+				ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version)
+			}
 			idx.Insert(ki)
 			kiCache[rkv.kstr] = ki
 		}
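
The branch above hinges on isTombstone, which inspects the revision-encoded key read from the backend rather than the user key. Here is a simplified sketch of that encoding, assuming the scheme in server/storage/mvcc/revision.go: an 8-byte big-endian main revision, a '_' separator, an 8-byte sub revision, and a trailing 't' marker appended to tombstone revisions.

package main

import (
    "encoding/binary"
    "fmt"
)

const (
    revBytesLen       = 8 + 1 + 8       // <main>_<sub>
    markedRevBytesLen = revBytesLen + 1 // plus the tombstone marker
    markTombstone     = byte('t')
)

// revToBytes encodes a revision as a backend key, optionally marking it
// as a tombstone, mirroring the encoding mvcc writes to the key bucket.
func revToBytes(main, sub int64, tombstone bool) []byte {
    b := make([]byte, revBytesLen, markedRevBytesLen)
    binary.BigEndian.PutUint64(b[0:8], uint64(main))
    b[8] = '_'
    binary.BigEndian.PutUint64(b[9:], uint64(sub))
    if tombstone {
        b = append(b, markTombstone)
    }
    return b
}

// isTombstone reports whether the encoded revision key carries the marker.
func isTombstone(b []byte) bool {
    return len(b) == markedRevBytesLen && b[markedRevBytesLen-1] == markTombstone
}

func main() {
    fmt.Println(isTombstone(revToBytes(16, 0, true)))  // true
    fmt.Println(isTombstone(revToBytes(16, 0, false))) // false
}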

tests/e2e/watch_test.go (+74)

@@ -488,3 +488,77 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombstone bool) {
 		}
 	}
 }
+
+// TestResumeCompactionOnTombstone verifies whether a deletion event is preserved
+// when etcd restarts and resumes compaction on a key that only has a tombstone revision.
+func TestResumeCompactionOnTombstone(t *testing.T) {
+	e2e.BeforeTest(t)
+
+	ctx := context.Background()
+	compactBatchLimit := 5
+
+	cfg := e2e.DefaultConfig()
+	clus, err := e2e.NewEtcdProcessCluster(context.Background(),
+		t,
+		e2e.WithConfig(cfg),
+		e2e.WithClusterSize(1),
+		e2e.WithCompactionBatchLimit(compactBatchLimit),
+		e2e.WithGoFailEnabled(true),
+		e2e.WithWatchProcessNotifyInterval(100*time.Millisecond),
+	)
+	require.NoError(t, err)
+	defer clus.Close()
+
+	c1 := newClient(t, clus.EndpointsGRPC(), cfg.Client)
+	defer c1.Close()
+
+	keyPrefix := "/key-"
+	for i := 0; i < compactBatchLimit; i++ {
+		key := fmt.Sprintf("%s%d", keyPrefix, i)
+		value := fmt.Sprintf("%d", i)
+
+		t.Logf("PUT key=%s, val=%s", key, value)
+		_, err = c1.KV.Put(ctx, key, value)
+		require.NoError(t, err)
+	}
+
+	firstKey := keyPrefix + "0"
+	t.Logf("DELETE key=%s", firstKey)
+	deleteResp, err := c1.KV.Delete(ctx, firstKey)
+	require.NoError(t, err)
+
+	var deleteEvent *clientv3.Event
+	select {
+	case watchResp := <-c1.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision)):
+		require.Len(t, watchResp.Events, 1)
+
+		require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type)
+		deletedKey := string(watchResp.Events[0].Kv.Key)
+		require.Equal(t, firstKey, deletedKey)
+
+		deleteEvent = watchResp.Events[0]
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("timed out getting watch response")
+	}
+
+	require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactBeforeSetFinishedCompact", `panic`))
+
+	t.Logf("COMPACT rev=%d", deleteResp.Header.Revision)
+	_, err = c1.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical())
+	require.Error(t, err)
+
+	require.NoError(t, clus.Restart(ctx))
+
+	c2 := newClient(t, clus.EndpointsGRPC(), cfg.Client)
+	defer c2.Close()
+
+	watchChan := c2.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision))
+	select {
+	case watchResp := <-watchChan:
+		require.Equal(t, []*clientv3.Event{deleteEvent}, watchResp.Events)
+	case <-time.After(100 * time.Millisecond):
+		// we care only about the first response, but have an
+		// escape hatch in case the watch response is delayed.
+		t.Fatal("timed out getting watch response")
+	}
+}
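
The failpoint name spells out the injected crash point: the server panics before it records the compaction as finished, so the restarted process resumes the compaction and rebuilds its index from a backend in which /key-0's tombstone may be the key's only remaining revision. Without restoreTombstone, the restored index would drop /key-0 entirely and the second watcher could never replay the delete event it asserts on.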
