@@ -1585,3 +1585,78 @@ func TestV3NoEventsLostOnCompact(t *testing.T) {
1585
1585
}
1586
1586
assert .Truef (t , compacted , "Expected stream to get compacted, instead we got %d events out of %d events" , eventCount , writeCount )
1587
1587
}
1588
+
1589
+ // TestV3WatchLargeTxnFragmentTrue tests watching multiple large transactions w/ fragment enabled.
1590
+ func TestV3WatchLargeTxnFragmentTrue (t * testing.T ) {
1591
+ integration .BeforeTest (t )
1592
+ testV3WatchLargeTxn (t , true )
1593
+ }
1594
+
1595
+ // TestV3WatchLargeTxnFragmentFalse tests watching multiple large transactions w/ fragment disabled.
1596
+ //
1597
+ // Even though each event is only 1.2 MB which is below the maximum response size, the server
1598
+ // attempts to return all four events in one 4.8 MB response which fails.
1599
+ func TestV3WatchLargeTxnFragmentFalse (t * testing.T ) {
1600
+ integration .BeforeTest (t )
1601
+ testV3WatchLargeTxn (t , false )
1602
+ }
1603
+
1604
+ // testV3WatchLargeTxn tests watching multiple large transactions
1605
+ func testV3WatchLargeTxn (t * testing.T , fragment bool ) {
1606
+ clus := integration .NewCluster (t , & integration.ClusterConfig {Size : 3 })
1607
+ defer clus .Terminate (t )
1608
+
1609
+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
1610
+ defer cancel ()
1611
+ wStream , err := integration .ToGRPC (clus .RandClient ()).Watch .Watch (ctx )
1612
+ require .Nil (t , err )
1613
+
1614
+ // Put 12 keys w/ different values of length 100KB in one transaction 4 times.
1615
+ var n = 4
1616
+ var m = 12
1617
+ var rev []int64
1618
+ var svc = integration .ToGRPC (clus .RandClient ()).KV
1619
+ for j := range n {
1620
+ req := & pb.TxnRequest {Success : []* pb.RequestOp {}}
1621
+ for i := range m {
1622
+ req .Success = append (req .Success , & pb.RequestOp {
1623
+ Request : & pb.RequestOp_RequestPut {
1624
+ RequestPut : & pb.PutRequest {
1625
+ Key : []byte (fmt .Sprintf (`test-watch-fragment-%03d` , i )),
1626
+ Value : bytes .Repeat ([]byte {'a' + uint8 (i + j )}, 1e5 ),
1627
+ },
1628
+ },
1629
+ })
1630
+ }
1631
+ resp , err := svc .Txn (ctx , req )
1632
+ require .Nil (t , err , err )
1633
+ assert .True (t , resp .Succeeded )
1634
+ rev = append (rev , resp .Header .Revision )
1635
+ }
1636
+
1637
+ // Create Watch
1638
+ err = wStream .Send (& pb.WatchRequest {RequestUnion : & pb.WatchRequest_CreateRequest {
1639
+ CreateRequest : & pb.WatchCreateRequest {
1640
+ Key : []byte (`test-watch-fragment-000` ),
1641
+ RangeEnd : []byte (`test-watch-fragment-999` ),
1642
+ StartRevision : rev [0 ],
1643
+ Fragment : fragment ,
1644
+ },
1645
+ }})
1646
+ require .Nil (t , err )
1647
+ resp , err := wStream .Recv ()
1648
+ require .Nil (t , err )
1649
+ require .NotNil (t , resp )
1650
+ require .True (t , resp .Created )
1651
+
1652
+ // Receive events
1653
+ var events []* mvccpb.Event
1654
+ for len (events ) < n * m {
1655
+ resp , err := wStream .Recv ()
1656
+ require .Nil (t , err , err )
1657
+ require .NotNil (t , resp )
1658
+ events = append (events , resp .Events ... )
1659
+ }
1660
+ // Last response header revision should match mod revision of last returned event
1661
+ assert .Equal (t , rev [len (rev )- 1 ], events [len (events )- 1 ].Kv .ModRevision )
1662
+ }
0 commit comments