diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index 5657df9a0d52c..7a27987c5fdea 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -70,7 +70,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") // Low enough quota that a producer sending a small payload in a tight loop should get throttled - val defaultProducerQuota: Long = 8000 + val defaultProducerQuota: Long = 6000 val defaultConsumerQuota: Long = 2500 val defaultRequestQuota: Double = Long.MaxValue.toDouble @@ -93,8 +93,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testThrottledProducerConsumer(quorum: String, groupProtocol: String): Unit = { - val numRecords = 1000 - val produced = quotaTestClients.produceUntilThrottled(numRecords) + val maxRecords = 1000 + val produced = quotaTestClients.produceUntilThrottled(maxRecords) + assertTrue(produced < maxRecords, "Produced records should less than max records") quotaTestClients.verifyProduceThrottle(expectThrottle = true) // Consumer should read in a bursty manner and get throttled immediately @@ -113,12 +114,12 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble) quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble) - val numRecords = 1000 - assertEquals(numRecords, quotaTestClients.produceUntilThrottled(numRecords)) + val maxRecords = 1000 + assertEquals(maxRecords, quotaTestClients.produceUntilThrottled(maxRecords)) quotaTestClients.verifyProduceThrottle(expectThrottle = false) // The "client" consumer does not get throttled. - assertEquals(numRecords, quotaTestClients.consumeUntilThrottled(numRecords)) + assertEquals(maxRecords, quotaTestClients.consumeUntilThrottled(maxRecords)) quotaTestClients.verifyConsumeThrottle(expectThrottle = false) } @@ -131,8 +132,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { quotaTestClients.overrideQuotas(2000, 250, Long.MaxValue.toDouble) quotaTestClients.waitForQuotaUpdate(2000, 250, Long.MaxValue.toDouble) - val numRecords = 1000 - val produced = quotaTestClients.produceUntilThrottled(numRecords) + val maxRecords = 1000 + val produced = quotaTestClients.produceUntilThrottled(maxRecords) + assertTrue(produced < maxRecords, "Produced records should less than max records") quotaTestClients.verifyProduceThrottle(expectThrottle = true) // Consumer should be able to consume at least one record, even when throttled @@ -147,23 +149,24 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble) quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble) - val numRecords = 1000 - assertEquals(numRecords, quotaTestClients.produceUntilThrottled(numRecords)) + val maxRecords = 1000 + assertEquals(maxRecords, quotaTestClients.produceUntilThrottled(maxRecords)) quotaTestClients.verifyProduceThrottle(expectThrottle = false) - assertEquals(numRecords, quotaTestClients.consumeUntilThrottled(numRecords)) + assertEquals(maxRecords, quotaTestClients.consumeUntilThrottled(maxRecords)) quotaTestClients.verifyConsumeThrottle(expectThrottle = false) // Delete producer and consumer quota overrides. Consumer and producer should now be // throttled since broker defaults are very small quotaTestClients.removeQuotaOverrides() quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota) - val produced = quotaTestClients.produceUntilThrottled(numRecords) + val produced = quotaTestClients.produceUntilThrottled(maxRecords) + assertTrue(produced < maxRecords, "Produced records should less than max records") quotaTestClients.verifyProduceThrottle(expectThrottle = true) // Since producer may have been throttled after producing a couple of records, // consume from beginning till throttled quotaTestClients.consumer.seekToBeginning(Collections.singleton(new TopicPartition(topic1, 0))) - quotaTestClients.consumeUntilThrottled(numRecords + produced) + quotaTestClients.consumeUntilThrottled(maxRecords + produced) quotaTestClients.verifyConsumeThrottle(expectThrottle = true) }