@@ -3085,7 +3085,7 @@ public void testPollTimeMetrics(GroupProtocol groupProtocol) {
3085
3085
3086
3086
@ ParameterizedTest
3087
3087
@ EnumSource (GroupProtocol .class )
3088
- public void testPollIdleRatio (GroupProtocol groupProtocol ) {
3088
+ public void testPollIdleRatio (GroupProtocol groupProtocol ) {
3089
3089
ConsumerMetadata metadata = createMetadata (subscription );
3090
3090
MockClient client = new MockClient (time , metadata );
3091
3091
initMetadata (client , Collections .singletonMap (topic , 1 ));
@@ -3098,31 +3098,53 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
3098
3098
assertEquals (Double .NaN , consumer .metrics ().get (pollIdleRatio ).metricValue ());
3099
3099
3100
3100
// 1st poll
3101
- // Spend 50ms in poll so value = 1.0
3101
+ // Spend 50ms in poll. value=NaN because "the fraction of time spent inside poll" is undefined until the polling interval has an end point,
3102
+ // which is only known once the second poll starts.
3103
+ // This also means the metric will always exclude the latest poll, since we don't know how much time is spent outside poll for that interval
3104
+ // until poll is called again
3102
3105
consumer .kafkaConsumerMetrics ().recordPollStart (time .milliseconds ());
3103
3106
time .sleep (50 );
3104
3107
consumer .kafkaConsumerMetrics ().recordPollEnd (time .milliseconds ());
3105
3108
3106
- assertEquals (1.0d , consumer .metrics ().get (pollIdleRatio ).metricValue ());
3109
+ assertEquals (Double . NaN , consumer .metrics ().get (pollIdleRatio ).metricValue ());
3107
3110
3108
3111
// 2nd poll
3109
- // Spend 50m outside poll and 0ms in poll so value = 0.0
3112
+ // Spend 50m outside poll and 0ms in poll. value=0.5 from the previous poll plus the time until this poll starts.
3110
3113
time .sleep (50 );
3111
3114
consumer .kafkaConsumerMetrics ().recordPollStart (time .milliseconds ());
3112
3115
consumer .kafkaConsumerMetrics ().recordPollEnd (time .milliseconds ());
3113
3116
3114
- // Avg of first two data points
3115
- assertEquals (( 1.0d + 0.0d ) / 2 , consumer .metrics ().get (pollIdleRatio ).metricValue ());
3117
+ // Avg of first single data point where we spent half of the time inside poll
3118
+ assertEquals (0.5 , consumer .metrics ().get (pollIdleRatio ).metricValue ());
3116
3119
3117
3120
// 3rd poll
3118
- // Spend 25ms outside poll and 25ms in poll so value = 0.5
3121
+ // Spend 25ms outside poll and 25ms in poll. value=0.0 from the previous empty poll.
3119
3122
time .sleep (25 );
3120
3123
consumer .kafkaConsumerMetrics ().recordPollStart (time .milliseconds ());
3121
3124
time .sleep (25 );
3122
3125
consumer .kafkaConsumerMetrics ().recordPollEnd (time .milliseconds ());
3123
3126
3127
+ // Avg of two data points
3128
+ assertEquals ((0.5d + 0.0d ) / 2 , consumer .metrics ().get (pollIdleRatio ).metricValue ());
3129
+
3130
+ // 4th poll
3131
+ // Spend 0ms inside and outside poll. value = 1.0 since the last poll spent 25ms inside poll and 0ms outside afterward.
3132
+ consumer .kafkaConsumerMetrics ().recordPollStart (time .milliseconds ());
3133
+ consumer .kafkaConsumerMetrics ().recordPollEnd (time .milliseconds ());
3134
+
3124
3135
// Avg of three data points
3125
- assertEquals ((1.0d + 0.0d + 0.5d ) / 3 , consumer .metrics ().get (pollIdleRatio ).metricValue ());
3136
+ assertEquals ((0.5d + 0.0d + 1.0d ) / 3 , consumer .metrics ().get (pollIdleRatio ).metricValue ());
3137
+
3138
+ // Spend 10ms inside and outside poll 5 times. value = 0.0 the first time, then 0.5 after that.
3139
+ for (int i = 0 ; i < 5 ; i ++) {
3140
+ time .sleep (10 );
3141
+ consumer .kafkaConsumerMetrics ().recordPollStart (time .milliseconds ());
3142
+ time .sleep (10 );
3143
+ consumer .kafkaConsumerMetrics ().recordPollEnd (time .milliseconds ());
3144
+ }
3145
+
3146
+ // Avg of the previous data points, plus the 0ms poll, plus the first 4 10ms polls.
3147
+ assertEquals ((0.5d + 0.0d + 1.0d + 0.0d + 0.5d * 4 ) / (3 + 1 + 4 ), consumer .metrics ().get (pollIdleRatio ).metricValue ());
3126
3148
}
3127
3149
3128
3150
private static boolean consumerMetricPresent (KafkaConsumer <String , String > consumer , String name ) {
0 commit comments