Skip to content

Commit b97989a

Browse files
committed
watch ranges over transaction limit
1 parent 71486b1 commit b97989a

9 files changed

Lines changed: 214 additions & 98 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ The implementation is in the `pkg/drivers/fdb` directory.
4444
## TODO
4545

4646
Here is a list of implementation details that need to be completed before starting scale testing of this implementation.
47-
- [ ] Watch operation has to account for long-running or large transactions. Make sure that in such case there are multiple consecutive FDB transactions.
4847
- [ ] put ttl events into a separate slice
4948
- [ ] limit records to 2MiB
5049

@@ -89,5 +88,6 @@ Here is a list of implementation details that need to be completed before starti
8988
- https://forums.foundationdb.org/t/relax-consistency-guarantees/1560/19
9089
- https://forums.foundationdb.org/t/generating-sortable-unique-id-primary-key-across-the-cluster-in-an-entity-relationship-model/3789/2
9190
- https://forums.foundationdb.org/t/foundationdb-read-performance/729/4
91+
- https://forums.foundationdb.org/t/ranges-without-explicit-end-go/773/13
9292
-
9393

pkg/drivers/fdb/fdb_compact.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,23 @@ func (c *compactProcessor) next(tr *fdb.Transaction, it *fdb.RangeIterator) (fdb
2828
if err != nil {
2929
return nil, false, err
3030
}
31+
if rev == nil {
32+
return nil, false, nil
33+
}
3134

3235
lastRecord, err := c.f.getLast(tr, record.Key)
3336
if err != nil {
3437
return nil, false, err
3538
}
3639

37-
if lastRecord.Key.Rev != rev || record.IsDelete {
38-
c.f.byKeyAndRevision.Delete(tr, &KeyAndRevision{Key: record.Key, Rev: rev})
39-
if err := c.f.byRevision.Delete(tr, rev); err != nil {
40+
if lastRecord.Key.Rev != *rev || record.IsDelete {
41+
c.f.byKeyAndRevision.Delete(tr, &KeyAndRevision{Key: record.Key, Rev: *rev})
42+
if err := c.f.byRevision.Delete(tr, *rev); err != nil {
4043
return nil, false, err
4144
}
4245
}
43-
c.batchCompacted = rev
44-
return c.f.byRevision.GetSubspace().Pack(tuple.Tuple{rev}), true, nil
46+
c.batchCompacted = *rev
47+
return c.f.byRevision.GetSubspace().Pack(tuple.Tuple{*rev}), true, nil
4548
}
4649

4750
func (c *compactProcessor) endBatch(tr *fdb.Transaction, isLast bool) error {

pkg/drivers/fdb/fdb_read.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (c *listCollector) next(tr *fdb.Transaction, record *ByKeyAndRevisionRecord
145145
}
146146
c.batchIterators = append(c.batchIterators, recordIt)
147147

148-
if len(c.batchIterators) > 2 {
148+
if len(c.batchIterators) >= 1 {
149149
if err := c.fetchIterators(); err != nil {
150150
return nil, false, err
151151
}
@@ -154,7 +154,7 @@ func (c *listCollector) next(tr *fdb.Transaction, record *ByKeyAndRevisionRecord
154154
}
155155

156156
func (c *listCollector) needMore() bool {
157-
return c.limit == 0 || int64(len(c.batchIterators)+len(c.records)) < c.limit
157+
return c.limit == 0 || int64(len(c.batchIterators)+len(c.batchRecords)+len(c.records)) < c.limit
158158
}
159159

160160
func (c *listCollector) endBatch(*fdb.Transaction, bool) error {
@@ -166,18 +166,18 @@ func (c *listCollector) fetchIterators() error {
166166
rev, record, err := c.f.byRevision.GetFromIterator(it)
167167
if err != nil {
168168
return err
169+
} else if rev == nil {
170+
return fmt.Errorf("no records in by rev iterator")
169171
} else {
170-
c.batchRecords = append(c.batchRecords, &RevRecord{Rev: rev, Record: record})
172+
c.batchRecords = append(c.batchRecords, &RevRecord{Rev: *rev, Record: record})
171173
}
172174
}
173175
c.batchIterators = c.batchIterators[len(c.batchIterators):]
174176
return nil
175177
}
176178

177179
func (c *listCollector) postBatch() {
178-
for _, record := range c.batchRecords {
179-
c.records = append(c.records, record)
180-
}
180+
c.records = append(c.records, c.batchRecords...)
181181
}
182182

183183
func (c *listCollector) String() string {
@@ -213,6 +213,10 @@ func (c *recordCollector) next(tr *fdb.Transaction, it *fdb.RangeIterator) (fdb.
213213
if err != nil {
214214
return nil, false, err
215215
}
216+
if nextKeyAndRevRecord == nil {
217+
return nil, false, nil
218+
}
219+
216220
if c.batchCurrentRecord != nil && c.batchCurrentRecord.Key.Key != nextKeyAndRevRecord.Key.Key {
217221
if !c.batchCurrentRecord.Value.IsDelete {
218222
if _, _, err := c.inner.next(tr, c.batchCurrentRecord); err != nil {

pkg/drivers/fdb/fdb_subspace_by_key_revision.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ func (s *ByKeyAndRevisionSubspace) Delete(tr *fdb.Transaction, key *KeyAndRevisi
6161
}
6262

6363
func (s *ByKeyAndRevisionSubspace) GetFromIterator(it *fdb.RangeIterator) (*ByKeyAndRevisionRecord, error) {
64+
if !it.Advance() {
65+
return nil, nil
66+
}
6467
kv, err := it.Get()
6568
if err != nil {
6669
return nil, err

pkg/drivers/fdb/fdb_subspace_by_revision.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,28 +111,28 @@ func (s *ByRevisionSubspace) GetIterator(tr *fdb.Transaction, rev tuple.Versions
111111
return it, nil
112112
}
113113

114-
func (s *ByRevisionSubspace) GetFromIterator(it *fdb.RangeIterator) (tuple.Versionstamp, *Record, error) {
114+
func (s *ByRevisionSubspace) GetFromIterator(it *fdb.RangeIterator) (*tuple.Versionstamp, *Record, error) {
115115
if !it.Advance() {
116-
return dummyVersionstamp, nil, nil
116+
return nil, nil, nil
117117
}
118118
kv, err := it.Get()
119119
if err != nil {
120-
return dummyVersionstamp, nil, err
120+
return nil, nil, err
121121
}
122122
rev, record, err := s.ParseKV(kv)
123123
if err != nil {
124-
return dummyVersionstamp, nil, err
124+
return nil, nil, err
125125
}
126126
if record.ValueSize > int64(len(record.Value)) {
127127
buf := make([]byte, record.ValueSize)
128128
copy(buf, record.Value)
129129
offset := len(record.Value)
130130
if err := s.getBlob(it, buf, offset); err != nil {
131-
return dummyVersionstamp, nil, err
131+
return nil, nil, err
132132
}
133133
record.Value = buf
134134
}
135-
return rev, record, nil
135+
return &rev, record, nil
136136
}
137137

138138
func (s *ByRevisionSubspace) getBlob(it *fdb.RangeIterator, buf []byte, offset int) error {

pkg/drivers/fdb/fdb_test.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package fdb
33
import (
44
"bytes"
55
"context"
6-
"crypto/rand"
76
"fmt"
87
"github.com/stretchr/testify/assert"
98
"golang.org/x/sync/errgroup"
9+
"math/rand"
1010
"slices"
11+
"strings"
1112
"testing"
1213
"time"
1314

@@ -18,7 +19,7 @@ import (
1819
)
1920

2021
func TestFDB(t *testing.T) {
21-
forceRetryTransaction = func(i int) bool { return i < 2 }
22+
forceRetryTransaction = func(i int) bool { return false }
2223

2324
logrus.SetLevel(logrus.InfoLevel)
2425
n := 4
@@ -247,9 +248,19 @@ func TestFDBLargeRecords(t *testing.T) {
247248
err := f.Start(ctx)
248249
require.NoError(t, err)
249250

251+
watchLarge := f.Watch(ctx, "/large/", 0)
252+
250253
recordCount := 300
251254
records := createLargeRecords(t, f, ctx, recordCount)
252255

256+
for i := 0; i < recordCount; {
257+
batch := <-watchLarge.Events
258+
for _, watchedEvent := range batch {
259+
i++
260+
require.Equal(t, records[watchedEvent.KV.Key], watchedEvent.KV.Value, "watched value for key '%s' does not match", watchedEvent.KV.Key)
261+
}
262+
}
263+
253264
// Get record
254265
_, kv, err := f.Get(ctx, "/large/key42", "", 0, 0)
255266
require.NoError(t, err)
@@ -442,20 +453,29 @@ func createLargeRecords(t *testing.T, f server.Backend, ctx context.Context, rec
442453
records := make(map[string][]byte, recordCount)
443454
for i := 0; i < recordCount; i++ {
444455
key := fmt.Sprintf("/large/key%d", i)
445-
value := make([]byte, recordSize)
446-
_, err := rand.Read(value)
447-
require.NoError(t, err)
448-
records[key] = value
456+
value := randomString(recordSize)
457+
records[key] = []byte(value)
449458
v := func(key string, value []byte) (e error) {
450459
_, err := f.Create(ctx, key, value, 0)
451460
return err
452461
}
453-
g.Go(func() error { return v(key, value) })
462+
g.Go(func() error { return v(key, []byte(value)) })
454463
}
455464
require.NoError(t, g.Wait())
456465
return records
457466
}
458467

468+
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
469+
470+
func randomString(n int) string {
471+
var sb strings.Builder
472+
sb.Grow(n)
473+
for i := 0; i < n; i++ {
474+
sb.WriteByte(letters[rand.Intn(len(letters))])
475+
}
476+
return sb.String()
477+
}
478+
459479
func orEmpty(result []*server.KeyValue) []*server.KeyValue {
460480
if result == nil {
461481
result = make([]*server.KeyValue, 0)

pkg/drivers/fdb/fdb_ttl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (f *FDB) ttlEvents(ctx context.Context) chan *server.Event {
118118
defer close(result)
119119

120120
collector := newListCollector(f, 1000)
121-
rev, err := f.listWithCollector("ttl1", "/", "/", 0, collector)
121+
rev, err := f.listWithCollector("ttl1", "/", "", 0, collector)
122122
revRecords := collector.records
123123
for len(revRecords) > 0 {
124124
if err != nil {

pkg/drivers/fdb/fdb_util.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const (
1515
logConflictingKeys = false
1616

1717
splitRangeAfterDuration = 1 * time.Second
18+
transactionTimeout = 10 * time.Second
1819
transactionMaxRetryCount = 1000
1920
)
2021

@@ -30,33 +31,38 @@ type Processor[T any] interface {
3031
type batchResult struct {
3132
lastReadKey fdb.KeyConvertible
3233
collectorNeedsMore bool
33-
iteratorHasMore bool
3434
}
3535

3636
func processRange(db fdb.Database, selector fdb.SelectorRange, collector Processor[*fdb.RangeIterator]) error {
3737
beginSelector := selector.Begin
3838

3939
for i := 0; ; i++ {
40-
res, err := collectBatch(db, fdb.SelectorRange{Begin: beginSelector, End: selector.End}, collector)
40+
res, err := processBatch(db, fdb.SelectorRange{Begin: beginSelector, End: selector.End}, collector)
4141
if err != nil {
4242
return err
4343
}
4444
if !res.collectorNeedsMore {
4545
break
4646
}
47-
if !res.iteratorHasMore {
48-
break
49-
}
5047
beginSelector = fdb.FirstGreaterThan(res.lastReadKey)
5148
}
5249

5350
return nil
5451
}
5552

56-
func collectBatch(db fdb.Database, selector fdb.SelectorRange, collector Processor[*fdb.RangeIterator]) (batchResult, error) {
53+
func processBatch(db fdb.Database, selector fdb.SelectorRange, collector Processor[*fdb.RangeIterator]) (batchResult, error) {
54+
before := time.Now()
55+
defer func() {
56+
dur := time.Since(before)
57+
58+
if dur > 2*splitRangeAfterDuration {
59+
logrus.Warnf("BATCH %s => duration=%s", selector, dur)
60+
}
61+
}()
62+
5763
res, err := transact(db, batchResult{}, func(tr fdb.Transaction) (batchResult, error) {
58-
res := batchResult{collectorNeedsMore: true, iteratorHasMore: true}
59-
if err := tr.Options().SetTimeout(2 * splitRangeAfterDuration.Milliseconds()); err != nil {
64+
res := batchResult{collectorNeedsMore: true}
65+
if err := tr.Options().SetTimeout(transactionTimeout.Milliseconds()); err != nil {
6066
return res, fmt.Errorf("failed to set timeout limit: %w", err)
6167
}
6268

@@ -67,26 +73,22 @@ func collectBatch(db fdb.Database, selector fdb.SelectorRange, collector Process
6773

6874
var firstKey fdb.KeyConvertible
6975
collector.startBatch()
70-
for i := 0; res.collectorNeedsMore && res.iteratorHasMore; i++ {
71-
if !it.Advance() {
72-
res.iteratorHasMore = false
73-
break
74-
}
76+
for i := 0; res.collectorNeedsMore; i++ {
7577
if lastKey, collectorNeedsMore, err := collector.next(&tr, it); err != nil {
7678
return res, err
7779
} else {
7880
if firstKey == nil {
7981
firstKey = lastKey
8082
}
8183
res.lastReadKey = lastKey
82-
res.collectorNeedsMore = collectorNeedsMore
84+
res.collectorNeedsMore = collectorNeedsMore && lastKey != nil
8385
}
8486
if time.Since(start) > splitRangeAfterDuration {
8587
break
8688
}
8789
}
8890

89-
if err := collector.endBatch(&tr, !res.iteratorHasMore); err != nil {
91+
if err := collector.endBatch(&tr, !res.collectorNeedsMore); err != nil {
9092
return res, err
9193
}
9294

0 commit comments

Comments
 (0)