@@ -1585,3 +1585,78 @@ func TestV3NoEventsLostOnCompact(t *testing.T) {
15851585 }
15861586 assert .Truef (t , compacted , "Expected stream to get compacted, instead we got %d events out of %d events" , eventCount , writeCount )
15871587}
1588+
1589+ // TestV3WatchLargeTxnFragmentTrue tests watching multiple large transactions w/ fragment enabled.
1590+ func TestV3WatchLargeTxnFragmentTrue (t * testing.T ) {
1591+ integration .BeforeTest (t )
1592+ testV3WatchLargeTxn (t , true )
1593+ }
1594+
1595+ // TestV3WatchLargeTxnFragmentFalse tests watching multiple large transactions w/ fragment disabled.
1596+ //
1597+ // Even though each event is only 1.2 MB which is below the maximum response size, the server
1598+ // attempts to return all four events in one 4.8 MB response which fails.
1599+ func TestV3WatchLargeTxnFragmentFalse (t * testing.T ) {
1600+ integration .BeforeTest (t )
1601+ testV3WatchLargeTxn (t , false )
1602+ }
1603+
1604+ // testV3WatchLargeTxn tests watching multiple large transactions
1605+ func testV3WatchLargeTxn (t * testing.T , fragment bool ) {
1606+ clus := integration .NewCluster (t , & integration.ClusterConfig {Size : 3 })
1607+ defer clus .Terminate (t )
1608+
1609+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
1610+ defer cancel ()
1611+ wStream , err := integration .ToGRPC (clus .RandClient ()).Watch .Watch (ctx )
1612+ require .Nil (t , err )
1613+
1614+ // Put 12 keys w/ different values of length 100KB in one transaction 4 times.
1615+ var n = 4
1616+ var m = 12
1617+ var rev []int64
1618+ var svc = integration .ToGRPC (clus .RandClient ()).KV
1619+ for j := range n {
1620+ req := & pb.TxnRequest {Success : []* pb.RequestOp {}}
1621+ for i := range m {
1622+ req .Success = append (req .Success , & pb.RequestOp {
1623+ Request : & pb.RequestOp_RequestPut {
1624+ RequestPut : & pb.PutRequest {
1625+ Key : []byte (fmt .Sprintf (`test-watch-fragment-%03d` , i )),
1626+ Value : bytes .Repeat ([]byte {'a' + uint8 (i + j )}, 1e5 ),
1627+ },
1628+ },
1629+ })
1630+ }
1631+ resp , err := svc .Txn (ctx , req )
1632+ require .Nil (t , err , err )
1633+ assert .True (t , resp .Succeeded )
1634+ rev = append (rev , resp .Header .Revision )
1635+ }
1636+
1637+ // Create Watch
1638+ err = wStream .Send (& pb.WatchRequest {RequestUnion : & pb.WatchRequest_CreateRequest {
1639+ CreateRequest : & pb.WatchCreateRequest {
1640+ Key : []byte (`test-watch-fragment-000` ),
1641+ RangeEnd : []byte (`test-watch-fragment-999` ),
1642+ StartRevision : rev [0 ],
1643+ Fragment : fragment ,
1644+ },
1645+ }})
1646+ require .Nil (t , err )
1647+ resp , err := wStream .Recv ()
1648+ require .Nil (t , err )
1649+ require .NotNil (t , resp )
1650+ require .True (t , resp .Created )
1651+
1652+ // Receive events
1653+ var events []* mvccpb.Event
1654+ for len (events ) < n * m {
1655+ resp , err := wStream .Recv ()
1656+ require .Nil (t , err , err )
1657+ require .NotNil (t , resp )
1658+ events = append (events , resp .Events ... )
1659+ }
1660+ // Last response header revision should match mod revision of last returned event
1661+ assert .Equal (t , rev [len (rev )- 1 ], events [len (events )- 1 ].Kv .ModRevision )
1662+ }
0 commit comments