Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Main (unreleased)

### Enhancements

- Add `forward_to` argument to the `stage.metrics` block in `loki.process` to forward metrics to remote storage components like `prometheus.remote_write` instead of exposing them at the local `/metrics` endpoint. (@harshrai654)

- update promtail converter to use `file_match` block for `loki.source.file` instead of going through `local.file_match`. (@kalleep)

- Add `send_traceparent` option for `tracing` config to enable traceparent header propagation. (@MyDigitalLife)
Expand Down
49 changes: 48 additions & 1 deletion docs/sources/reference/components/loki/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,21 @@ The final output stage changes the contents of the log line to be the value of `
The `stage.metrics` inner block configures stage that allows you to define and update metrics based on values from the shared extracted map.
The created metrics are available at the {{< param "PRODUCT_NAME" >}} root `/metrics` endpoint.

The `stage.metrics` block doesn't support any arguments and is only configured via a number of nested inner `metric.*` blocks, one for each metric that should be generated.
The following arguments are supported:

| Name | Type | Description | Default | Required |
| ----------------------- | ----------------------- | -------------------------------------------------------------- | ------- | -------- |
| `forward_to` | `list(MetricsReceiver)` | Where to forward metrics instead of exposing locally. | | no |
| `metrics_flush_interval`| `duration` | How frequently to flush metrics to the forwarded receivers. | `"60s"` | no |

When `forward_to` is provided, the metrics generated by the `stage.metrics` block are forwarded to the specified receivers (for example, `prometheus.remote_write`) instead of being exposed at the {{< param "PRODUCT_NAME" >}} `/metrics` endpoint.
This is useful when you want to send derived metrics directly to a remote storage backend without exposing them locally.

The `metrics_flush_interval` argument controls how often the metrics are flushed to the receivers specified in `forward_to`. This argument is only used when `forward_to` is set.

{{< admonition type="note" >}}
When using `forward_to`, the metric type and description are not currently appended to the remote write request because the remote write protocol doesn't support updating metadata for metrics being appended.
{{< /admonition >}}

The following blocks are supported inside the definition of `stage.metrics`:

Expand Down Expand Up @@ -1016,6 +1030,39 @@ stage.metrics {
}
```

The following example shows how to forward metrics to a `prometheus.remote_write` component instead of exposing them at the local `/metrics` endpoint:

```alloy
prometheus.remote_write "default" {
endpoint {
url = "http://localhost:9090/api/v1/write"
}
}

loki.process "example" {
forward_to = [loki.write.default.receiver]

stage.regex {
expression = "^.* order_status=(?P<order_status>.*?) .*$"
}

stage.metrics {
forward_to = [prometheus.remote_write.default.receiver]

metric.counter {
name = "successful_orders_total"
description = "successful orders"
source = "order_status"
value = "success"
action = "inc"
prefix = "loki_log_metrics_"
}
}
}
```

In this example, the `loki_metrics_successful_orders_total` counter metric is sent directly to the remote Prometheus instance via `prometheus.remote_write` instead of being exposed at the {{< param "PRODUCT_NAME" >}} `/metrics` endpoint.

### `stage.multiline`

The `stage.multiline` inner block merges multiple lines into a single block before passing it on to the next stage in the pipeline.
Expand Down
11 changes: 6 additions & 5 deletions internal/component/loki/process/metric/counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package metric

import (
"fmt"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -78,7 +79,7 @@ func NewCounters(name string, config *CounterConfig) (*Counters, error) {
Name: name,
ConstLabels: labels,
}),
0,
atomic.Int64{},
}
}, int64(config.MaxIdle.Seconds())),
Cfg: config,
Expand All @@ -92,24 +93,24 @@ func (c *Counters) With(labels model.LabelSet) prometheus.Counter {

type expiringCounter struct {
prometheus.Counter
lastModSec int64
lastModSec atomic.Int64
}

// Inc increments the counter by 1. Use Add to increment it by arbitrary
// non-negative values.
func (e *expiringCounter) Inc() {
e.Counter.Inc()
e.lastModSec = time.Now().Unix()
e.lastModSec.Store(time.Now().Unix())
}

// Add adds the given value to the counter. It panics if the value is <
// 0.
func (e *expiringCounter) Add(val float64) {
e.Counter.Add(val)
e.lastModSec = time.Now().Unix()
e.lastModSec.Store(time.Now().Unix())
}

// HasExpired implements Expirable
func (e *expiringCounter) HasExpired(currentTimeSec int64, maxAgeSec int64) bool {
return currentTimeSec-e.lastModSec >= maxAgeSec
return currentTimeSec-e.lastModSec.Load() >= maxAgeSec
}
19 changes: 10 additions & 9 deletions internal/component/loki/process/metric/gauges.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package metric

import (
"fmt"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -77,7 +78,7 @@ func NewGauges(name string, config *GaugeConfig) (*Gauges, error) {
Name: name,
ConstLabels: labels,
}),
0,
atomic.Int64{},
}
}, int64(config.MaxIdle.Seconds())),
Cfg: config,
Expand All @@ -91,50 +92,50 @@ func (g *Gauges) With(labels model.LabelSet) prometheus.Gauge {

type expiringGauge struct {
prometheus.Gauge
lastModSec int64
lastModSec atomic.Int64
}

// Set sets the Gauge to an arbitrary value.
func (g *expiringGauge) Set(val float64) {
g.Gauge.Set(val)
g.lastModSec = time.Now().Unix()
g.lastModSec.Store(time.Now().Unix())
}

// Inc increments the Gauge by 1. Use Add to increment it by arbitrary
// values.
func (g *expiringGauge) Inc() {
g.Gauge.Inc()
g.lastModSec = time.Now().Unix()
g.lastModSec.Store(time.Now().Unix())
}

// Dec decrements the Gauge by 1. Use Sub to decrement it by arbitrary
// values.
func (g *expiringGauge) Dec() {
g.Gauge.Dec()
g.lastModSec = time.Now().Unix()
g.lastModSec.Store(time.Now().Unix())
}

// Add adds the given value to the Gauge. (The value can be negative,
// resulting in a decrease of the Gauge.)
func (g *expiringGauge) Add(val float64) {
g.Gauge.Add(val)
g.lastModSec = time.Now().Unix()
g.lastModSec.Store(time.Now().Unix())
}

// Sub subtracts the given value from the Gauge. (The value can be
// negative, resulting in an increase of the Gauge.)
func (g *expiringGauge) Sub(val float64) {
g.Gauge.Sub(val)
g.lastModSec = time.Now().Unix()
g.lastModSec.Store(time.Now().Unix())
}

// SetToCurrentTime sets the Gauge to the current Unix time in seconds.
func (g *expiringGauge) SetToCurrentTime() {
g.Gauge.SetToCurrentTime()
g.lastModSec = time.Now().Unix()
g.lastModSec.Store(time.Now().Unix())
}

// HasExpired implements Expirable
func (g *expiringGauge) HasExpired(currentTimeSec int64, maxAgeSec int64) bool {
return currentTimeSec-g.lastModSec >= maxAgeSec
return currentTimeSec-g.lastModSec.Load() >= maxAgeSec
}
9 changes: 5 additions & 4 deletions internal/component/loki/process/metric/histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package metric

import (
"fmt"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -62,7 +63,7 @@ func NewHistograms(name string, config *HistogramConfig) (*Histograms, error) {
ConstLabels: labels,
Buckets: config.Buckets,
}),
0,
atomic.Int64{},
}
}, int64(config.MaxIdle.Seconds())),
Cfg: config,
Expand All @@ -76,16 +77,16 @@ func (h *Histograms) With(labels model.LabelSet) prometheus.Histogram {

type expiringHistogram struct {
prometheus.Histogram
lastModSec int64
lastModSec atomic.Int64
}

// Observe adds a single observation to the histogram.
func (h *expiringHistogram) Observe(val float64) {
h.Histogram.Observe(val)
h.lastModSec = time.Now().Unix()
h.lastModSec.Store(time.Now().Unix())
}

// HasExpired implements Expirable
func (h *expiringHistogram) HasExpired(currentTimeSec int64, maxAgeSec int64) bool {
return currentTimeSec-h.lastModSec >= maxAgeSec
return currentTimeSec-h.lastModSec.Load() >= maxAgeSec
}
11 changes: 10 additions & 1 deletion internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/grafana/alloy/internal/component/loki/process/stages"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
)

Expand Down Expand Up @@ -68,6 +69,7 @@ type Component struct {
fanout []loki.LogsReceiver

debugDataPublisher livedebugging.DebugDataPublisher
labelStore labelstore.LabelStore
}

// New creates a new loki.process component.
Expand All @@ -77,9 +79,16 @@ func New(o component.Options, args Arguments) (*Component, error) {
return nil, err
}

data, err := o.GetServiceData(labelstore.ServiceName)
if err != nil {
return nil, err
}
ls := data.(labelstore.LabelStore)

c := &Component{
opts: o,
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
labelStore: ls,
}

// Create and immediately export the receiver which remains the same for
Expand Down Expand Up @@ -142,7 +151,7 @@ func (c *Component) Update(args component.Arguments) error {
c.entryHandler.Stop()
}

pipeline, err := stages.NewPipeline(c.opts.Logger, newArgs.Stages, &c.opts.ID, c.opts.Registerer, c.opts.MinStability)
pipeline, err := stages.NewPipeline(c.opts.Logger, newArgs.Stages, &c.opts.ID, c.opts.Registerer, c.opts.MinStability, c.labelStore)
if err != nil {
return err
}
Expand Down
57 changes: 57 additions & 0 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
Expand All @@ -24,8 +25,10 @@ import (
"github.com/grafana/alloy/internal/component/loki/process/stages"
lsf "github.com/grafana/alloy/internal/component/loki/source/file"
"github.com/grafana/alloy/internal/runtime/componenttest"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/internal/util/testappender"
"github.com/grafana/alloy/internal/util/testlivedebugging"
"github.com/grafana/alloy/syntax"
"github.com/grafana/loki/pkg/push"
Expand Down Expand Up @@ -634,10 +637,62 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
r.stop()
}

func TestMetricStage_ForwardTo_Leak(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

appender := testappender.NewCollectingAppender()
stageCfgStr := `
stage.metrics {
metric.counter {
name = "leak_test_counter"
action = "inc"
match_all = true
}
}
`
var stageCfgStruct struct {
Stages []stages.StageConfig `alloy:"stage,enum"`
}
require.NoError(t, syntax.Unmarshal([]byte(stageCfgStr), &stageCfgStruct))

// Inject appender to trigger runFlushLoop
stageCfgStruct.Stages[0].MetricsConfig.ForwardTo = []storage.Appendable{testappender.ConstantAppendable{Inner: appender}}

args := Arguments{
Stages: stageCfgStruct.Stages,
ForwardTo: []loki.LogsReceiver{loki.NewLogsReceiver()},
}

opts := component.Options{
Logger: util.TestAlloyLogger(t),
Registerer: prometheus.NewRegistry(),
OnStateChange: func(e component.Exports) {},
GetServiceData: getServiceData,
}

c, err := New(opts, args)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.Run(ctx)

// Update component multiple times.
for i := 0; i < 100; i++ {
stageCfgStruct.Stages[0].MetricsConfig.Metrics[0].Counter.Description = fmt.Sprintf("desc %d", i)
args.Stages = stageCfgStruct.Stages

err := c.Update(args)
require.NoError(t, err)
}
}

func getServiceData(name string) (interface{}, error) {
switch name {
case livedebugging.ServiceName:
return livedebugging.NewLiveDebugging(), nil
case labelstore.ServiceName:
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
default:
return nil, fmt.Errorf("service not found %s", name)
}
Expand All @@ -662,6 +717,8 @@ func getServiceDataWithLiveDebugging(log *testlivedebugging.Log) func(string) (i
switch name {
case livedebugging.ServiceName:
return ld, nil
case labelstore.ServiceName:
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
default:
return nil, fmt.Errorf("service not found %s", name)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/component/loki/process/stages/decolorize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/labelstore"
)

var testDecolorizePipeline = `
Expand Down Expand Up @@ -41,7 +42,7 @@ func TestPipeline_Decolorize(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

pl, err := NewPipeline(log.NewNopLogger(), loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable)
pl, err := NewPipeline(log.NewNopLogger(), loadConfig(testData.config), nil, prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable, labelstore.New(nil, prometheus.DefaultRegisterer))
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/component/loki/process/stages/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/util"
)

Expand Down Expand Up @@ -434,7 +435,7 @@ func TestDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "test_drop_pipeline"
logger := util.TestAlloyLogger(t)
pl, err := NewPipeline(logger, loadConfig(testDropAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable)
pl, err := NewPipeline(logger, loadConfig(testDropAlloy), &plName, registry, featuregate.StabilityGenerallyAvailable, labelstore.New(nil, prometheus.DefaultRegisterer))
require.NoError(t, err)
out := processEntries(pl,
newEntry(nil, nil, testMatchLogLineApp1, time.Now()),
Expand Down
Loading