feat(loki.process): Add forward_to to stage.metrics for direct metric export #5022
PR Description
This PR adds a `forward_to` argument to the `stage.metrics` block in `loki.process`, enabling metrics generated from log processing to be forwarded directly to remote storage components (like `prometheus.remote_write`) instead of being exposed at Alloy's local `/metrics` endpoint. This is useful when you want to derive metrics from logs and send them directly to a Prometheus backend without cluttering the local metrics endpoint.
Key Changes
New Arguments:
- `forward_to` (`list(MetricsReceiver)`) - List of receivers to forward generated metrics to
- `metrics_flush_interval` (`duration`) - Configurable interval for flushing metrics to the receivers (default: 60s)

Behaviour:
- When `forward_to` is set, generated metrics are flushed to the configured receivers every `metrics_flush_interval`
- When `forward_to` is not set, the existing behaviour is unchanged (metrics remain exposed at Alloy's local `/metrics` endpoint)

Which issue(s) this PR fixes
Fixes #4779
Notes to the Reviewer
Implementation Details
Prometheus Fanout Integration:
The `forward_to` argument, of type `storage.Appendable`, uses `prometheus.Fanout` to send generated metrics to multiple receivers. The fanout requires a `LabelStore`, which caches the storage reference returned by append calls to prevent rehashing of labels every time a metric with the same label set is appended. The `LabelStore` is passed down from the `loki.process` component to the pipeline and then to the metrics stage.
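For illustration, here is a minimal sketch of the fan-out idea using only the upstream `storage.Appendable`/`storage.Appender` interfaces from `github.com/prometheus/prometheus/storage`. It is not the actual Alloy `prometheus.Fanout` implementation, it omits the `LabelStore`-based series-reference caching, and the function name is hypothetical:

```go
package metricsstage

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// appendToReceivers fans a single sample out to every configured receiver.
// In the real component this is handled by Alloy's prometheus.Fanout, which
// also consults the LabelStore to reuse cached series references.
func appendToReceivers(ctx context.Context, receivers []storage.Appendable, lbls labels.Labels, ts int64, val float64) error {
	for _, recv := range receivers {
		app := recv.Appender(ctx)
		if _, err := app.Append(0, lbls, ts, val); err != nil {
			_ = app.Rollback()
			return err
		}
		if err := app.Commit(); err != nil {
			return err
		}
	}
	return nil
}
```

Passing `0` as the series reference forces a label hash on every append; the `LabelStore` exists precisely to cache and reuse the reference returned by earlier `Append` calls.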
Pipeline Signature Updates:
Any stage that creates a new pipeline internally (e.g., `stage.match`) also needs the `LabelStore` instance. These stages have been modified to accept the `LabelStore` argument, which required updating various tests to follow the new `NewPipeline` method signature.
Inspiration from spanmetrics:
Took inspiration from `otelcol.connector.spanmetrics` for the `metrics_flush_interval` pattern used to configure the interval for flushing metric data.
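A rough sketch of that interval-driven flush loop follows; the function name and the shutdown behaviour are assumptions, only the ticker pattern reflects the `metrics_flush_interval` idea:

```go
package metricsstage

import (
	"context"
	"time"
)

// runFlushLoop periodically pushes the collected metrics to the configured
// receivers. The interval corresponds to metrics_flush_interval; flush is
// assumed to be the stage's flushMetrics method.
func runFlushLoop(ctx context.Context, interval time.Duration, flush func(context.Context) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			_ = flush(ctx) // errors would be logged in a real implementation
		}
	}
}
```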
Conditional Registry Registration:
Uses the existing collector structure and now conditionally registers metrics to the global registry based on whether `forward_to` is set.
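A small sketch of that branching, reusing `runFlushLoop` from the previous sketch; all names are illustrative rather than the PR's actual code:

```go
package metricsstage

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// registerOrFlush either registers the stage's collectors on the local
// registry (no receivers configured) or starts the interval-based flush
// goroutine (forward_to set), matching the behaviour described above.
func registerOrFlush(ctx context.Context, reg prometheus.Registerer, collectors []prometheus.Collector,
	haveReceivers bool, interval time.Duration, flush func(context.Context) error) {
	if !haveReceivers {
		for _, c := range collectors {
			reg.MustRegister(c)
		}
		return
	}
	go runFlushLoop(ctx, interval, flush)
}
```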
Metric Flushing (`flushMetrics`):
- Converts `*dto.LabelPair` to `labels.Labels` slices
- Adds the `__name__` label with the metric name
- Handles the `_bucket`, `_sum`, and `_count` metric suffixes appropriately
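To make the conversion concrete, here is a hedged sketch built only on the public `client_model` and Prometheus `labels`/`storage` APIs. It is not the PR's exact `flushMetrics` code; error handling, summaries, and the implicit `+Inf` bucket are elided:

```go
package metricsstage

import (
	"strconv"
	"time"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// appendFamily converts one gathered metric family into appended samples:
// *dto.LabelPair slices become labels.Labels, the metric name is attached as
// __name__, and histograms are expanded into _bucket/_sum/_count series.
func appendFamily(app storage.Appender, mf *dto.MetricFamily) {
	ts := time.Now().UnixMilli()
	for _, m := range mf.GetMetric() {
		// Convert the protobuf label pairs into Prometheus labels.
		base := make([]labels.Label, 0, len(m.GetLabel()))
		for _, lp := range m.GetLabel() {
			base = append(base, labels.Label{Name: lp.GetName(), Value: lp.GetValue()})
		}
		withName := func(name string, extra ...labels.Label) labels.Labels {
			ls := append([]labels.Label{{Name: model.MetricNameLabel, Value: name}}, base...)
			return labels.New(append(ls, extra...)...)
		}

		switch mf.GetType() {
		case dto.MetricType_COUNTER:
			app.Append(0, withName(mf.GetName()), ts, m.GetCounter().GetValue())
		case dto.MetricType_GAUGE:
			app.Append(0, withName(mf.GetName()), ts, m.GetGauge().GetValue())
		case dto.MetricType_HISTOGRAM:
			h := m.GetHistogram()
			// One _bucket series per upper bound, plus _sum and _count.
			for _, bkt := range h.GetBucket() {
				le := strconv.FormatFloat(bkt.GetUpperBound(), 'g', -1, 64)
				app.Append(0, withName(mf.GetName()+"_bucket", labels.Label{Name: model.BucketLabel, Value: le}),
					ts, float64(bkt.GetCumulativeCount()))
			}
			app.Append(0, withName(mf.GetName()+"_sum"), ts, h.GetSampleSum())
			app.Append(0, withName(mf.GetName()+"_count"), ts, float64(h.GetSampleCount()))
		}
	}
}
```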
Metadata Limitation:
The `flushMetrics` function calls `UpdateMetadata` on the appender, but this currently does not work as expected: the remote write protocol does not support updating metadata for appended metrics. This is a known limitation tracked in #547. Please let me know if there is a way around this so that the generated metrics' metadata can also be sent to the remote endpoint.
Data Race Prevention:
Modified the metric collectors (`Counters`, `Gauges`, `Histograms`) to use `atomic.Int64` for the `lastModSec` field. This prevents data races between updating metrics in the `Process` method and the `Collect` call in the separate flush-metrics goroutine.
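A minimal sketch of the atomic field change; the struct and method names are illustrative, only the `atomic.Int64` usage reflects the description above:

```go
package metricsstage

import (
	"sync/atomic"
	"time"
)

// counterEntry sketches the fix: lastModSec is an atomic.Int64 so the Process
// goroutine (writer) and the flush goroutine calling Collect (reader) never
// access the field unsynchronized.
type counterEntry struct {
	value      atomic.Int64 // simplified stand-in for the wrapped prometheus.Counter
	lastModSec atomic.Int64
}

// touch is called from the Process path when a log line updates the metric.
func (c *counterEntry) touch() {
	c.value.Add(1)
	c.lastModSec.Store(time.Now().Unix())
}

// idleFor is read from the Collect/flush path, e.g. to expire stale series.
func (c *counterEntry) idleFor() time.Duration {
	return time.Since(time.Unix(c.lastModSec.Load(), 0))
}
```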
Testing
Apart from the unit tests, I also manually tested with the following configuration against local Loki and Prometheus instances:
```alloy
logging {
  level  = "debug"
  format = "logfmt"
}

loki.source.file "tmpfiles" {
  targets = [
    {__path__ = "/path/to/test-service.log"},
  ]
  forward_to    = [loki.process.test_service.receiver]
  tail_from_end = true
}

loki.process "test_service" {
  forward_to = [loki.write.test_service_logs.receiver]

  stage.regex {
    expression = "level=(?P<level>\\S+).*status=(?P<status>\\S+).*latency_ms=(?P<latency>\\S+)"
  }

  stage.metrics {
    metric.counter {
      name        = "test_service_requests_total"
      match_all   = true
      description = "Total number of requests"
      action      = "inc"
    }

    metric.histogram {
      name        = "test_service_request_latency_ms"
      description = "Request latency in milliseconds"
      source      = "latency"
      buckets     = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]
    }

    metric.gauge {
      name        = "test_service_request_status_gauge"
      description = "Request status gauge"
      source      = "status"
      action      = "set"
    }

    forward_to             = [prometheus.remote_write.test_service_metrics.receiver]
    metrics_flush_interval = "5s"
  }
}

loki.write "test_service_logs" {
  endpoint {
    url = "http://localhost:3100/loki/api/v1/push"
  }
}

prometheus.remote_write "test_service_metrics" {
  endpoint {
    url = "http://localhost:9090/api/v1/write"
  }
}
```

PR Checklist