Skip to content

Commit 8c2a6e7

Browse files
committed
mvcc: restore tombstone index if it's first revision
The tombstone could be the only one available revision in database. It happens when all historical revisions have been deleted in previous compactions. Since tombstone revision is still in database, we should restore it as valid key index. Otherwise, we lost that event. Signed-off-by: Wei Fu <[email protected]>
1 parent ebb2b06 commit 8c2a6e7

File tree

5 files changed

+148
-2
lines changed

5 files changed

+148
-2
lines changed

server/storage/mvcc/key_index.go

+23
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,29 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified Revision, ver int6
116116
keysGauge.Inc()
117117
}
118118

119+
// restoreTombstone is only used to build index when there is no previous revision.
120+
// Since there is no historical information, CreateRevision always is empty.
121+
func (ki *keyIndex) restoreTombstone(lg *zap.Logger, main, sub int64) {
122+
if len(ki.generations) != 0 {
123+
lg.Panic(
124+
"'restore' got an unexpected non-empty generations",
125+
zap.Int("generations-size", len(ki.generations)),
126+
)
127+
}
128+
129+
rev := Revision{Main: main, Sub: sub}
130+
131+
ki.modified = rev
132+
ki.generations = append(ki.generations,
133+
generation{
134+
ver: 1,
135+
created: Revision{}, // unknown for first revision as tombstone
136+
revs: []Revision{rev},
137+
},
138+
generation{},
139+
)
140+
}
141+
119142
// tombstone puts a revision, pointing to a tombstone, to the keyIndex.
120143
// It also creates a new empty generation in the keyIndex.
121144
// It returns ErrRevisionNotFound when tombstone on an empty generation.

server/storage/mvcc/key_index_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,51 @@ import (
2020
"testing"
2121

2222
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
2324
"go.uber.org/zap"
2425
"go.uber.org/zap/zaptest"
2526
)
2627

28+
func TestRestoreTombstone(t *testing.T) {
29+
lg := zaptest.NewLogger(t)
30+
31+
// restore from tombstone
32+
//
33+
// key: "foo"
34+
// modified: 16
35+
// "created": 16
36+
// generations:
37+
// {empty}
38+
// {{16, 0}(t)[0]}
39+
//
40+
ki := &keyIndex{key: []byte("foo")}
41+
ki.restoreTombstone(lg, 16, 0)
42+
43+
// get should return not found
44+
for retAt := 16; retAt <= 20; retAt++ {
45+
_, _, _, err := ki.get(lg, int64(retAt))
46+
require.ErrorIs(t, err, ErrRevisionNotFound)
47+
}
48+
49+
// doCompact should keep that tombstone
50+
availables := map[Revision]struct{}{}
51+
ki.doCompact(16, availables)
52+
require.Len(t, availables, 1)
53+
_, ok := availables[Revision{Main: 16}]
54+
require.True(t, ok)
55+
56+
// should be able to put new revisions
57+
ki.put(lg, 17, 0)
58+
ki.put(lg, 18, 0)
59+
revs := ki.since(lg, 16)
60+
require.Equal(t, []Revision{{16, 0}, {17, 0}, {18, 0}}, revs)
61+
62+
// compaction should remove restored tombstone
63+
ki.compact(lg, 17, map[Revision]struct{}{})
64+
require.Len(t, ki.generations, 1)
65+
require.Equal(t, []Revision{{17, 0}, {18, 0}}, ki.generations[0].revs)
66+
}
67+
2768
func TestKeyIndexGet(t *testing.T) {
2869
// key: "foo"
2970
// modified: 16

server/storage/mvcc/kv_test.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/prometheus/client_golang/prometheus"
2727
dto "github.com/prometheus/client_model/go"
28+
"github.com/stretchr/testify/assert"
2829
"go.uber.org/zap/zaptest"
2930

3031
"go.etcd.io/etcd/api/v3/mvccpb"
@@ -657,6 +658,8 @@ func TestKVHash(t *testing.T) {
657658
}
658659

659660
func TestKVRestore(t *testing.T) {
661+
compactBatchLimit := 5
662+
660663
tests := []func(kv KV){
661664
func(kv KV) {
662665
kv.Put([]byte("foo"), []byte("bar0"), 1)
@@ -674,10 +677,23 @@ func TestKVRestore(t *testing.T) {
674677
kv.Put([]byte("foo"), []byte("bar1"), 2)
675678
kv.Compact(traceutil.TODO(), 1)
676679
},
680+
func(kv KV) { // after restore, foo1 key only has tombstone revision
681+
kv.Put([]byte("foo1"), []byte("bar1"), 0)
682+
kv.Put([]byte("foo2"), []byte("bar2"), 0)
683+
kv.Put([]byte("foo3"), []byte("bar3"), 0)
684+
kv.Put([]byte("foo4"), []byte("bar4"), 0)
685+
kv.Put([]byte("foo5"), []byte("bar5"), 0)
686+
_, delAtRev := kv.DeleteRange([]byte("foo1"), nil)
687+
assert.Equal(t, int64(7), delAtRev)
688+
689+
// after compaction and restore, foo1 key only has tombstone revision
690+
ch, _ := kv.Compact(traceutil.TODO(), delAtRev)
691+
<-ch
692+
},
677693
}
678694
for i, tt := range tests {
679695
b, _ := betesting.NewDefaultTmpBackend(t)
680-
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
696+
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit})
681697
tt(s)
682698
var kvss [][]mvccpb.KeyValue
683699
for k := int64(0); k < 10; k++ {
@@ -689,7 +705,7 @@ func TestKVRestore(t *testing.T) {
689705
s.Close()
690706

691707
// ns should recover the previous state from backend.
692-
ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
708+
ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit})
693709

694710
if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
695711
t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)

server/storage/mvcc/kvstore.go

+4
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,10 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int
477477
ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version)
478478
idx.Insert(ki)
479479
kiCache[rkv.kstr] = ki
480+
} else {
481+
ki.restoreTombstone(lg, rev.Main, rev.Sub)
482+
idx.Insert(ki)
483+
kiCache[rkv.kstr] = ki
480484
}
481485
}
482486
}()

tests/e2e/watch_test.go

+62
Original file line numberDiff line numberDiff line change
@@ -488,3 +488,65 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombsto
488488
}
489489
}
490490
}
491+
492+
func TestRestoreTombstone(t *testing.T) {
493+
e2e.BeforeTest(t)
494+
495+
ctx := context.Background()
496+
compactBatchLimit := 5
497+
498+
cfg := e2e.DefaultConfig()
499+
clus, err := e2e.NewEtcdProcessCluster(context.Background(),
500+
t,
501+
e2e.WithConfig(cfg),
502+
e2e.WithClusterSize(1),
503+
e2e.WithCompactionBatchLimit(compactBatchLimit),
504+
e2e.WithGoFailEnabled(true),
505+
e2e.WithWatchProcessNotifyInterval(100*time.Millisecond),
506+
)
507+
require.NoError(t, err)
508+
defer clus.Close()
509+
510+
c1 := newClient(t, clus.EndpointsGRPC(), cfg.Client)
511+
defer c1.Close()
512+
513+
keyPrefix := "/key-"
514+
for i := 0; i < compactBatchLimit; i++ {
515+
key := fmt.Sprintf("%s%d", keyPrefix, i)
516+
value := fmt.Sprintf("%d", i)
517+
518+
t.Logf("PUT key=%s, val=%s", key, value)
519+
_, err = c1.KV.Put(ctx, key, value)
520+
require.NoError(t, err)
521+
}
522+
523+
firstKey := keyPrefix + "0"
524+
t.Logf("DELETE key=%s", firstKey)
525+
deleteResp, err := c1.KV.Delete(ctx, firstKey)
526+
require.NoError(t, err)
527+
528+
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactBeforeSetFinishedCompact", `panic`))
529+
530+
t.Logf("COMPACT rev=%d", deleteResp.Header.Revision)
531+
_, err = c1.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical())
532+
require.Error(t, err)
533+
534+
require.NoError(t, clus.Restart(ctx))
535+
536+
c2 := newClient(t, clus.EndpointsGRPC(), cfg.Client)
537+
defer c2.Close()
538+
539+
watchChan := c2.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision))
540+
select {
541+
case watchResp := <-watchChan:
542+
require.Len(t, watchResp.Events, 1)
543+
544+
require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type)
545+
deletedKey := string(watchResp.Events[0].Kv.Key)
546+
require.Equal(t, firstKey, deletedKey)
547+
case <-time.After(100 * time.Millisecond):
548+
// we care only about the first response, but have an
549+
// escape hatch in case the watch response is delayed.
550+
t.Fatal("timed out getting watch response")
551+
}
552+
}

0 commit comments

Comments
 (0)