Skip to content

Commit 9ae3118

Browse files
committed
Report current_offset metric for incomplete partitions
For consumers of low volume topic-partitions, it can take a very long time for enough offset commits to happen before a partition is marked as "complete". For the `burrow_kafka_consumer_current_offset` metric to be reported, only a single committed offset is required.
1 parent 8690c58 commit 9ae3118

File tree

3 files changed

+5
-3
lines changed

3 files changed

+5
-3
lines changed

core/internal/evaluator/caching.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func evaluatePartitionStatus(partition *protocol.ConsumerPartition, minimumCompl
281281
}
282282

283283
// Slice the offsets to remove all nil entries (they'll be at the start)
284-
firstOffset := len(partition.Offsets) - 1
284+
firstOffset := len(partition.Offsets) // defaults to the length, so if all offsets are nil we make an empty slice
285285
for i, offset := range partition.Offsets {
286286
if offset != nil {
287287
firstOffset = i

core/internal/httpserver/prometheus.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,10 @@ func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc {
137137

138138
consumerPartitionLagGauge.With(labels).Set(float64(partition.CurrentLag))
139139

140-
if partition.Complete == 1.0 {
140+
if partition.Complete > 0.0 {
141141
consumerPartitionCurrentOffset.With(labels).Set(float64(partition.End.Offset))
142+
}
143+
if partition.Complete == 1.0 {
142144
partitionStatusGauge.With(labels).Set(float64(partition.Status))
143145
}
144146
}

core/internal/httpserver/prometheus_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func TestHttpServer_handlePrometheusMetrics(t *testing.T) {
158158
assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 22663`)
159159
assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 2488`)
160160
assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 99888`)
161-
assert.NotContains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="incomplete"} 5335`)
161+
assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="incomplete"} 5335`)
162162
assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="1",topic="incomplete"} 99888`)
163163

164164
assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="0",topic="testtopic"} 6556`)

0 commit comments

Comments
 (0)