-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-17503: Fix incorrect calculation of poll-idle-ratio-avg for con… #17139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,7 @@ public class KafkaConsumerMetrics implements AutoCloseable { | |
| private final Sensor commitSyncSensor; | ||
| private long lastPollMs; | ||
| private long pollStartMs; | ||
| private long pollEndMs; | ||
| private long timeSinceLastPollMs; | ||
|
|
||
| public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) { | ||
|
|
@@ -91,16 +92,20 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) { | |
| } | ||
|
|
||
| public void recordPollStart(long pollStartMs) { | ||
| if (this.pollEndMs != 0) { | ||
| long pollIntervalMs = pollStartMs - this.pollStartMs; | ||
| long pollTimeMs = this.pollEndMs - this.pollStartMs; | ||
| double pollIdleRatio = pollTimeMs * 1.0 / pollIntervalMs; | ||
| this.pollIdleSensor.record(pollIdleRatio); | ||
| } | ||
| this.pollStartMs = pollStartMs; | ||
| this.timeSinceLastPollMs = lastPollMs != 0L ? pollStartMs - lastPollMs : 0; | ||
| this.timeBetweenPollSensor.record(timeSinceLastPollMs); | ||
| this.lastPollMs = pollStartMs; | ||
| } | ||
|
|
||
| public void recordPollEnd(long pollEndMs) { | ||
| long pollTimeMs = pollEndMs - pollStartMs; | ||
| double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs); | ||
| this.pollIdleSensor.record(pollIdleRatio); | ||
| this.pollEndMs = pollEndMs; | ||
|
||
| } | ||
|
|
||
| public void recordCommitSync(long duration) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3085,7 +3085,7 @@ public void testPollTimeMetrics(GroupProtocol groupProtocol) { | |
|
|
||
| @ParameterizedTest | ||
| @EnumSource(GroupProtocol.class) | ||
| public void testPollIdleRatio(GroupProtocol groupProtocol) { | ||
| public void testPollIdleRatio(GroupProtocol groupProtocol) { | ||
| ConsumerMetadata metadata = createMetadata(subscription); | ||
| MockClient client = new MockClient(time, metadata); | ||
| initMetadata(client, Collections.singletonMap(topic, 1)); | ||
|
|
@@ -3098,31 +3098,53 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) { | |
| assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue()); | ||
|
|
||
| // 1st poll | ||
| // Spend 50ms in poll so value = 1.0 | ||
| // Spend 50ms in poll. value=NaN because "the fraction of time spent inside poll" is undefined until the polling interval has an end point, | ||
|
||
| // which is only known once the second poll starts. | ||
| // This also means the metric will always exclude the latest poll, since we don't know how much time is spent outside poll for that interval | ||
| // until poll is called again | ||
|
||
| consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds()); | ||
| time.sleep(50); | ||
| consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds()); | ||
|
|
||
| assertEquals(1.0d, consumer.metrics().get(pollIdleRatio).metricValue()); | ||
| assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue()); | ||
|
|
||
| // 2nd poll | ||
| // Spend 50m outside poll and 0ms in poll so value = 0.0 | ||
| // Spend 50m outside poll and 0ms in poll. value=0.5 from the previous poll plus the time until this poll starts. | ||
| time.sleep(50); | ||
| consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds()); | ||
| consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds()); | ||
|
|
||
| // Avg of first two data points | ||
| assertEquals((1.0d + 0.0d) / 2, consumer.metrics().get(pollIdleRatio).metricValue()); | ||
| // Avg of first single data point where we spent half of the time inside poll | ||
| assertEquals(0.5, consumer.metrics().get(pollIdleRatio).metricValue()); | ||
|
|
||
| // 3rd poll | ||
| // Spend 25ms outside poll and 25ms in poll so value = 0.5 | ||
| // Spend 25ms outside poll and 25ms in poll. value=0.0 from the previous empty poll. | ||
| time.sleep(25); | ||
| consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds()); | ||
| time.sleep(25); | ||
| consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds()); | ||
|
|
||
| // Avg of two data points | ||
| assertEquals((0.5d + 0.0d) / 2, consumer.metrics().get(pollIdleRatio).metricValue()); | ||
|
|
||
| // 4th poll | ||
| // Spend 0ms inside and outside poll. value = 1.0 since the last poll spent 25ms inside poll and 0ms outside afterward. | ||
| consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds()); | ||
| consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds()); | ||
|
|
||
| // Avg of three data points | ||
| assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue()); | ||
| assertEquals((0.5d + 0.0d + 1.0d) / 3, consumer.metrics().get(pollIdleRatio).metricValue()); | ||
|
|
||
| // Spend 10ms inside and outside poll 5 times. value = 0.0 the first time, then 0.5 after that. | ||
| for (int i = 0; i < 5; i++) { | ||
| time.sleep(10); | ||
| consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds()); | ||
| time.sleep(10); | ||
| consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds()); | ||
| } | ||
|
|
||
| // Avg of the previous data points, plus the 0ms poll, plus the first 4 10ms polls. | ||
| assertEquals((0.5d + 0.0d + 1.0d + 0.0d + 0.5d * 4) / (3 + 1 + 4), consumer.metrics().get(pollIdleRatio).metricValue()); | ||
| } | ||
|
|
||
| private static boolean consumerMetricPresent(KafkaConsumer<String, String> consumer, String name) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method would benefit from some comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add some, though I'll likely move these updates into recordPollEnd along with changing the polling interval to be based on the end timestamps instead of the start timestamps.
The poll(0ms) case you're thinking of I assume is regarding division by 0 here?