@@ -396,46 +396,63 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
396
396
397
397
// TestWatchBatchUnsynced tests batching on unsynced watchers
398
398
func TestWatchBatchUnsynced (t * testing.T ) {
399
- b , _ := betesting . NewDefaultTmpBackend ( t )
400
- s := New ( zaptest . NewLogger ( t ), b , & lease. FakeLessor {}, StoreConfig {})
401
- oldMaxRevs := watchBatchMaxRevs
402
- defer func () {
403
- watchBatchMaxRevs = oldMaxRevs
404
- cleanup ( s , b )
405
- }()
406
- batches := 3
407
- watchBatchMaxRevs = 4
408
- eventsPerRevision := 3
409
-
410
- v := [] byte ( "foo" )
411
- for i := 0 ; i < watchBatchMaxRevs * batches ; i ++ {
412
- txn := s . Write ( traceutil . TODO ())
413
- for j := 0 ; j < eventsPerRevision ; j ++ {
414
- txn . Put ( v , v , lease . NoLease )
415
- }
416
- txn . End ()
399
+ tcs := [] struct {
400
+ name string
401
+ batches int
402
+ watchBatchMaxRevs int
403
+ eventsPerRevision int
404
+ expectRevisionBatches [][] int64
405
+ }{
406
+ {
407
+ name : "3 batches, 4 revs per batch, 3 events per revision" ,
408
+ batches : 3 ,
409
+ watchBatchMaxRevs : 4 ,
410
+ eventsPerRevision : 3 ,
411
+ expectRevisionBatches : [][] int64 {
412
+ { 2 , 2 , 2 , 3 , 3 , 3 , 4 , 4 , 4 , 5 , 5 , 5 },
413
+ { 6 , 6 , 6 , 7 , 7 , 7 , 8 , 8 , 8 , 9 , 9 , 9 },
414
+ { 10 , 10 , 10 , 11 , 11 , 11 , 12 , 12 , 12 , 13 , 13 , 13 },
415
+ },
416
+ },
417
417
}
418
+ for _ , tc := range tcs {
419
+ t .Run (tc .name , func (t * testing.T ) {
420
+ b , _ := betesting .NewDefaultTmpBackend (t )
421
+ s := New (zaptest .NewLogger (t ), b , & lease.FakeLessor {}, StoreConfig {})
422
+ oldMaxRevs := watchBatchMaxRevs
423
+ defer func () {
424
+ watchBatchMaxRevs = oldMaxRevs
425
+ cleanup (s , b )
426
+ }()
427
+ watchBatchMaxRevs = tc .watchBatchMaxRevs
418
428
419
- w := s .NewWatchStream ()
420
- defer w .Close ()
429
+ v := []byte ("foo" )
430
+ for i := 0 ; i < watchBatchMaxRevs * tc .batches ; i ++ {
431
+ txn := s .Write (traceutil .TODO ())
432
+ for j := 0 ; j < tc .eventsPerRevision ; j ++ {
433
+ txn .Put (v , v , lease .NoLease )
434
+ }
435
+ txn .End ()
436
+ }
421
437
422
- w .Watch (0 , v , nil , 1 )
423
- revisionBatches := make ([][]int64 , batches )
424
- for i := 0 ; i < batches ; i ++ {
425
- for _ , e := range (<- w .Chan ()).Events {
426
- revisionBatches [i ] = append (revisionBatches [i ], e .Kv .ModRevision )
427
- }
438
+ w := s .NewWatchStream ()
439
+ defer w .Close ()
440
+
441
+ w .Watch (0 , v , nil , 1 )
442
+ revisionBatches := make ([][]int64 , tc .batches )
443
+ for i := 0 ; i < tc .batches ; i ++ {
444
+ for _ , e := range (<- w .Chan ()).Events {
445
+ revisionBatches [i ] = append (revisionBatches [i ], e .Kv .ModRevision )
446
+ }
447
+ }
448
+ assert .Equal (t , tc .expectRevisionBatches , revisionBatches )
449
+
450
+ s .store .revMu .Lock ()
451
+ defer s .store .revMu .Unlock ()
452
+ assert .Equal (t , 1 , s .synced .size ())
453
+ assert .Equal (t , 0 , s .unsynced .size ())
454
+ })
428
455
}
429
- assert .Equal (t , [][]int64 {
430
- {2 , 2 , 2 , 3 , 3 , 3 , 4 , 4 , 4 , 5 , 5 , 5 },
431
- {6 , 6 , 6 , 7 , 7 , 7 , 8 , 8 , 8 , 9 , 9 , 9 },
432
- {10 , 10 , 10 , 11 , 11 , 11 , 12 , 12 , 12 , 13 , 13 , 13 },
433
- }, revisionBatches )
434
-
435
- s .store .revMu .Lock ()
436
- defer s .store .revMu .Unlock ()
437
- assert .Equal (t , 1 , s .synced .size ())
438
- assert .Equal (t , 0 , s .unsynced .size ())
439
456
}
440
457
441
458
func TestNewMapwatcherToEventMap (t * testing.T ) {
0 commit comments