Skip to content

Commit 625b706

Browse files
committed
Use immutable binary tree for index
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent e26043f commit 625b706

File tree

4 files changed

+573
-643
lines changed

4 files changed

+573
-643
lines changed

server/storage/mvcc/index.go

+103-138
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
package mvcc
1616

1717
import (
18-
"sync"
18+
"bytes"
19+
"fmt"
1920

2021
"github.com/google/btree"
2122
"go.uber.org/zap"
@@ -30,103 +31,103 @@ type index interface {
3031
Tombstone(key []byte, rev Revision) error
3132
Compact(rev int64) map[Revision]struct{}
3233
Keep(rev int64) map[Revision]struct{}
33-
Equal(b index) bool
34-
35-
Insert(ki *keyIndex)
36-
KeyIndex(ki *keyIndex) *keyIndex
3734
}
3835

3936
type treeIndex struct {
40-
sync.RWMutex
41-
tree *btree.BTreeG[*keyIndex]
42-
lg *zap.Logger
37+
baseRev int64
38+
revisionTree []*btree.BTreeG[keyRev]
39+
lg *zap.Logger
4340
}
4441

45-
func newTreeIndex(lg *zap.Logger) index {
46-
return &treeIndex{
47-
tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
48-
return aki.Less(bki)
49-
}),
50-
lg: lg,
51-
}
42+
type keyRev struct {
43+
key []byte
44+
mod, created Revision
45+
version int64
5246
}
5347

54-
func (ti *treeIndex) Put(key []byte, rev Revision) {
55-
keyi := &keyIndex{key: key}
56-
57-
ti.Lock()
58-
defer ti.Unlock()
59-
okeyi, ok := ti.tree.Get(keyi)
60-
if !ok {
61-
keyi.put(ti.lg, rev.Main, rev.Sub)
62-
ti.tree.ReplaceOrInsert(keyi)
63-
return
64-
}
65-
okeyi.put(ti.lg, rev.Main, rev.Sub)
48+
var lessThen btree.LessFunc[keyRev] = func(k keyRev, k2 keyRev) bool {
49+
return compare(k, k2) == -1
6650
}
6751

68-
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
69-
ti.RLock()
70-
defer ti.RUnlock()
71-
return ti.unsafeGet(key, atRev)
52+
func compare(k keyRev, k2 keyRev) int {
53+
return bytes.Compare(k.key, k2.key)
7254
}
7355

74-
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
75-
keyi := &keyIndex{key: key}
76-
if keyi = ti.keyIndex(keyi); keyi == nil {
77-
return Revision{}, Revision{}, 0, ErrRevisionNotFound
56+
func newTreeIndex(lg *zap.Logger) index {
57+
return &treeIndex{
58+
baseRev: -1,
59+
lg: lg,
7860
}
79-
return keyi.get(ti.lg, atRev)
80-
}
81-
82-
func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
83-
ti.RLock()
84-
defer ti.RUnlock()
85-
return ti.keyIndex(keyi)
8661
}
8762

88-
func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
89-
if ki, ok := ti.tree.Get(keyi); ok {
90-
return ki
63+
func (ti *treeIndex) Put(key []byte, rev Revision) {
64+
if ti.baseRev == -1 {
65+
ti.baseRev = rev.Main - 1
66+
ti.revisionTree = []*btree.BTreeG[keyRev]{
67+
btree.NewG[keyRev](32, lessThen),
68+
}
69+
}
70+
if rev.Main != ti.rev()+1 {
71+
panic(fmt.Sprintf("append only, lastRev: %d, putRev: %d", ti.rev(), rev.Main))
72+
}
73+
prevTree := ti.revisionTree[len(ti.revisionTree)-1]
74+
item, found := prevTree.Get(keyRev{key: key})
75+
created := rev
76+
var version int64 = 1
77+
if found {
78+
created = item.created
79+
version = item.version + 1
80+
}
81+
tirev := ti.rev()
82+
if rev.Main == tirev {
83+
prevTree.ReplaceOrInsert(keyRev{
84+
key: key,
85+
mod: rev,
86+
created: created,
87+
version: version,
88+
})
89+
} else if rev.Main == tirev+1 {
90+
newTree := prevTree.Clone()
91+
newTree.ReplaceOrInsert(keyRev{
92+
key: key,
93+
mod: rev,
94+
created: created,
95+
version: version,
96+
})
97+
ti.revisionTree = append(ti.revisionTree, newTree)
98+
} else {
99+
panic(fmt.Sprintf("append only, lastRev: %d, putRev: %d", ti.rev(), rev.Main))
91100
}
92-
return nil
93101
}
94102

95-
func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) {
96-
keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
103+
func (ti *treeIndex) rev() int64 {
104+
return ti.baseRev + int64(len(ti.revisionTree)) - 1
105+
}
97106

98-
ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
99-
if len(endi.key) > 0 && !item.Less(endi) {
100-
return false
101-
}
102-
if !f(item) {
103-
return false
104-
}
105-
return true
106-
})
107+
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
108+
keyRev, found := ti.forRev(atRev).Get(keyRev{key: key})
109+
if !found {
110+
return Revision{}, Revision{}, 0, ErrRevisionNotFound
111+
}
112+
return keyRev.mod, keyRev.created, keyRev.version, nil
107113
}
108114

109115
// Revisions returns limited number of revisions from key(included) to end(excluded)
110116
// at the given rev. The returned slice is sorted in the order of key. There is no limit if limit <= 0.
111117
// The second return parameter isn't capped by the limit and reflects the total number of revisions.
112118
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []Revision, total int) {
113-
ti.RLock()
114-
defer ti.RUnlock()
115-
116119
if end == nil {
117-
rev, _, _, err := ti.unsafeGet(key, atRev)
120+
rev, _, _, err := ti.Get(key, atRev)
118121
if err != nil {
119122
return nil, 0
120123
}
121124
return []Revision{rev}, 1
122125
}
123-
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
124-
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
125-
if limit <= 0 || len(revs) < limit {
126-
revs = append(revs, rev)
127-
}
128-
total++
126+
ti.forRev(atRev).AscendRange(keyRev{key: key}, keyRev{key: end}, func(kr keyRev) bool {
127+
if limit <= 0 || len(revs) < limit {
128+
revs = append(revs, kr.mod)
129129
}
130+
total++
130131
return true
131132
})
132133
return revs, total
@@ -135,119 +136,83 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []
135136
// CountRevisions returns the number of revisions
136137
// from key(included) to end(excluded) at the given rev.
137138
func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
138-
ti.RLock()
139-
defer ti.RUnlock()
140-
141139
if end == nil {
142-
_, _, _, err := ti.unsafeGet(key, atRev)
140+
_, _, _, err := ti.Get(key, atRev)
143141
if err != nil {
144142
return 0
145143
}
146144
return 1
147145
}
148146
total := 0
149-
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
150-
if _, _, _, err := ki.get(ti.lg, atRev); err == nil {
151-
total++
152-
}
147+
ti.forRev(atRev).AscendRange(keyRev{key: key}, keyRev{key: end}, func(kr keyRev) bool {
148+
total++
153149
return true
154150
})
155151
return total
156152
}
157153

158154
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []Revision) {
159-
ti.RLock()
160-
defer ti.RUnlock()
161-
162155
if end == nil {
163-
rev, _, _, err := ti.unsafeGet(key, atRev)
156+
rev, _, _, err := ti.Get(key, atRev)
164157
if err != nil {
165158
return nil, nil
166159
}
167160
return [][]byte{key}, []Revision{rev}
168161
}
169-
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
170-
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
171-
revs = append(revs, rev)
172-
keys = append(keys, ki.key)
173-
}
162+
ti.forRev(atRev).AscendRange(keyRev{key: key}, keyRev{key: end}, func(kr keyRev) bool {
163+
revs = append(revs, kr.mod)
164+
keys = append(keys, kr.key)
174165
return true
175166
})
176167
return keys, revs
177168
}
178169

179170
func (ti *treeIndex) Tombstone(key []byte, rev Revision) error {
180-
keyi := &keyIndex{key: key}
181-
182-
ti.Lock()
183-
defer ti.Unlock()
184-
ki, ok := ti.tree.Get(keyi)
185-
if !ok {
171+
prevTree := ti.revisionTree[len(ti.revisionTree)-1]
172+
tirev := ti.rev()
173+
var found bool
174+
if rev.Main == tirev {
175+
_, found = prevTree.Delete(keyRev{
176+
key: key,
177+
})
178+
} else if rev.Main == tirev+1 {
179+
newTree := prevTree.Clone()
180+
_, found = newTree.Delete(keyRev{
181+
key: key,
182+
})
183+
ti.revisionTree = append(ti.revisionTree, newTree)
184+
} else {
185+
panic(fmt.Sprintf("append only, lastRev: %d, putRev: %d", ti.rev(), rev.Main))
186+
}
187+
if !found {
186188
return ErrRevisionNotFound
187189
}
188-
189-
return ki.tombstone(ti.lg, rev.Main, rev.Sub)
190+
return nil
190191
}
191192

192193
func (ti *treeIndex) Compact(rev int64) map[Revision]struct{} {
193194
available := make(map[Revision]struct{})
194195
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
195-
ti.Lock()
196-
clone := ti.tree.Clone()
197-
ti.Unlock()
198-
199-
clone.Ascend(func(keyi *keyIndex) bool {
200-
// Lock is needed here to prevent modification to the keyIndex while
201-
// compaction is going on or revision added to empty before deletion
202-
ti.Lock()
203-
keyi.compact(ti.lg, rev, available)
204-
if keyi.isEmpty() {
205-
_, ok := ti.tree.Delete(keyi)
206-
if !ok {
207-
ti.lg.Panic("failed to delete during compaction")
208-
}
209-
}
210-
ti.Unlock()
211-
return true
212-
})
196+
idx := rev - ti.baseRev
197+
ti.revisionTree = ti.revisionTree[idx:]
198+
ti.baseRev = rev
213199
return available
214200
}
215201

216202
// Keep finds all revisions to be kept for a Compaction at the given rev.
217203
func (ti *treeIndex) Keep(rev int64) map[Revision]struct{} {
218204
available := make(map[Revision]struct{})
219-
ti.RLock()
220-
defer ti.RUnlock()
221-
ti.tree.Ascend(func(keyi *keyIndex) bool {
222-
keyi.keep(rev, available)
205+
ti.forRev(rev).Ascend(func(item keyRev) bool {
206+
available[item.mod] = struct{}{}
223207
return true
224208
})
225209
return available
226210
}
227211

228-
func (ti *treeIndex) Equal(bi index) bool {
229-
b := bi.(*treeIndex)
230-
231-
if ti.tree.Len() != b.tree.Len() {
232-
return false
212+
func (ti *treeIndex) forRev(rev int64) *btree.BTreeG[keyRev] {
213+
idx := rev - ti.baseRev
214+
if idx < 0 || idx >= int64(len(ti.revisionTree)) {
215+
return nil
233216
}
234-
235-
equal := true
236-
237-
ti.tree.Ascend(func(aki *keyIndex) bool {
238-
bki, _ := b.tree.Get(aki)
239-
if !aki.equal(bki) {
240-
equal = false
241-
return false
242-
}
243-
return true
244-
})
245-
246-
return equal
247-
}
248-
249-
func (ti *treeIndex) Insert(ki *keyIndex) {
250-
ti.Lock()
251-
defer ti.Unlock()
252-
ti.tree.ReplaceOrInsert(ki)
217+
return ti.revisionTree[idx]
253218
}

server/storage/mvcc/index_bench_test.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,21 @@ import (
2424
"go.uber.org/zap"
2525
)
2626

27-
func BenchmarkIndexCompactBase(b *testing.B) { benchmarkIndexCompact(b, 3, 100) }
28-
func BenchmarkIndexCompactLongKey(b *testing.B) { benchmarkIndexCompact(b, 512, 100) }
29-
func BenchmarkIndexCompactLargeKeySpace(b *testing.B) { benchmarkIndexCompact(b, 3, 100000) }
27+
//func BenchmarkIndexCompactBase(b *testing.B) { benchmarkIndexCompact(b, 3, 100) }
28+
//func BenchmarkIndexCompactLongKey(b *testing.B) { benchmarkIndexCompact(b, 512, 100) }
29+
//func BenchmarkIndexCompactLargeKeySpace(b *testing.B) { benchmarkIndexCompact(b, 3, 100000) }
3030

31-
func BenchmarkIndexKeepBase(b *testing.B) { benchmarkIndexKeep(b, 3, 100) }
32-
func BenchmarkIndexKeepLongKey(b *testing.B) { benchmarkIndexKeep(b, 512, 100) }
33-
func BenchmarkIndexKeepLargeKeySpace(b *testing.B) { benchmarkIndexKeep(b, 3, 100000) }
31+
//func BenchmarkIndexKeepBase(b *testing.B) { benchmarkIndexKeep(b, 3, 100) }
32+
//func BenchmarkIndexKeepLongKey(b *testing.B) { benchmarkIndexKeep(b, 512, 100) }
33+
//func BenchmarkIndexKeepLargeKeySpace(b *testing.B) { benchmarkIndexKeep(b, 3, 100000) }
3434

3535
// BenchmarkIndexPutBase runs benchmarkIndexPut with arguments (3, 100).
// Presumably key size and key-space size, judging by the sibling benchmark
// names — confirm against benchmarkIndexPut.
func BenchmarkIndexPutBase(b *testing.B) {
	benchmarkIndexPut(b, 3, 100)
}
3636
// BenchmarkIndexPutLongKey runs benchmarkIndexPut with arguments (512, 100).
// Presumably 512 is the key size — confirm against benchmarkIndexPut.
func BenchmarkIndexPutLongKey(b *testing.B) {
	benchmarkIndexPut(b, 512, 100)
}
3737
// BenchmarkIndexPutLargeKeySpace runs benchmarkIndexPut with arguments
// (3, 100000). Presumably 100000 is the key-space size — confirm against
// benchmarkIndexPut.
func BenchmarkIndexPutLargeKeySpace(b *testing.B) {
	benchmarkIndexPut(b, 3, 100000)
}
3838

39-
func BenchmarkIndexTombstoneBase(b *testing.B) { benchmarkIndexTombstone(b, 3, 100, 25) }
40-
func BenchmarkIndexTombstoneLongKey(b *testing.B) { benchmarkIndexTombstone(b, 512, 100, 25) }
41-
func BenchmarkIndexTombstoneLargeKeySpace(b *testing.B) { benchmarkIndexTombstone(b, 3, 100000, 25) }
39+
//func BenchmarkIndexTombstoneBase(b *testing.B) { benchmarkIndexTombstone(b, 3, 100, 25) }
40+
//func BenchmarkIndexTombstoneLongKey(b *testing.B) { benchmarkIndexTombstone(b, 512, 100, 25) }
41+
//func BenchmarkIndexTombstoneLargeKeySpace(b *testing.B) { benchmarkIndexTombstone(b, 3, 100000, 25) }
4242

4343
// BenchmarkIndexGetBase runs benchmarkIndexGet with arguments
// (3, 100, 1, 25). The meaning of each argument is defined by
// benchmarkIndexGet, which is not shown here.
func BenchmarkIndexGetBase(b *testing.B) {
	benchmarkIndexGet(b, 3, 100, 1, 25)
}
4444
// BenchmarkIndexGetRepeatedKeys runs benchmarkIndexGet with arguments
// (3, 100, 1000, 25). The meaning of each argument is defined by
// benchmarkIndexGet, which is not shown here.
func BenchmarkIndexGetRepeatedKeys(b *testing.B) {
	benchmarkIndexGet(b, 3, 100, 1000, 25)
}

0 commit comments

Comments
 (0)