[r382] blockbuilder: fix how kafka reader metrics are instantiated#14384
mimir-github-bot[bot] wants to merge 1 commit into r382
#### What this PR does

This PR improves how the block-builder instantiates its copy of the Kafka reader metrics. `ingest.ReaderMetrics` is a service that internally pulls observations from its underlying metrics source. Because the block-builder never starts the service, some metrics (e.g. `cortex_ingest_storage_reader_estimated_bytes_per_record`) are never populated.

The PR addresses that with the following changes:

1. make sure the source inside the `swappableReaderMetricsSource` is safe to be observed concurrently
2. make the block-builder start / stop the `ReaderMetrics` service

Also fixed a small awkwardness in `NewReaderMetrics`, where the `kpromMetrics` argument could be `nil`.

#### Which issue(s) this PR fixes or relates to

Relates to https://github.com/grafana/mimir-squad/issues/3529

---------

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
(cherry picked from commit 5cda4ce)
readerMetricsSource,
cfg.Kafka.Topic,
kpm,
)
Duplicate component label causes panic at registration
High Severity
The block builder wraps the registerer with `prometheus.Labels{"component": "block-builder"}`, but `NewReaderMetrics` internally calls `NewStrongReadConsistencyInstrumentation` with the hardcoded `const component = "partition-reader"`, which applies `ConstLabels: map[string]string{"component": component}` to each metric. When registering these metrics through the wrapped registerer, `wrapDesc` detects the duplicate `component` label name and produces an error `Desc`, causing promauto's `MustRegister` to panic. This crashes the block builder at construction time whenever `FetchConcurrencyMax > 0`.
}

if b.schedulerConn != nil {
return b.schedulerConn.Close()
Leaked readerMetrics service if kafka client creation fails
Low Severity
If `readerMetrics` is successfully started (line 163–168) but `NewKafkaReaderClient` fails (line 170–177), `starting` returns an error. Per dskit's `BasicService` contract, `stopping` is not called when `starting` returns an error, so the running `readerMetrics` `TimerService` goroutine is never stopped. This is a resource/goroutine leak on startup failure.
Bugbot Autofix prepared fixes for 2 of the 2 bugs found in the latest run.
Preview (1cdb635d50)
diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go
--- a/pkg/blockbuilder/blockbuilder.go
+++ b/pkg/blockbuilder/blockbuilder.go
@@ -79,12 +79,7 @@
var readerMetrics *ingest.ReaderMetrics
if cfg.Kafka.FetchConcurrencyMax > 0 {
- m := ingest.NewReaderMetrics(
- prometheus.WrapRegistererWith(prometheus.Labels{"component": "block-builder"}, reg),
- readerMetricsSource,
- cfg.Kafka.Topic,
- kpm,
- )
+ m := ingest.NewReaderMetrics(reg, readerMetricsSource, cfg.Kafka.Topic, kpm)
readerMetrics = &m
}
@@ -167,6 +162,14 @@
}
}
+ // Ensure readerMetrics is stopped if starting fails after this point.
+ // dskit's BasicService does not call stopping when starting returns an error.
+ defer func() {
+ if err != nil && b.readerMetrics != nil {
+ _ = services.StopAndAwaitTerminated(context.Background(), b.readerMetrics)
+ }
+ }()
+
b.kafkaClient, err = ingest.NewKafkaReaderClient(
b.cfg.Kafka,
b.kpromMetrics,
Will have to add the changes from #14385. Will amend the backport after testing.



Backport 5cda4ce from #14383
Note
Medium Risk
Touches Kafka reader metrics wiring and lifecycle (start/stop) plus adds concurrency protection around a shared metrics source, which could affect metric reporting or introduce subtle runtime issues if mis-handled.
Overview
Fixes how Kafka reader metrics are instantiated and managed for the block-builder when concurrent fetching is enabled.
`BlockBuilder` now starts/stops the `ingest.ReaderMetrics` service with the component lifecycle, registers its metrics under a `component="block-builder"` label, and uses an explicit `kprom` metrics instance rather than implicit/nil defaults. The swappable metrics source used by concurrent fetchers is reworked to be thread-safe (RW mutex + nil-safe getters) instead of swapping an embedded interface.
Updates ingest tests and `PartitionReader` to always create and pass `kprom` metrics into `NewReaderMetrics`, and adjusts tests to initialize `kprom` directly via `OnNewClient`.
Written by Cursor Bugbot for commit c8570a6. This will update automatically on new commits.