Skip to content

Commit bb60ca6

Browse files
authored
feat: Add kafka/consumer/pollIdleRatio metric. (apache#19366)
The metric corresponds to the Kafka consumer metric "poll-idle-ratio-avg".
1 parent 2de75ab commit bb60ca6

5 files changed

Lines changed: 10 additions & 0 deletions

File tree

extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
"kafka/consumer/recordsPerRequestAvg" : { "dimensions" : ["topic"], "type" : "gauge", "help": "Average records per fetch request as seen by the consumer of a Kafka indexing task (per topic)."},
9898
"kafka/consumer/outgoingBytes" : { "dimensions" : ["node_id"], "type" : "count", "help": "Bytes sent to Kafka brokers by the consumer of a Kafka indexing task (per node)."},
9999
"kafka/consumer/incomingBytes" : { "dimensions" : ["node_id"], "type" : "count", "help": "Bytes received from Kafka brokers by the consumer of a Kafka indexing task (per node)."},
100+
"kafka/consumer/pollIdleRatio" : { "dimensions" : [], "type" : "gauge", "help": "Average fraction of time the consumer of a Kafka indexing task spent idle (not in poll). 0 means never idle, 1 means always idle."},
100101

101102
"ingest/count" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Count of 1 every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions." },
102103
"ingest/segments/count" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Count of final segments created by job (includes tombstones)." },

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ public class KafkaConsumerMonitor extends AbstractMonitor
127127
"kafka/consumer/incomingBytes",
128128
Set.of(CLIENT_ID_TAG, NODE_ID_TAG),
129129
KafkaConsumerMetric.MetricType.COUNTER
130+
),
131+
new KafkaConsumerMetric(
132+
POLL_IDLE_RATIO_METRIC_NAME,
133+
"kafka/consumer/pollIdleRatio",
134+
Set.of(CLIENT_ID_TAG),
135+
KafkaConsumerMetric.MetricType.GAUGE
130136
)
131137
).collect(Collectors.toMap(KafkaConsumerMetric::getKafkaMetricName, Function.identity()));
132138

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ public void testPoll() throws InterruptedException, ExecutionException
484484
emitter.verifyEmitted("kafka/consumer/recordsPerRequestAvg", 1);
485485
emitter.verifyEmitted("kafka/consumer/incomingBytes", 2);
486486
emitter.verifyEmitted("kafka/consumer/outgoingBytes", 2);
487+
emitter.verifyEmitted("kafka/consumer/pollIdleRatio", 1);
487488

488489
recordSupplier.close();
489490
Assert.assertFalse(monitor.monitor(emitter));

processing/src/main/resources/loggingEmitterAllowedMetrics.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
"kafka/consumer/fetchSizeMax": [],
7777
"kafka/consumer/incomingBytes": [],
7878
"kafka/consumer/outgoingBytes": [],
79+
"kafka/consumer/pollIdleRatio": [],
7980
"kafka/consumer/recordsConsumed": [],
8081
"kafka/consumer/recordsLag": [],
8182
"kafka/consumer/recordsPerRequestAvg": [],

processing/src/test/resources/loggingEmitterAllowedMetrics.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
"kafka/consumer/fetchSizeMax": [],
7777
"kafka/consumer/incomingBytes": [],
7878
"kafka/consumer/outgoingBytes": [],
79+
"kafka/consumer/pollIdleRatio": [],
7980
"kafka/consumer/recordsConsumed": [],
8081
"kafka/consumer/recordsLag": [],
8182
"kafka/consumer/recordsPerRequestAvg": [],

0 commit comments

Comments
 (0)