Skip to content

Commit 90b2c4c

Browse files
committed
Implement batching by event size
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent b038739 commit 90b2c4c

File tree

3 files changed

+61
-43
lines changed

3 files changed

+61
-43
lines changed

server/storage/mvcc/watchable_store.go

-6
Original file line numberDiff line numberDiff line change
@@ -485,12 +485,6 @@ func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) {
485485
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
486486
victim := make(watcherBatch)
487487
for w, eb := range newWatcherBatch(&s.synced, evs) {
488-
if eb.revs != 1 {
489-
s.store.lg.Panic(
490-
"unexpected multiple revisions in watch notification",
491-
zap.Int("number-of-revisions", eb.revs),
492-
)
493-
}
494488
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
495489
pendingEventsGauge.Add(float64(len(eb.evs)))
496490
} else {

server/storage/mvcc/watchable_store_test.go

+43-18
Original file line numberDiff line numberDiff line change
@@ -503,64 +503,89 @@ func TestWatchBatchUnsynced(t *testing.T) {
503503
tcs := []struct {
504504
name string
505505
revisions int
506-
watchBatchMaxRevs int
506+
eventSize int
507+
watchBatchMaxSize int
507508
eventsPerRevision int
508509
expectRevisionBatches [][]int64
509510
}{
510511
{
511-
name: "3 revisions, 4 revs per batch, 1 events per revision",
512-
revisions: 12,
513-
watchBatchMaxRevs: 4,
512+
name: "Fits into a single batch",
513+
revisions: 10,
514+
eventSize: 100,
515+
watchBatchMaxSize: 1000,
514516
eventsPerRevision: 1,
515517
expectRevisionBatches: [][]int64{
516-
{2, 3, 4, 5},
517-
{6, 7, 8, 9},
518-
{10, 11, 12, 13},
518+
{2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
519519
},
520520
},
521521
{
522-
name: "3 revisions, 4 revs per batch, 3 events per revision",
523-
revisions: 12,
524-
watchBatchMaxRevs: 4,
522+
name: "Spills to second batch",
523+
revisions: 15,
524+
eventSize: 100,
525+
watchBatchMaxSize: 1000,
526+
eventsPerRevision: 1,
527+
expectRevisionBatches: [][]int64{
528+
{2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
529+
{12, 13, 14, 15, 16},
530+
},
531+
},
532+
{
533+
name: "Spills to second batch, but maintains revision pairs",
534+
revisions: 8,
535+
eventSize: 100,
536+
watchBatchMaxSize: 1000,
537+
eventsPerRevision: 2,
538+
expectRevisionBatches: [][]int64{
539+
{2, 2, 3, 3, 4, 4, 5, 5, 6, 6},
540+
{7, 7, 8, 8, 9, 9},
541+
},
542+
},
543+
{
544+
name: "Spills to second batch, but maintains revision triples",
545+
revisions: 6,
546+
eventSize: 100,
547+
watchBatchMaxSize: 1000,
525548
eventsPerRevision: 3,
526549
expectRevisionBatches: [][]int64{
527550
{2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5},
528-
{6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9},
529-
{10, 10, 10, 11, 11, 11, 12, 12, 12, 13, 13, 13},
551+
{6, 6, 6, 7, 7, 7},
530552
},
531553
},
532554
}
533555
for _, tc := range tcs {
534556
t.Run(tc.name, func(t *testing.T) {
535557
b, _ := betesting.NewDefaultTmpBackend(t)
536558
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
537-
oldMaxRevs := watchBatchMaxRevs
559+
oldMaxRevs := watchBatchMaxSize
538560
defer func() {
539-
watchBatchMaxRevs = oldMaxRevs
561+
watchBatchMaxSize = oldMaxRevs
540562
cleanup(s, b)
541563
}()
542-
watchBatchMaxRevs = tc.watchBatchMaxRevs
564+
watchBatchMaxSize = tc.watchBatchMaxSize
543565

544-
v := []byte("foo")
566+
k := []byte("k")
567+
eventProtoOverhead := 13
568+
v := make([]byte, tc.eventSize-eventProtoOverhead)
545569
for i := 0; i < tc.revisions; i++ {
546570
txn := s.Write(traceutil.TODO())
547571
for j := 0; j < tc.eventsPerRevision; j++ {
548-
txn.Put(v, v, lease.NoLease)
572+
txn.Put(k, v, lease.NoLease)
549573
}
550574
txn.End()
551575
}
552576

553577
w := s.NewWatchStream()
554578
defer w.Close()
555579

556-
w.Watch(0, v, nil, 1)
580+
w.Watch(0, k, nil, 1)
557581
var revisionBatches [][]int64
558582
eventCount := 0
559583
for eventCount < tc.revisions*tc.eventsPerRevision {
560584
var revisions []int64
561585
for _, e := range (<-w.Chan()).Events {
562586
revisions = append(revisions, e.Kv.ModRevision)
563587
eventCount++
588+
assert.Equal(t, tc.eventSize, e.Size())
564589
}
565590
revisionBatches = append(revisionBatches, revisions)
566591
}

server/storage/mvcc/watcher_group.go

+18-19
Original file line numberDiff line numberDiff line change
@@ -22,44 +22,43 @@ import (
2222
"go.etcd.io/etcd/pkg/v3/adt"
2323
)
2424

25-
// watchBatchMaxRevs is the maximum distinct revisions that
25+
// watchBatchMaxSize is the maximum total size of events (per Event.Size) that
2626
// may be sent to an unsynced watcher at a time. Declared as
2727
// var instead of const for testing purposes.
28-
var watchBatchMaxRevs = 1000
28+
var watchBatchMaxSize = 2 * 1024 * 1024
2929

3030
type eventBatch struct {
3131
// evs is a batch of revision-ordered events
3232
evs []mvccpb.Event
33-
// revs is the minimum unique revisions observed for this batch
34-
revs int
33+
// evsSize is the total size of events in the batch.
34+
evsSize int
3535
// moreRev is first revision with more events following this batch
3636
moreRev int64
3737
}
3838

3939
func (eb *eventBatch) add(ev mvccpb.Event) {
40-
if eb.revs > watchBatchMaxRevs {
41-
// maxed out batch size
42-
return
43-
}
44-
4540
if len(eb.evs) == 0 {
46-
// base case
47-
eb.revs = 1
41+
eb.evsSize = ev.Size()
4842
eb.evs = append(eb.evs, ev)
4943
return
5044
}
51-
52-
// revision accounting
5345
ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
5446
evRev := ev.Kv.ModRevision
55-
if evRev > ebRev {
56-
eb.revs++
57-
if eb.revs > watchBatchMaxRevs {
58-
eb.moreRev = evRev
59-
return
60-
}
47+
if evRev == ebRev {
48+
eb.evsSize += ev.Size()
49+
eb.evs = append(eb.evs, ev)
50+
return
51+
}
52+
if eb.moreRev != 0 {
53+
return
6154
}
6255

56+
size := ev.Size()
57+
if eb.evsSize+size > watchBatchMaxSize {
58+
eb.moreRev = ev.Kv.ModRevision
59+
return
60+
}
61+
eb.evsSize += size
6362
eb.evs = append(eb.evs, ev)
6463
}
6564

0 commit comments

Comments
 (0)