Skip to content

Commit eda3152

Browse files
authored
feat(loki.process): Support structured metadata as source type of stage.labels for loki.process (#5055)
#### PR Description This enhances the `labels` stage of `loki.process` to be able to retrieve data from structured metadata. #### Which issue(s) this PR fixes Fixes #4717 #### Notes to the Reviewer #### PR Checklist <!-- Remove items that do not apply. For completed items, change [ ] to [x]. --> - [x] Documentation added - [x] Tests updated - [x] Config converters updated
1 parent 534e7db commit eda3152

8 files changed

Lines changed: 332 additions & 97 deletions

File tree

docs/sources/reference/components/loki/loki.process.md

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -582,15 +582,16 @@ stage.label_keep {
582582

583583
### `stage.labels`
584584

585-
The `stage.labels` inner block configures a labels processing stage that can read data from the extracted values map and set new labels on incoming log entries.
585+
The `stage.labels` inner block configures a labels processing stage that can read data from the extracted values map or structured metadata and set new labels on incoming log entries.
586586

587587
For labels that are static, refer to [`stage.static_labels`][stage.static_labels]
588588

589589
The following arguments are supported:
590590

591-
| Name | Type | Description | Default | Required |
592-
| -------- | ------------- | --------------------------------------- | ------- | -------- |
593-
| `values` | `map(string)` | Configures a `labels` processing stage. | `{}` | no |
591+
| Name | Type | Description | Default | Required |
592+
| ------------- | ------------- | -------------------------------------------------------------------------------------------------------- | ------------- | -------- |
593+
| `values` | `map(string)` | Configures a `labels` processing stage. | `{}` | no |
594+
| `source_type` | `string` | Where to retrieve the data from. Allowed values are `"extracted"` (default) or `"structured_metadata"`. | `"extracted"` | no |
594595

595596
In a labels stage, the map's keys define the label to set and the values are how to look them up.
596597
If the value is empty, it's inferred to be the same as the key.
@@ -604,6 +605,21 @@ stage.labels {
604605
}
605606
```
606607

608+
```alloy
609+
stage.labels {
610+
source_type = "structured_metadata"
611+
values = {
612+
env = "", // Sets up an 'env' label, based on the 'env' structured metadata value.
613+
user = "username", // Sets up a 'user' label, based on the 'username' structured metadata value.
614+
}
615+
}
616+
617+
// Drop the converted structured metadata
618+
stage.structured_metadata_drop {
619+
values = [ "env", "username" ]
620+
}
621+
```
622+
607623
### `stage.limit`
608624

609625
The `stage.limit` inner block configures a rate-limiting stage that throttles logs based on several options.

internal/component/loki/process/stages/labels.go

Lines changed: 78 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,32 +4,51 @@ import (
44
"errors"
55
"fmt"
66
"reflect"
7-
"time"
87

98
"github.com/go-kit/log"
109
"github.com/prometheus/common/model"
1110

1211
"github.com/grafana/alloy/internal/runtime/logging/level"
12+
"github.com/grafana/loki/pkg/push"
1313
)
1414

1515
const (
1616
ErrEmptyLabelStageConfig = "label stage config cannot be empty"
1717
ErrInvalidLabelName = "invalid label name: %s"
18+
ErrInvalidSourceType = "invalid labels source_type: %s. Can only be 'extracted' or 'structured_metadata'"
19+
20+
LabelsSourceStructuredMetadata string = "structured_metadata"
21+
LabelsSourceExtractedMap string = "extracted"
1822
)
1923

2024
// LabelsConfig is a set of labels to be extracted
2125
type LabelsConfig struct {
22-
Values map[string]*string `alloy:"values,attr"`
26+
Values map[string]*string `alloy:"values,attr"`
27+
SourceType SourceType `alloy:"source_type,attr,optional"`
2328
}
2429

2530
// validateLabelsConfig validates the Label stage configuration
26-
func validateLabelsConfig(c map[string]*string) (map[string]string, error) {
31+
func validateLabelsConfig(cfg *LabelsConfig) (map[string]string, error) {
32+
if cfg.Values == nil {
33+
return nil, errors.New(ErrEmptyLabelStageConfig)
34+
}
35+
36+
if cfg.SourceType == "" {
37+
cfg.SourceType = SourceTypeExtractedMap
38+
}
39+
40+
switch cfg.SourceType {
41+
case SourceTypeExtractedMap, SourceTypeStructuredMetadata:
42+
default:
43+
return nil, fmt.Errorf(ErrInvalidSourceType, cfg.SourceType)
44+
}
45+
2746
// We must not mutate the c.Values, create a copy with changes we need.
2847
ret := map[string]string{}
29-
if c == nil {
48+
if cfg.Values == nil {
3049
return nil, errors.New(ErrEmptyLabelStageConfig)
3150
}
32-
for labelName, labelSrc := range c {
51+
for labelName, labelSrc := range cfg.Values {
3352
// TODO: add support for different validation schemes.
3453
//nolint:staticcheck
3554
if !model.LabelName(labelName).IsValid() {
@@ -42,54 +61,93 @@ func validateLabelsConfig(c map[string]*string) (map[string]string, error) {
4261
ret[labelName] = *labelSrc
4362
}
4463
}
64+
4565
return ret, nil
4666
}
4767

4868
// newLabelStage creates a new label stage to set labels from extracted data
4969
func newLabelStage(logger log.Logger, configs LabelsConfig) (Stage, error) {
50-
labelsConfig, err := validateLabelsConfig(configs.Values)
70+
labelsConfig, err := validateLabelsConfig(&configs)
5171
if err != nil {
5272
return nil, err
5373
}
54-
return toStage(&labelStage{
74+
return &labelStage{
75+
cfg: &configs,
5576
labelsConfig: labelsConfig,
5677
logger: logger,
57-
}), nil
78+
}, nil
5879
}
5980

6081
// labelStage sets labels from extracted data
6182
type labelStage struct {
83+
cfg *LabelsConfig
6284
labelsConfig map[string]string
6385
logger log.Logger
6486
}
6587

66-
// Process implements Stage
67-
func (l *labelStage) Process(labels model.LabelSet, extracted map[string]any, _ *time.Time, _ *string) {
68-
processLabelsConfigs(l.logger, extracted, l.labelsConfig, func(labelName model.LabelName, labelValue model.LabelValue) {
69-
labels[labelName] = labelValue
70-
})
88+
// Run implements Stage
89+
func (l *labelStage) Run(in chan Entry) chan Entry {
90+
out := make(chan Entry)
91+
go func() {
92+
defer close(out)
93+
for e := range in {
94+
switch l.cfg.SourceType {
95+
case SourceTypeExtractedMap:
96+
l.addLabelFromExtractedMap(e.Labels, e.Extracted)
97+
case SourceTypeStructuredMetadata:
98+
l.addLabelsFromStructuredMetadata(e.Labels, e.StructuredMetadata)
99+
}
100+
out <- e
101+
}
102+
}()
103+
return out
71104
}
72105

73-
type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue)
74-
75-
func processLabelsConfigs(logger log.Logger, extracted map[string]any, labelsConfig map[string]string, consumer labelsConsumer) {
76-
for lName, lSrc := range labelsConfig {
106+
func (l *labelStage) addLabelFromExtractedMap(labels model.LabelSet, extracted map[string]any) {
107+
for lName, lSrc := range l.labelsConfig {
77108
if lValue, ok := extracted[lSrc]; ok {
78109
s, err := getString(lValue)
79110
if err != nil {
80111
if Debug {
81-
level.Debug(logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue))
112+
level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue))
82113
}
83114
continue
84115
}
85116
labelValue := model.LabelValue(s)
86117
if !labelValue.IsValid() {
87118
if Debug {
88-
level.Debug(logger).Log("msg", "invalid label value parsed", "value", labelValue)
119+
level.Debug(l.logger).Log("msg", "invalid label value parsed", "value", labelValue)
89120
}
90121
continue
91122
}
92-
consumer(model.LabelName(lName), labelValue)
123+
124+
labels[model.LabelName(lName)] = labelValue
93125
}
94126
}
95127
}
128+
129+
func (l *labelStage) addLabelsFromStructuredMetadata(labels model.LabelSet, metadata push.LabelsAdapter) {
130+
for lName, lSrc := range l.labelsConfig {
131+
for _, kv := range metadata {
132+
if kv.Name != lSrc {
133+
continue
134+
}
135+
136+
labelValue := model.LabelValue(kv.Value)
137+
if !labelValue.IsValid() {
138+
if Debug {
139+
level.Debug(l.logger).Log("msg", "invalid structured metadata label value", "label", lName, "value", labelValue)
140+
}
141+
break
142+
}
143+
144+
labels[model.LabelName(lName)] = labelValue
145+
break
146+
}
147+
}
148+
}
149+
150+
// Cleanup implements Stage.
151+
func (*labelStage) Cleanup() {
152+
// no-op
153+
}

0 commit comments

Comments
 (0)