@@ -93,9 +93,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
93
93
@ ParameterizedTest (name = TestInfoUtils .TestWithParameterizedQuorumAndGroupProtocolNames )
94
94
@ MethodSource (Array (" getTestQuorumAndGroupProtocolParametersAll" ))
95
95
def testThrottledProducerConsumer (quorum : String , groupProtocol : String ): Unit = {
96
- val numRecords = 1000
97
- val produced = quotaTestClients.produceUntilThrottled(numRecords )
98
- assertTrue(produced < numRecords , " Produced records should less than max records" )
96
+ val maxRecords = 1000
97
+ val produced = quotaTestClients.produceUntilThrottled(maxRecords )
98
+ assertTrue(produced < maxRecords , " Produced records should less than max records" )
99
99
quotaTestClients.verifyProduceThrottle(expectThrottle = true )
100
100
101
101
// Consumer should read in a bursty manner and get throttled immediately
@@ -114,12 +114,12 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
114
114
quotaTestClients.overrideQuotas(Long .MaxValue , Long .MaxValue , Long .MaxValue .toDouble)
115
115
quotaTestClients.waitForQuotaUpdate(Long .MaxValue , Long .MaxValue , Long .MaxValue .toDouble)
116
116
117
- val numRecords = 1000
118
- assertEquals(numRecords , quotaTestClients.produceUntilThrottled(numRecords ))
117
+ val maxRecords = 1000
118
+ assertEquals(maxRecords , quotaTestClients.produceUntilThrottled(maxRecords ))
119
119
quotaTestClients.verifyProduceThrottle(expectThrottle = false )
120
120
121
121
// The "client" consumer does not get throttled.
122
- assertEquals(numRecords , quotaTestClients.consumeUntilThrottled(numRecords ))
122
+ assertEquals(maxRecords , quotaTestClients.consumeUntilThrottled(maxRecords ))
123
123
quotaTestClients.verifyConsumeThrottle(expectThrottle = false )
124
124
}
125
125
@@ -132,9 +132,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
132
132
quotaTestClients.overrideQuotas(2000 , 250 , Long .MaxValue .toDouble)
133
133
quotaTestClients.waitForQuotaUpdate(2000 , 250 , Long .MaxValue .toDouble)
134
134
135
- val numRecords = 1000
136
- val produced = quotaTestClients.produceUntilThrottled(numRecords )
137
- assertTrue(produced < numRecords , " Produced records should less than max records" )
135
+ val maxRecords = 1000
136
+ val produced = quotaTestClients.produceUntilThrottled(maxRecords )
137
+ assertTrue(produced < maxRecords , " Produced records should less than max records" )
138
138
quotaTestClients.verifyProduceThrottle(expectThrottle = true )
139
139
140
140
// Consumer should be able to consume at least one record, even when throttled
@@ -149,24 +149,24 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
149
149
quotaTestClients.overrideQuotas(Long .MaxValue , Long .MaxValue , Long .MaxValue .toDouble)
150
150
quotaTestClients.waitForQuotaUpdate(Long .MaxValue , Long .MaxValue , Long .MaxValue .toDouble)
151
151
152
- val numRecords = 1000
153
- assertEquals(numRecords , quotaTestClients.produceUntilThrottled(numRecords ))
152
+ val maxRecords = 1000
153
+ assertEquals(maxRecords , quotaTestClients.produceUntilThrottled(maxRecords ))
154
154
quotaTestClients.verifyProduceThrottle(expectThrottle = false )
155
- assertEquals(numRecords , quotaTestClients.consumeUntilThrottled(numRecords ))
155
+ assertEquals(maxRecords , quotaTestClients.consumeUntilThrottled(maxRecords ))
156
156
quotaTestClients.verifyConsumeThrottle(expectThrottle = false )
157
157
158
158
// Delete producer and consumer quota overrides. Consumer and producer should now be
159
159
// throttled since broker defaults are very small
160
160
quotaTestClients.removeQuotaOverrides()
161
161
quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
162
- val produced = quotaTestClients.produceUntilThrottled(numRecords )
163
- assertTrue(produced < numRecords , " Produced records should less than max records" )
162
+ val produced = quotaTestClients.produceUntilThrottled(maxRecords )
163
+ assertTrue(produced < maxRecords , " Produced records should less than max records" )
164
164
quotaTestClients.verifyProduceThrottle(expectThrottle = true )
165
165
166
166
// Since producer may have been throttled after producing a couple of records,
167
167
// consume from beginning till throttled
168
168
quotaTestClients.consumer.seekToBeginning(Collections .singleton(new TopicPartition (topic1, 0 )))
169
- quotaTestClients.consumeUntilThrottled(numRecords + produced)
169
+ quotaTestClients.consumeUntilThrottled(maxRecords + produced)
170
170
quotaTestClients.verifyConsumeThrottle(expectThrottle = true )
171
171
}
172
172
0 commit comments