Skip to content

KAFKA-18638 fix flasky testThrottledProducerConsumer.testThrottledProducerConsumer #18689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")

// Low enough quota that a producer sending a small payload in a tight loop should get throttled
val defaultProducerQuota: Long = 8000
val defaultProducerQuota: Long = 6000
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we be confident that 6000 is a stable limit? Would you be able to share the testing results from your local environment that validate this?

val defaultConsumerQuota: Long = 2500
val defaultRequestQuota: Double = Long.MaxValue.toDouble

Expand All @@ -93,8 +93,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testThrottledProducerConsumer(quorum: String, groupProtocol: String): Unit = {
val numRecords = 1000
val produced = quotaTestClients.produceUntilThrottled(numRecords)
val maxRecords = 1000
val produced = quotaTestClients.produceUntilThrottled(maxRecords)
assertTrue(produced < maxRecords, "Produced records should less than max records")
quotaTestClients.verifyProduceThrottle(expectThrottle = true)

// Consumer should read in a bursty manner and get throttled immediately
Expand All @@ -113,12 +114,12 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble)
quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble)

val numRecords = 1000
assertEquals(numRecords, quotaTestClients.produceUntilThrottled(numRecords))
val maxRecords = 1000
assertEquals(maxRecords, quotaTestClients.produceUntilThrottled(maxRecords))
quotaTestClients.verifyProduceThrottle(expectThrottle = false)

// The "client" consumer does not get throttled.
assertEquals(numRecords, quotaTestClients.consumeUntilThrottled(numRecords))
assertEquals(maxRecords, quotaTestClients.consumeUntilThrottled(maxRecords))
quotaTestClients.verifyConsumeThrottle(expectThrottle = false)
}

Expand All @@ -131,8 +132,9 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
quotaTestClients.overrideQuotas(2000, 250, Long.MaxValue.toDouble)
quotaTestClients.waitForQuotaUpdate(2000, 250, Long.MaxValue.toDouble)

val numRecords = 1000
val produced = quotaTestClients.produceUntilThrottled(numRecords)
val maxRecords = 1000
val produced = quotaTestClients.produceUntilThrottled(maxRecords)
assertTrue(produced < maxRecords, "Produced records should less than max records")
quotaTestClients.verifyProduceThrottle(expectThrottle = true)

// Consumer should be able to consume at least one record, even when throttled
Expand All @@ -147,23 +149,24 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble)
quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, Long.MaxValue.toDouble)

val numRecords = 1000
assertEquals(numRecords, quotaTestClients.produceUntilThrottled(numRecords))
val maxRecords = 1000
assertEquals(maxRecords, quotaTestClients.produceUntilThrottled(maxRecords))
quotaTestClients.verifyProduceThrottle(expectThrottle = false)
assertEquals(numRecords, quotaTestClients.consumeUntilThrottled(numRecords))
assertEquals(maxRecords, quotaTestClients.consumeUntilThrottled(maxRecords))
quotaTestClients.verifyConsumeThrottle(expectThrottle = false)

// Delete producer and consumer quota overrides. Consumer and producer should now be
// throttled since broker defaults are very small
quotaTestClients.removeQuotaOverrides()
quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
val produced = quotaTestClients.produceUntilThrottled(numRecords)
val produced = quotaTestClients.produceUntilThrottled(maxRecords)
assertTrue(produced < maxRecords, "Produced records should less than max records")
quotaTestClients.verifyProduceThrottle(expectThrottle = true)

// Since producer may have been throttled after producing a couple of records,
// consume from beginning till throttled
quotaTestClients.consumer.seekToBeginning(Collections.singleton(new TopicPartition(topic1, 0)))
quotaTestClients.consumeUntilThrottled(numRecords + produced)
quotaTestClients.consumeUntilThrottled(maxRecords + produced)
quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
}

Expand Down
Loading