Commit cc64d28

Use maxRequestSizeBytes for batch limiting

Signed-off-by: Marek Siarkowicz <[email protected]>

1 parent 90b2c4c

14 files changed, +80 -71 lines

server/config/config.go (+2 -2)

@@ -363,6 +363,6 @@ func (c *ServerConfig) BootstrapTimeoutEffective() time.Duration {
 
 func (c *ServerConfig) BackendPath() string { return datadir.ToBackendFileName(c.DataDir) }
 
-func (c *ServerConfig) MaxRequestBytesWithOverhead() uint {
-	return c.MaxRequestBytes + grpcOverheadBytes
+func (c *ServerConfig) MaxRequestBytesWithOverhead() int {
+	return int(c.MaxRequestBytes) + grpcOverheadBytes
 }
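
A minimal usage sketch (not part of the commit): MaxRequestBytes stays a uint field on ServerConfig, the helper converts it once and returns an int, and callers can pass the result on without further casts. The request-size value below is only an illustrative placeholder; grpcOverheadBytes is the package constant referenced above, whose value is not shown in this diff.

	// Hypothetical caller, assuming the usual etcd server/config import:
	cfg := config.ServerConfig{MaxRequestBytes: 1572864} // placeholder limit
	limit := cfg.MaxRequestBytesWithOverhead()           // int: MaxRequestBytes + grpcOverheadBytes
	_ = limit                                            // fed to grpc.MaxRecvMsgSize and WatchBatchMaxSize below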

server/etcdserver/api/v3rpc/grpc.go (+1 -1)

@@ -61,7 +61,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
 	opts = append(opts, grpc.ChainUnaryInterceptor(chainUnaryInterceptors...))
 	opts = append(opts, grpc.ChainStreamInterceptor(chainStreamInterceptors...))
 
-	opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytesWithOverhead())))
+	opts = append(opts, grpc.MaxRecvMsgSize(s.Cfg.MaxRequestBytesWithOverhead()))
 	opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
 	opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))

server/etcdserver/api/v3rpc/watch.go (+5 -5)

@@ -43,7 +43,7 @@ type watchServer struct {
 	clusterID int64
 	memberID  int64
 
-	maxRequestBytes uint
+	maxRequestBytes int
 
 	sg        apply.RaftStatusGetter
 	watchable mvcc.WatchableKV
@@ -126,7 +126,7 @@ type serverWatchStream struct {
 	clusterID int64
 	memberID  int64
 
-	maxRequestBytes uint
+	maxRequestBytes int
 
 	sg        apply.RaftStatusGetter
 	watchable mvcc.WatchableKV
@@ -544,12 +544,12 @@ func IsCreateEvent(e mvccpb.Event) bool {
 
 func sendFragments(
 	wr *pb.WatchResponse,
-	maxRequestBytes uint,
+	maxRequestBytes int,
 	sendFunc func(*pb.WatchResponse) error,
 ) error {
 	// no need to fragment if total request size is smaller
 	// than max request limit or response contains only one event
-	if uint(wr.Size()) < maxRequestBytes || len(wr.Events) < 2 {
+	if wr.Size() < maxRequestBytes || len(wr.Events) < 2 {
 		return sendFunc(wr)
 	}
 
@@ -562,7 +562,7 @@ func sendFragments(
 		cur := ow
 		for _, ev := range wr.Events[idx:] {
 			cur.Events = append(cur.Events, ev)
-			if len(cur.Events) > 1 && uint(cur.Size()) >= maxRequestBytes {
+			if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes {
 				cur.Events = cur.Events[:len(cur.Events)-1]
 				break
 			}
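
For context, a small illustration (not part of the commit) of the check these hunks simplify: the protobuf-generated Size() method already returns an int, so with maxRequestBytes as an int the uint conversions drop out. The helper name below is hypothetical; it only restates the condition under which sendFragments splits a response.

	// needsFragmentation is a hypothetical helper mirroring the inverted
	// early-return condition in sendFragments: split only when the response
	// exceeds the limit and carries more than one event.
	func needsFragmentation(wr *pb.WatchResponse, maxRequestBytes int) bool {
		return wr.Size() >= maxRequestBytes && len(wr.Events) >= 2
	}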

server/etcdserver/api/v3rpc/watch_test.go (+1 -1)

@@ -27,7 +27,7 @@ import (
 func TestSendFragment(t *testing.T) {
 	tt := []struct {
 		wr              *pb.WatchResponse
-		maxRequestBytes uint
+		maxRequestBytes int
 		fragments       int
 		werr            error
 	}{

server/etcdserver/corrupt_test.go (+1 -1)

@@ -516,7 +516,7 @@ func TestHashKVHandler(t *testing.T) {
 	etcdSrv.cluster.SetID(types.ID(localClusterID), types.ID(localClusterID))
 	be, _ := betesting.NewDefaultTmpBackend(t)
 	defer betesting.Close(t, be)
-	etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	defer func() {
 		assert.NoError(t, etcdSrv.kv.Close())
 	}()

server/etcdserver/server.go (+6 -3)

@@ -368,9 +368,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		return nil, err
 	}
 
-	mvccStoreConfig := mvcc.StoreConfig{
-		CompactionBatchLimit:    cfg.CompactionBatchLimit,
-		CompactionSleepInterval: cfg.CompactionSleepInterval,
+	mvccStoreConfig := mvcc.WatchableStoreConfig{
+		StoreConfig: mvcc.StoreConfig{
+			CompactionBatchLimit:    cfg.CompactionBatchLimit,
+			CompactionSleepInterval: cfg.CompactionSleepInterval,
+		},
+		WatchBatchMaxSize: cfg.MaxRequestBytesWithOverhead(),
 	}
 	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
 	srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())

server/etcdserver/server_test.go (+4 -4)

@@ -652,7 +652,7 @@ func TestSnapshotDisk(t *testing.T) {
 		v2store:      st,
 		consistIndex: cindex.NewConsistentIndex(be),
 	}
-	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	defer func() {
 		assert.NoError(t, srv.kv.Close())
 	}()
@@ -703,7 +703,7 @@ func TestSnapshotMemory(t *testing.T) {
 		v2store:      st,
 		consistIndex: cindex.NewConsistentIndex(be),
 	}
-	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	defer func() {
 		assert.NoError(t, srv.kv.Close())
 	}()
@@ -775,7 +775,7 @@ func TestSnapshotOrdering(t *testing.T) {
 		beHooks: serverstorage.NewBackendHooks(lg, ci),
 	}
 
-	s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	s.be = be
 
 	s.start()
@@ -869,7 +869,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 		authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 1),
 	}
 
-	s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
 	s.be = be
 
 	s.start()

server/storage/mvcc/kv_test.go (+1 -1)

@@ -757,7 +757,7 @@ func TestKVSnapshot(t *testing.T) {
 
 func TestWatchableKVWatch(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, b)
 
 	w := s.NewWatchStream()

server/storage/mvcc/watchable_store.go (+16 -9)

@@ -50,6 +50,7 @@ type watchable interface {
 
 type watchableStore struct {
 	*store
+	watchBatchMaxSize int
 
 	// mu protects watcher groups and batches. It should never be locked
 	// before locking store.mu to avoid deadlock.
@@ -76,24 +77,30 @@ var _ WatchableKV = (*watchableStore)(nil)
 // cancel operations.
 type cancelFunc func()
 
-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore {
 	s := newWatchableStore(lg, b, le, cfg)
 	s.wg.Add(2)
 	go s.syncWatchersLoop()
 	go s.syncVictimsLoop()
 	return s
 }
 
-func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
+type WatchableStoreConfig struct {
+	StoreConfig
+	WatchBatchMaxSize int
+}
+
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore {
 	if lg == nil {
 		lg = zap.NewNop()
 	}
 	s := &watchableStore{
-		store:    NewStore(lg, b, le, cfg),
-		victimc:  make(chan struct{}, 1),
-		unsynced: newWatcherGroup(),
-		synced:   newWatcherGroup(),
-		stopc:    make(chan struct{}),
+		store:             NewStore(lg, b, le, cfg.StoreConfig),
+		victimc:           make(chan struct{}, 1),
+		unsynced:          newWatcherGroup(),
+		synced:            newWatcherGroup(),
+		stopc:             make(chan struct{}),
+		watchBatchMaxSize: cfg.WatchBatchMaxSize,
 	}
 	s.store.ReadView = &readView{s}
 	s.store.WriteView = &writeView{s}
@@ -361,7 +368,7 @@ func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event)
 	evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1)
 
 	victims := make(watcherBatch)
-	wb := newWatcherBatch(wg, evs)
+	wb := newWatcherBatch(wg, evs, s.watchBatchMaxSize)
 	for w := range wg.watchers {
 		if w.minRev < compactionRev {
 			// Skip the watcher that failed to send compacted watch response due to w.ch is full.
@@ -484,7 +491,7 @@ func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) {
 // watchers that watch on the key of the event.
 func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 	victim := make(watcherBatch)
-	for w, eb := range newWatcherBatch(&s.synced, evs) {
+	for w, eb := range newWatcherBatch(&s.synced, evs, s.watchBatchMaxSize) {
 		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
 			pendingEventsGauge.Add(float64(len(eb.evs)))
 		} else {
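
A short sketch of how callers wire up the new configuration type (mirroring server.go; the wrapper and parameter names below are illustrative, not from the diff): the batch limit now travels through WatchableStoreConfig per store instance instead of the old package-level watchBatchMaxSize variable, which is why TestWatchBatchUnsynced in the test changes below no longer mutates and restores a global.

	// newWatchableKV is a hypothetical wrapper showing the wiring only;
	// it assumes the surrounding etcd server packages (config, mvcc,
	// backend, lease, zap) are imported as in server.go.
	func newWatchableKV(lg *zap.Logger, be backend.Backend, lessor lease.Lessor, cfg config.ServerConfig) mvcc.WatchableKV {
		return mvcc.New(lg, be, lessor, mvcc.WatchableStoreConfig{
			StoreConfig: mvcc.StoreConfig{
				CompactionBatchLimit:    cfg.CompactionBatchLimit,
				CompactionSleepInterval: cfg.CompactionSleepInterval,
			},
			WatchBatchMaxSize: cfg.MaxRequestBytesWithOverhead(),
		})
	}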

server/storage/mvcc/watchable_store_bench_test.go (+5 -5)

@@ -27,7 +27,7 @@ import (
 
 func BenchmarkWatchableStorePut(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, be)
 
 	// arbitrary number of bytes
@@ -47,7 +47,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
 // some synchronization operations, such as mutex locking.
 func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, be)
 
 	// arbitrary number of bytes
@@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
 
 func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, be)
 
 	k := []byte("testkey")
@@ -122,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 
 	defer cleanup(ws, be)
 
@@ -164,7 +164,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 
 	defer cleanup(s, be)

server/storage/mvcc/watchable_store_test.go (+16 -20)

@@ -33,7 +33,7 @@ import (
 
 func TestWatch(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
@@ -52,7 +52,7 @@ func TestWatch(t *testing.T) {
 
 func TestNewWatcherCancel(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
@@ -81,7 +81,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// because newWatchableStore automatically calls syncWatchers
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, b)
 
 	// Put a key so that we can spawn watchers on that key.
@@ -125,7 +125,7 @@ func TestCancelUnsynced(t *testing.T) {
 // and moves these watchers to synced.
 func TestSyncWatchers(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
@@ -271,7 +271,7 @@ func TestRangeEvents(t *testing.T) {
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
@@ -309,7 +309,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
 
 	b, _ := betesting.NewDefaultTmpBackend(t)
 	lg := zaptest.NewLogger(t)
-	s := New(lg, b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(lg, b, &lease.FakeLessor{}, WatchableStoreConfig{})
 
 	defer func() {
 		cleanup(s, b)
@@ -363,7 +363,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
 
 func TestWatchFutureRev(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
@@ -402,7 +402,7 @@ func TestWatchRestore(t *testing.T) {
 	test := func(delay time.Duration) func(t *testing.T) {
 		return func(t *testing.T) {
 			b, _ := betesting.NewDefaultTmpBackend(t)
-			s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+			s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
			defer cleanup(s, b)
 
			testKey := []byte("foo")
@@ -448,11 +448,11 @@ func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
 // 5. choose the watcher from step 1, without panic
 func TestWatchRestoreSyncedWatcher(t *testing.T) {
 	b1, _ := betesting.NewDefaultTmpBackend(t)
-	s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{})
+	s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s1, b1)
 
 	b2, _ := betesting.NewDefaultTmpBackend(t)
-	s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{})
+	s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s2, b2)
 
 	testKey, testValue := []byte("foo"), []byte("bar")
@@ -555,13 +555,9 @@ func TestWatchBatchUnsynced(t *testing.T) {
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
 			b, _ := betesting.NewDefaultTmpBackend(t)
-			s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
-			oldMaxRevs := watchBatchMaxSize
-			defer func() {
-				watchBatchMaxSize = oldMaxRevs
-				cleanup(s, b)
-			}()
-			watchBatchMaxSize = tc.watchBatchMaxSize
+			s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{
+				WatchBatchMaxSize: tc.watchBatchMaxSize,
+			})
 
 			k := []byte("k")
 			eventProtoOverhead := 13
@@ -680,7 +676,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
 			wg.add(w)
 		}
 
-		gwe := newWatcherBatch(&wg, tt.evs)
+		gwe := newWatcherBatch(&wg, tt.evs, 0)
 		if len(gwe) != len(tt.wwe) {
 			t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe))
 		}
@@ -702,7 +698,7 @@ func TestWatchVictims(t *testing.T) {
 	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
 
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 
 	defer func() {
 		cleanup(s, b)
@@ -779,7 +775,7 @@ func TestWatchVictims(t *testing.T) {
 // canceling its watches.
 func TestStressWatchCancelClose(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
 	defer cleanup(s, b)
 
 	testKey, testValue := []byte("foo"), []byte("bar")

server/storage/mvcc/watcher_bench_test.go (+1 -1)

@@ -26,7 +26,7 @@ import (
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
 
 	defer cleanup(watchable, be)
