Skip to content

Commit ce75605

Browse files
committed
fix test
1 parent 05656b4 commit ce75605

File tree

1 file changed

+28
-7
lines changed

1 file changed

+28
-7
lines changed

Diff for: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

+28-7
Original file line numberDiff line numberDiff line change
@@ -3033,7 +3033,7 @@ public void testPollTimeMetrics(GroupProtocol groupProtocol) {
30333033

30343034
@ParameterizedTest
30353035
@EnumSource(GroupProtocol.class)
3036-
public void testPollIdleRatio(GroupProtocol groupProtocol) {
3036+
public void testPollIdleRatio(GroupProtocol groupProtocol) {
30373037
ConsumerMetadata metadata = createMetadata(subscription);
30383038
MockClient client = new MockClient(time, metadata);
30393039
initMetadata(client, Collections.singletonMap(topic, 1));
@@ -3046,21 +3046,24 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
30463046
assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue());
30473047

30483048
// 1st poll
3049-
// Spend 50ms in poll so value = 1.0
3049+
// Spend 50ms in poll. value=NaN because "the fraction of time spent inside poll" is undefined until the polling interval has an end point,
3050+
// which is only known once the second poll starts.
3051+
// This also means the metric will always exclude the latest poll, since we don't know how much time is spent outside poll for that interval
3052+
// until poll is called again
30503053
consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
30513054
time.sleep(50);
30523055
consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
30533056

3054-
assertEquals(1.0d, consumer.metrics().get(pollIdleRatio).metricValue());
3057+
assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue());
30553058

30563059
// 2nd poll
3057-
// Spend 50m outside poll and 0ms in poll so value = 0.0
3060+
// Spend 50m outside poll and 0ms in poll.
30583061
time.sleep(50);
30593062
consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
30603063
consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
30613064

3062-
// Avg of first two data points
3063-
assertEquals((1.0d + 0.0d) / 2, consumer.metrics().get(pollIdleRatio).metricValue());
3065+
// Avg of first single data point where we spent half of the time inside poll
3066+
assertEquals(0.5, consumer.metrics().get(pollIdleRatio).metricValue());
30643067

30653068
// 3rd poll
30663069
// Spend 25ms outside poll and 25ms in poll so value = 0.5
@@ -3069,8 +3072,26 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
30693072
time.sleep(25);
30703073
consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
30713074

3075+
// Avg of two data points
3076+
assertEquals((0.5d + 0.0d) / 2, consumer.metrics().get(pollIdleRatio).metricValue());
3077+
3078+
// 4th poll
3079+
// Spend 0ms inside and outside poll.
3080+
consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
3081+
consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
3082+
30723083
// Avg of three data points
3073-
assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue());
3084+
assertEquals((0.5d + 0.0d + 1.0d) / 3, consumer.metrics().get(pollIdleRatio).metricValue());
3085+
3086+
for (int i = 0; i < 5; i++) {
3087+
time.sleep(10);
3088+
consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
3089+
time.sleep(10);
3090+
consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
3091+
}
3092+
3093+
// Avg of the previous data points, plus the 0ms poll, plus the first 4 10ms polls.
3094+
assertEquals((0.5d + 0.0d + 1.0d + 0.0d + 0.5d * 4) / (3 + 1 + 4), consumer.metrics().get(pollIdleRatio).metricValue());
30743095
}
30753096

30763097
private static boolean consumerMetricPresent(KafkaConsumer<String, String> consumer, String name) {

0 commit comments

Comments
 (0)