15
15
package mvcc
16
16
17
17
import (
18
- "sync "
19
-
18
+ "bytes "
19
+ "fmt"
20
20
"github.com/google/btree"
21
21
"go.uber.org/zap"
22
22
)
@@ -30,102 +30,98 @@ type index interface {
30
30
Tombstone (key []byte , rev Revision ) error
31
31
Compact (rev int64 ) map [Revision ]struct {}
32
32
Keep (rev int64 ) map [Revision ]struct {}
33
-
34
- Insert (ki * keyIndex )
35
- KeyIndex (ki * keyIndex ) * keyIndex
36
33
}
37
34
38
35
type treeIndex struct {
39
- sync.RWMutex
40
- tree * btree.BTreeG [* keyIndex ]
41
- lg * zap.Logger
36
+ baseRev int64
37
+ revisionTree []* btree.BTreeG [keyRev ]
38
+ lg * zap.Logger
39
+ }
40
+
41
+ type keyRev struct {
42
+ key []byte
43
+ mod , created Revision
44
+ version int64
45
+ }
46
+
47
+ var lessThen btree.LessFunc [keyRev ] = func (k keyRev , k2 keyRev ) bool {
48
+ return compare (k , k2 ) == - 1
49
+ }
50
+
51
+ func compare (k keyRev , k2 keyRev ) int {
52
+ return bytes .Compare (k .key , k2 .key )
42
53
}
43
54
44
55
func newTreeIndex (lg * zap.Logger ) * treeIndex {
45
56
return & treeIndex {
46
- tree : btree .NewG (32 , func (aki * keyIndex , bki * keyIndex ) bool {
47
- return aki .Less (bki )
48
- }),
49
- lg : lg ,
57
+ baseRev : - 1 ,
58
+ lg : lg ,
50
59
}
51
60
}
52
61
53
62
func (ti * treeIndex ) Put (key []byte , rev Revision ) {
54
- keyi := & keyIndex {key : key }
55
-
56
- ti .Lock ()
57
- defer ti .Unlock ()
58
- okeyi , ok := ti .tree .Get (keyi )
59
- if ! ok {
60
- keyi .put (ti .lg , rev .Main , rev .Sub )
61
- ti .tree .ReplaceOrInsert (keyi )
62
- return
63
+ if ti .baseRev == - 1 {
64
+ ti .baseRev = rev .Main - 1
65
+ ti .revisionTree = []* btree.BTreeG [keyRev ]{
66
+ btree .NewG [keyRev ](32 , lessThen ),
67
+ }
68
+ }
69
+ if rev .Main != ti .rev ()+ 1 {
70
+ panic (fmt .Sprintf ("append only, lastRev: %d, putRev: %d" , ti .rev (), rev .Main ))
63
71
}
64
- okeyi .put (ti .lg , rev .Main , rev .Sub )
72
+ prevTree := ti .revisionTree [len (ti .revisionTree )- 1 ]
73
+ item , found := prevTree .Get (keyRev {key : key })
74
+ created := rev
75
+ var version int64 = 1
76
+ if found {
77
+ created = item .created
78
+ version = item .version + 1
79
+ }
80
+ newTree := prevTree .Clone ()
81
+ newTree .ReplaceOrInsert (keyRev {
82
+ key : key ,
83
+ mod : rev ,
84
+ created : created ,
85
+ version : version ,
86
+ })
87
+ ti .revisionTree = append (ti .revisionTree , newTree )
65
88
}
66
89
67
- func (ti * treeIndex ) Get (key []byte , atRev int64 ) (modified , created Revision , ver int64 , err error ) {
68
- ti .RLock ()
69
- defer ti .RUnlock ()
70
- return ti .unsafeGet (key , atRev )
90
+ func (ti * treeIndex ) rev () int64 {
91
+ return ti .baseRev + int64 (len (ti .revisionTree )) - 1
71
92
}
72
93
73
- func (ti * treeIndex ) unsafeGet (key []byte , atRev int64 ) (modified , created Revision , ver int64 , err error ) {
74
- keyi := & keyIndex { key : key }
75
- if keyi = ti . keyIndex ( keyi ); keyi == nil {
94
+ func (ti * treeIndex ) Get (key []byte , atRev int64 ) (modified , created Revision , ver int64 , err error ) {
95
+ idx := atRev - ti . baseRev
96
+ if idx < 0 || idx >= int64 ( len ( ti . revisionTree )) {
76
97
return Revision {}, Revision {}, 0 , ErrRevisionNotFound
77
98
}
78
- return keyi .get (ti .lg , atRev )
79
- }
80
-
81
- func (ti * treeIndex ) KeyIndex (keyi * keyIndex ) * keyIndex {
82
- ti .RLock ()
83
- defer ti .RUnlock ()
84
- return ti .keyIndex (keyi )
85
- }
86
-
87
- func (ti * treeIndex ) keyIndex (keyi * keyIndex ) * keyIndex {
88
- if ki , ok := ti .tree .Get (keyi ); ok {
89
- return ki
99
+ tree := ti .revisionTree [idx ]
100
+ keyRev , found := tree .Get (keyRev {key : key })
101
+ if ! found {
102
+ return Revision {}, Revision {}, 0 , ErrRevisionNotFound
90
103
}
91
- return nil
92
- }
93
-
94
- func (ti * treeIndex ) unsafeVisit (key , end []byte , f func (ki * keyIndex ) bool ) {
95
- keyi , endi := & keyIndex {key : key }, & keyIndex {key : end }
96
-
97
- ti .tree .AscendGreaterOrEqual (keyi , func (item * keyIndex ) bool {
98
- if len (endi .key ) > 0 && ! item .Less (endi ) {
99
- return false
100
- }
101
- if ! f (item ) {
102
- return false
103
- }
104
- return true
105
- })
104
+ return keyRev .mod , keyRev .created , keyRev .version , nil
106
105
}
107
106
108
107
// Revisions returns limited number of revisions from key(included) to end(excluded)
109
108
// at the given rev. The returned slice is sorted in the order of key. There is no limit if limit <= 0.
110
109
// The second return parameter isn't capped by the limit and reflects the total number of revisions.
111
110
func (ti * treeIndex ) Revisions (key , end []byte , atRev int64 , limit int ) (revs []Revision , total int ) {
112
- ti .RLock ()
113
- defer ti .RUnlock ()
114
-
115
111
if end == nil {
116
- rev , _ , _ , err := ti .unsafeGet (key , atRev )
112
+ rev , _ , _ , err := ti .Get (key , atRev )
117
113
if err != nil {
118
114
return nil , 0
119
115
}
120
116
return []Revision {rev }, 1
121
117
}
122
- ti .unsafeVisit (key , end , func (ki * keyIndex ) bool {
123
- if rev , _ , _ , err := ki .get (ti .lg , atRev ); err == nil {
124
- if limit <= 0 || len (revs ) < limit {
125
- revs = append (revs , rev )
126
- }
127
- total ++
118
+ idx := atRev - ti .baseRev
119
+ tree := ti .revisionTree [idx ]
120
+ tree .AscendRange (keyRev {key : key }, keyRev {key : end }, func (kr keyRev ) bool {
121
+ if limit <= 0 || len (revs ) < limit {
122
+ revs = append (revs , kr .mod )
128
123
}
124
+ total ++
129
125
return true
130
126
})
131
127
return revs , total
@@ -134,119 +130,74 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []
134
130
// CountRevisions returns the number of revisions
135
131
// from key(included) to end(excluded) at the given rev.
136
132
func (ti * treeIndex ) CountRevisions (key , end []byte , atRev int64 ) int {
137
- ti .RLock ()
138
- defer ti .RUnlock ()
139
-
140
133
if end == nil {
141
- _ , _ , _ , err := ti .unsafeGet (key , atRev )
134
+ _ , _ , _ , err := ti .Get (key , atRev )
142
135
if err != nil {
143
136
return 0
144
137
}
145
138
return 1
146
139
}
140
+ idx := atRev - ti .baseRev
141
+ tree := ti .revisionTree [idx ]
147
142
total := 0
148
- ti .unsafeVisit (key , end , func (ki * keyIndex ) bool {
149
- if _ , _ , _ , err := ki .get (ti .lg , atRev ); err == nil {
150
- total ++
151
- }
143
+ tree .AscendRange (keyRev {key : key }, keyRev {key : end }, func (kr keyRev ) bool {
144
+ total ++
152
145
return true
153
146
})
154
147
return total
155
148
}
156
149
157
150
func (ti * treeIndex ) Range (key , end []byte , atRev int64 ) (keys [][]byte , revs []Revision ) {
158
- ti .RLock ()
159
- defer ti .RUnlock ()
160
-
161
151
if end == nil {
162
- rev , _ , _ , err := ti .unsafeGet (key , atRev )
152
+ rev , _ , _ , err := ti .Get (key , atRev )
163
153
if err != nil {
164
154
return nil , nil
165
155
}
166
156
return [][]byte {key }, []Revision {rev }
167
157
}
168
- ti . unsafeVisit ( key , end , func ( ki * keyIndex ) bool {
169
- if rev , _ , _ , err := ki . get ( ti .lg , atRev ); err == nil {
170
- revs = append ( revs , rev )
171
- keys = append (keys , ki . key )
172
- }
158
+ idx := atRev - ti . baseRev
159
+ tree := ti .revisionTree [ idx ]
160
+ tree . AscendRange ( keyRev { key : key }, keyRev { key : end }, func ( kr keyRev ) bool {
161
+ revs = append (revs , kr . mod )
162
+ keys = append ( keys , kr . key )
173
163
return true
174
164
})
175
165
return keys , revs
176
166
}
177
167
178
168
func (ti * treeIndex ) Tombstone (key []byte , rev Revision ) error {
179
- keyi := & keyIndex {key : key }
180
-
181
- ti .Lock ()
182
- defer ti .Unlock ()
183
- ki , ok := ti .tree .Get (keyi )
184
- if ! ok {
169
+ if rev .Main != ti .rev ()+ 1 {
170
+ panic (fmt .Sprintf ("append only, lastRev: %d, putRev: %d" , ti .rev (), rev .Main ))
171
+ }
172
+ prevTree := ti .revisionTree [len (ti .revisionTree )- 1 ]
173
+ newTree := prevTree .Clone ()
174
+ _ , found := prevTree .Delete (keyRev {
175
+ key : key ,
176
+ })
177
+ ti .revisionTree = append (ti .revisionTree , newTree )
178
+ if ! found {
185
179
return ErrRevisionNotFound
186
180
}
187
-
188
- return ki .tombstone (ti .lg , rev .Main , rev .Sub )
181
+ return nil
189
182
}
190
183
191
184
func (ti * treeIndex ) Compact (rev int64 ) map [Revision ]struct {} {
192
185
available := make (map [Revision ]struct {})
193
186
ti .lg .Info ("compact tree index" , zap .Int64 ("revision" , rev ))
194
- ti .Lock ()
195
- clone := ti .tree .Clone ()
196
- ti .Unlock ()
197
-
198
- clone .Ascend (func (keyi * keyIndex ) bool {
199
- // Lock is needed here to prevent modification to the keyIndex while
200
- // compaction is going on or revision added to empty before deletion
201
- ti .Lock ()
202
- keyi .compact (ti .lg , rev , available )
203
- if keyi .isEmpty () {
204
- _ , ok := ti .tree .Delete (keyi )
205
- if ! ok {
206
- ti .lg .Panic ("failed to delete during compaction" )
207
- }
208
- }
209
- ti .Unlock ()
210
- return true
211
- })
187
+ idx := rev - ti .baseRev
188
+ ti .revisionTree = ti .revisionTree [idx :]
189
+ ti .baseRev = rev
212
190
return available
213
191
}
214
192
215
193
// Keep finds all revisions to be kept for a Compaction at the given rev.
216
194
func (ti * treeIndex ) Keep (rev int64 ) map [Revision ]struct {} {
217
195
available := make (map [Revision ]struct {})
218
- ti .RLock ()
219
- defer ti .RUnlock ()
220
- ti . tree .Ascend (func (keyi * keyIndex ) bool {
221
- keyi . keep ( rev , available )
196
+ idx := rev - ti .baseRev
197
+ tree := ti .revisionTree [ idx ]
198
+ tree .Ascend (func (item keyRev ) bool {
199
+ available [ item . mod ] = struct {}{}
222
200
return true
223
201
})
224
202
return available
225
203
}
226
-
227
- func (ti * treeIndex ) Equal (bi index ) bool {
228
- b := bi .(* treeIndex )
229
-
230
- if ti .tree .Len () != b .tree .Len () {
231
- return false
232
- }
233
-
234
- equal := true
235
-
236
- ti .tree .Ascend (func (aki * keyIndex ) bool {
237
- bki , _ := b .tree .Get (aki )
238
- if ! aki .equal (bki ) {
239
- equal = false
240
- return false
241
- }
242
- return true
243
- })
244
-
245
- return equal
246
- }
247
-
248
- func (ti * treeIndex ) Insert (ki * keyIndex ) {
249
- ti .Lock ()
250
- defer ti .Unlock ()
251
- ti .tree .ReplaceOrInsert (ki )
252
- }
0 commit comments