From 4e6c224f132b8192d0a23c06f3b66932af72d114 Mon Sep 17 00:00:00 2001 From: Matthias Baur Date: Wed, 10 Dec 2025 12:09:06 +0100 Subject: [PATCH 1/5] feat(loki.process) Support structured metadata as source type of stage.labels for loki.process Fixes #4717 --- .../reference/components/loki/loki.process.md | 19 ++- .../component/loki/process/stages/labels.go | 117 ++++++++++---- .../loki/process/stages/labels_test.go | 152 +++++++++++++++--- .../process/stages/structured_metadata.go | 51 +++++- 4 files changed, 280 insertions(+), 59 deletions(-) diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index cf26f176853..f2d7b34dda9 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -582,15 +582,16 @@ stage.label_keep { ### `stage.labels` -The `stage.labels` inner block configures a labels processing stage that can read data from the extracted values map and set new labels on incoming log entries. +The `stage.labels` inner block configures a labels processing stage that can read data from the extracted values map or structured metadata and set new labels on incoming log entries. For labels that are static, refer to [`stage.static_labels`][stage.static_labels] The following arguments are supported: -| Name | Type | Description | Default | Required | -| -------- | ------------- | --------------------------------------- | ------- | -------- | -| `values` | `map(string)` | Configures a `labels` processing stage. | `{}` | no | +| Name | Type | Description | Default | Required | +| ------------- | ------------- | -------------------------------------------------------------------------------------------------------- | ------------- | -------- | +| `values` | `map(string)` | Configures a `labels` processing stage. | `{}` | no | +| `source_type` | `string` | Where to retrieve the data from. Allowed values are `"extracted"` (default) or `"structured_metadata"`. | `"extracted"` | no | In a labels stage, the map's keys define the label to set and the values are how to look them up. If the value is empty, it's inferred to be the same as the key. @@ -604,6 +605,16 @@ stage.labels { } ``` +```alloy +stage.labels { + source_type = "structured_metadata" + values = { + env = "", // Sets up an 'env' label, based on the 'env' structured metadata value. + user = "username", // Sets up a 'user' label, based on the 'username' structured metadata value. + } +} +``` + ### `stage.limit` The `stage.limit` inner block configures a rate-limiting stage that throttles logs based on several options. diff --git a/internal/component/loki/process/stages/labels.go b/internal/component/loki/process/stages/labels.go index 18ecd06af98..2011f44c214 100644 --- a/internal/component/loki/process/stages/labels.go +++ b/internal/component/loki/process/stages/labels.go @@ -4,92 +4,145 @@ import ( "errors" "fmt" "reflect" - "time" "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/loki/pkg/push" ) const ( ErrEmptyLabelStageConfig = "label stage config cannot be empty" ErrInvalidLabelName = "invalid label name: %s" + ErrInvalidSourceType = "invalid labels source_type: %s. Can only be 'extracted' or 'structured_metadata'" + + LabelsSourceStructuredMetadata string = "structured_metadata" + LabelsSourceExtractedMap string = "extracted" ) // LabelsConfig is a set of labels to be extracted type LabelsConfig struct { - Values map[string]*string `alloy:"values,attr"` + Values map[string]*string `alloy:"values,attr"` + SourceType string `alloy:"source_type,attr,optional"` } // validateLabelsConfig validates the Label stage configuration -func validateLabelsConfig(c map[string]*string) (map[string]string, error) { - // We must not mutate the c.Values, create a copy with changes we need. - ret := map[string]string{} - if c == nil { - return nil, errors.New(ErrEmptyLabelStageConfig) +func validateLabelsConfig(cfg *LabelsConfig) error { + if cfg.Values == nil { + return errors.New(ErrEmptyLabelStageConfig) + } + + if cfg.SourceType == "" { + cfg.SourceType = LabelsSourceExtractedMap } - for labelName, labelSrc := range c { + + switch cfg.SourceType { + case LabelsSourceExtractedMap, LabelsSourceStructuredMetadata: + default: + return fmt.Errorf(ErrInvalidSourceType, cfg.SourceType) + } + + // We must not mutate the c.Values, create a copy with changes we need. + returnValues := map[string]*string{} + for labelName, labelSrc := range cfg.Values { // TODO: add support for different validation schemes. //nolint:staticcheck if !model.LabelName(labelName).IsValid() { - return nil, fmt.Errorf(ErrInvalidLabelName, labelName) + return fmt.Errorf(ErrInvalidLabelName, labelName) } // If no label source was specified, use the key name if labelSrc == nil || *labelSrc == "" { - ret[labelName] = labelName + returnValues[labelName] = &labelName } else { - ret[labelName] = *labelSrc + returnValues[labelName] = labelSrc } } - return ret, nil + cfg.Values = returnValues + return nil } // newLabelStage creates a new label stage to set labels from extracted data func newLabelStage(logger log.Logger, configs LabelsConfig) (Stage, error) { - labelsConfig, err := validateLabelsConfig(configs.Values) + err := validateLabelsConfig(&configs) if err != nil { return nil, err } - return toStage(&labelStage{ - labelsConfig: labelsConfig, - logger: logger, - }), nil + return &labelStage{ + cfg: &configs, + logger: logger, + }, nil } // labelStage sets labels from extracted data type labelStage struct { - labelsConfig map[string]string - logger log.Logger + cfg *LabelsConfig + logger log.Logger } -// Process implements Stage -func (l *labelStage) Process(labels model.LabelSet, extracted map[string]any, _ *time.Time, _ *string) { - processLabelsConfigs(l.logger, extracted, l.labelsConfig, func(labelName model.LabelName, labelValue model.LabelValue) { - labels[labelName] = labelValue - }) +// Run implements Stage +func (l *labelStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range in { + switch l.cfg.SourceType { + case LabelsSourceExtractedMap: + l.addLabelFromExtractedMap(e.Labels, e.Extracted) + case LabelsSourceStructuredMetadata: + l.addLabelsFromStructuredMetadata(e.Labels, e.StructuredMetadata) + } + out <- e + } + }() + return out } -type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue) - -func processLabelsConfigs(logger log.Logger, extracted map[string]any, labelsConfig map[string]string, consumer labelsConsumer) { - for lName, lSrc := range labelsConfig { - if lValue, ok := extracted[lSrc]; ok { +func (l *labelStage) addLabelFromExtractedMap(labels model.LabelSet, extracted map[string]any) { + for lName, lSrc := range l.cfg.Values { + if lValue, ok := extracted[*lSrc]; ok { s, err := getString(lValue) if err != nil { if Debug { - level.Debug(logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) + level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) } continue } labelValue := model.LabelValue(s) if !labelValue.IsValid() { if Debug { - level.Debug(logger).Log("msg", "invalid label value parsed", "value", labelValue) + level.Debug(l.logger).Log("msg", "invalid label value parsed", "value", labelValue) } continue } - consumer(model.LabelName(lName), labelValue) + + labels[model.LabelName(lName)] = labelValue + } + } +} + +func (l *labelStage) addLabelsFromStructuredMetadata(labels model.LabelSet, metadata push.LabelsAdapter) { + for lName, lSrc := range l.cfg.Values { + for _, kv := range metadata { + if kv.Name != *lSrc { + continue + } + + labelValue := model.LabelValue(kv.Value) + if !labelValue.IsValid() { + if Debug { + level.Debug(l.logger).Log("msg", "invalid structured metadata label value", "label", lName, "value", labelValue) + } + break + } + + labels[model.LabelName(lName)] = labelValue + break } } } + +// Cleanup implements Stage. +func (*labelStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/labels_test.go b/internal/component/loki/process/stages/labels_test.go index 1ef78aad723..0802137fc0a 100644 --- a/internal/component/loki/process/stages/labels_test.go +++ b/internal/component/loki/process/stages/labels_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/loki/pkg/push" ) var testLabelsYaml = ` stage.json { @@ -39,7 +40,29 @@ var testLabelsLogLineWithMissingKey = ` } ` -func TestLabelsPipeline_Labels(t *testing.T) { +var testLabelsStrucuturedMetadataYaml = ` +// Create strucutured metadata +stage.static_labels { + values = { + "foo" = "bar", + } +} +stage.structured_metadata { + values = { + "baz" = "foo", + } +} + +// Create label from structured metadata +stage.labels { + source_type = "structured_metadata" + values = { + "from_structured" = "baz", + } +} +` + +func TestLabelsPipeline_LabelsFromExtracted(t *testing.T) { pl, err := NewPipeline(log.NewNopLogger(), loadConfig(testLabelsYaml), prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) @@ -53,6 +76,19 @@ func TestLabelsPipeline_Labels(t *testing.T) { assert.Equal(t, expectedLbls, out.Labels) } +func TestLabelsPipeline_LabelsFromStructuredMetadata(t *testing.T) { + pl, err := NewPipeline(log.NewNopLogger(), loadConfig(testLabelsStrucuturedMetadataYaml), prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) + if err != nil { + t.Fatal(err) + } + expectedLbls := model.LabelSet{ + "from_structured": "bar", + } + + out := processEntries(pl, newEntry(nil, nil, "", time.Now()))[0] + assert.Equal(t, expectedLbls, out.Labels) +} + func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) { var buf bytes.Buffer w := log.NewSyncWriter(&buf) @@ -76,13 +112,13 @@ var ( lv3 = "" ) -var emptyLabelsConfig = LabelsConfig{nil} +var emptyLabelsConfig = LabelsConfig{nil, ""} func TestLabels(t *testing.T) { tests := map[string]struct { config LabelsConfig err error - expectedCfgs map[string]string + expectedCfgs map[string]*string }{ "missing config": { config: emptyLabelsConfig, @@ -96,17 +132,42 @@ func TestLabels(t *testing.T) { err: fmt.Errorf(ErrInvalidLabelName, "\xfd"), expectedCfgs: nil, }, - "label value is set from name": { - config: LabelsConfig{Values: map[string]*string{ + "invalid source type": { + config: LabelsConfig{ + Values: map[string]*string{"l1": ptr("")}, + SourceType: "invalid_source_type", + }, + err: fmt.Errorf("invalid labels source_type: %s. Can only be 'extracted' or 'structured_metadata'", "invalid_source_type"), + expectedCfgs: nil, + }, + "label value is set from name for extracted": { + config: LabelsConfig{ + SourceType: LabelsSourceExtractedMap, + Values: map[string]*string{ + "l1": &lv1, + "l2": nil, + "l3": &lv3, + }}, + err: nil, + expectedCfgs: map[string]*string{ "l1": &lv1, - "l2": nil, - "l3": &lv3, - }}, + "l2": ptr("l2"), + "l3": ptr("l3"), + }, + }, + "label value is set from name for structured_metadata": { + config: LabelsConfig{ + SourceType: LabelsSourceStructuredMetadata, + Values: map[string]*string{ + "l1": &lv1, + "l2": nil, + "l3": &lv3, + }}, err: nil, - expectedCfgs: map[string]string{ - "l1": lv1, - "l2": "l2", - "l3": "l3", + expectedCfgs: map[string]*string{ + "l1": &lv1, + "l2": ptr("l2"), + "l3": ptr("l3"), }, }, } @@ -114,7 +175,7 @@ func TestLabels(t *testing.T) { test := test t.Run(name, func(t *testing.T) { t.Parallel() - actual, err := validateLabelsConfig(test.config.Values) + err := validateLabelsConfig(&test.config) if (err != nil) != (test.err != nil) { t.Errorf("validateLabelsConfig() expected error = %v, actual error = %v", test.err, err) return @@ -124,39 +185,72 @@ func TestLabels(t *testing.T) { return } if test.expectedCfgs != nil { - assert.Equal(t, test.expectedCfgs, actual) + assert.Equal(t, test.expectedCfgs, test.config.Values) } }) } } -func TestLabelStage_Process(t *testing.T) { +func TestLabelsStage_Process(t *testing.T) { sourceName := "diff_source" tests := map[string]struct { - config LabelsConfig - extractedData map[string]any - inputLabels model.LabelSet - expectedLabels model.LabelSet + config LabelsConfig + extractedData map[string]any + strcturedMetadata push.LabelsAdapter + inputLabels model.LabelSet + expectedLabels model.LabelSet }{ - "extract_success": { + "extract_success_extracted": { LabelsConfig{Values: map[string]*string{ "testLabel": nil, }}, map[string]any{ "testLabel": "testValue", }, + push.LabelsAdapter{}, + model.LabelSet{}, + model.LabelSet{ + "testLabel": "testValue", + }, + }, + "extract_success_structured_metadata": { + LabelsConfig{ + SourceType: LabelsSourceStructuredMetadata, + Values: map[string]*string{ + "testLabel": ptr("testStrucuturedMetadata"), + }}, + map[string]any{}, + push.LabelsAdapter{ + push.LabelAdapter{Name: "testStrucuturedMetadata", Value: "testValue"}, + }, model.LabelSet{}, model.LabelSet{ "testLabel": "testValue", }, }, - "different_source_name": { + "different_source_name_extracted": { LabelsConfig{Values: map[string]*string{ "testLabel": &sourceName, }}, map[string]any{ sourceName: "testValue", }, + push.LabelsAdapter{}, + model.LabelSet{}, + model.LabelSet{ + "testLabel": "testValue", + }, + }, + "different_source_name_structured_metadata": { + LabelsConfig{ + SourceType: LabelsSourceStructuredMetadata, + Values: map[string]*string{ + "testLabel": &sourceName, + }}, + map[string]any{}, + push.LabelsAdapter{ + push.LabelAdapter{Name: sourceName, Value: "testValue"}, + }, model.LabelSet{}, model.LabelSet{ "testLabel": "testValue", @@ -167,6 +261,18 @@ func TestLabelStage_Process(t *testing.T) { "testLabel": &sourceName, }}, map[string]any{}, + push.LabelsAdapter{}, + model.LabelSet{}, + model.LabelSet{}, + }, + "empty_structured_metadata": { + LabelsConfig{ + SourceType: LabelsSourceStructuredMetadata, + Values: map[string]*string{ + "testLabel": &sourceName, + }}, + map[string]any{}, + push.LabelsAdapter{}, model.LabelSet{}, model.LabelSet{}, }, @@ -180,7 +286,9 @@ func TestLabelStage_Process(t *testing.T) { t.Fatal(err) } - out := processEntries(st, newEntry(test.extractedData, test.inputLabels, "", time.Time{}))[0] + entry := newEntry(test.extractedData, test.inputLabels, "", time.Time{}) + entry.StructuredMetadata = test.strcturedMetadata + out := processEntries(st, entry)[0] assert.Equal(t, test.expectedLabels, out.Labels) }) } diff --git a/internal/component/loki/process/stages/structured_metadata.go b/internal/component/loki/process/stages/structured_metadata.go index 03d0576fb1f..008f936d776 100644 --- a/internal/component/loki/process/stages/structured_metadata.go +++ b/internal/component/loki/process/stages/structured_metadata.go @@ -1,6 +1,8 @@ package stages import ( + "errors" + "fmt" "reflect" "regexp" @@ -15,12 +17,35 @@ type StructuredMetadataConfig struct { Regex string `alloy:"regex,attr,optional"` } +// validateStructuredMetadataConfig validates the structured metadata stage config. +func validateStructuredMetadataConfig(c map[string]*string) (map[string]string, error) { + // We must not mutate the c.Values, create a copy with changes we need. + ret := map[string]string{} + if c == nil { + return nil, errors.New(ErrEmptyLabelStageConfig) + } + for labelName, labelSrc := range c { + // TODO: add support for different validation schemes. + //nolint:staticcheck + if !model.LabelName(labelName).IsValid() { + return nil, fmt.Errorf(ErrInvalidLabelName, labelName) + } + // If no label source was specified, use the key name + if labelSrc == nil || *labelSrc == "" { + ret[labelName] = labelName + } else { + ret[labelName] = *labelSrc + } + } + return ret, nil +} + func newStructuredMetadataStage(logger log.Logger, configs StructuredMetadataConfig) (Stage, error) { var validatedLabelsConfig map[string]string var err error if len(configs.Values) > 0 { - validatedLabelsConfig, err = validateLabelsConfig(configs.Values) + validatedLabelsConfig, err = validateStructuredMetadataConfig(configs.Values) if err != nil { return nil, err } @@ -81,6 +106,30 @@ func (s *structuredMetadataStage) Run(in chan Entry) chan Entry { }) } +type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue) + +func processLabelsConfigs(logger log.Logger, extracted map[string]any, labelsConfig map[string]string, consumer labelsConsumer) { + for lName, lSrc := range labelsConfig { + if lValue, ok := extracted[lSrc]; ok { + s, err := getString(lValue) + if err != nil { + if Debug { + level.Debug(logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) + } + continue + } + labelValue := model.LabelValue(s) + if !labelValue.IsValid() { + if Debug { + level.Debug(logger).Log("msg", "invalid label value parsed", "value", labelValue) + } + continue + } + consumer(model.LabelName(lName), labelValue) + } + } +} + func (s *structuredMetadataStage) extractFromLabels(e Entry) Entry { labels := e.Labels foundLabels := []model.LabelName{} From 712bf39d3f6db860429600458914c0ce91188934 Mon Sep 17 00:00:00 2001 From: Matthias Baur Date: Wed, 4 Feb 2026 12:00:52 +0100 Subject: [PATCH 2/5] Document how to drop strucutured metadata have label conversion --- docs/sources/reference/components/loki/loki.process.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index f2d7b34dda9..3e38f9cd0df 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -613,6 +613,11 @@ stage.labels { user = "username", // Sets up a 'user' label, based on the 'username' structured metadata value. } } + +// Drop the converted structured metadata +stage.structured_metadata_drop { + values = [ "env", "username" ] +} ``` ### `stage.limit` From 9164d17b7301aadb1384a9e46f5fce9648204e92 Mon Sep 17 00:00:00 2001 From: Matthias Baur Date: Wed, 4 Feb 2026 12:19:37 +0100 Subject: [PATCH 3/5] Use the same SourceType for labels and truncate stage --- .../component/loki/process/stages/labels.go | 10 ++-- .../loki/process/stages/labels_test.go | 10 ++-- .../component/loki/process/stages/truncate.go | 54 ++++--------------- .../loki/process/stages/truncate_test.go | 20 +++---- .../component/loki/process/stages/types.go | 38 +++++++++++++ 5 files changed, 68 insertions(+), 64 deletions(-) create mode 100644 internal/component/loki/process/stages/types.go diff --git a/internal/component/loki/process/stages/labels.go b/internal/component/loki/process/stages/labels.go index 2011f44c214..20b68c14000 100644 --- a/internal/component/loki/process/stages/labels.go +++ b/internal/component/loki/process/stages/labels.go @@ -24,7 +24,7 @@ const ( // LabelsConfig is a set of labels to be extracted type LabelsConfig struct { Values map[string]*string `alloy:"values,attr"` - SourceType string `alloy:"source_type,attr,optional"` + SourceType SourceType `alloy:"source_type,attr,optional"` } // validateLabelsConfig validates the Label stage configuration @@ -34,11 +34,11 @@ func validateLabelsConfig(cfg *LabelsConfig) error { } if cfg.SourceType == "" { - cfg.SourceType = LabelsSourceExtractedMap + cfg.SourceType = SourceTypeExtractedMap } switch cfg.SourceType { - case LabelsSourceExtractedMap, LabelsSourceStructuredMetadata: + case SourceTypeExtractedMap, SourceTypeStructuredMetadata: default: return fmt.Errorf(ErrInvalidSourceType, cfg.SourceType) } @@ -87,9 +87,9 @@ func (l *labelStage) Run(in chan Entry) chan Entry { defer close(out) for e := range in { switch l.cfg.SourceType { - case LabelsSourceExtractedMap: + case SourceTypeExtractedMap: l.addLabelFromExtractedMap(e.Labels, e.Extracted) - case LabelsSourceStructuredMetadata: + case SourceTypeStructuredMetadata: l.addLabelsFromStructuredMetadata(e.Labels, e.StructuredMetadata) } out <- e diff --git a/internal/component/loki/process/stages/labels_test.go b/internal/component/loki/process/stages/labels_test.go index 0802137fc0a..ddb3ff09e35 100644 --- a/internal/component/loki/process/stages/labels_test.go +++ b/internal/component/loki/process/stages/labels_test.go @@ -142,7 +142,7 @@ func TestLabels(t *testing.T) { }, "label value is set from name for extracted": { config: LabelsConfig{ - SourceType: LabelsSourceExtractedMap, + SourceType: SourceTypeExtractedMap, Values: map[string]*string{ "l1": &lv1, "l2": nil, @@ -157,7 +157,7 @@ func TestLabels(t *testing.T) { }, "label value is set from name for structured_metadata": { config: LabelsConfig{ - SourceType: LabelsSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Values: map[string]*string{ "l1": &lv1, "l2": nil, @@ -215,7 +215,7 @@ func TestLabelsStage_Process(t *testing.T) { }, "extract_success_structured_metadata": { LabelsConfig{ - SourceType: LabelsSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Values: map[string]*string{ "testLabel": ptr("testStrucuturedMetadata"), }}, @@ -243,7 +243,7 @@ func TestLabelsStage_Process(t *testing.T) { }, "different_source_name_structured_metadata": { LabelsConfig{ - SourceType: LabelsSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Values: map[string]*string{ "testLabel": &sourceName, }}, @@ -267,7 +267,7 @@ func TestLabelsStage_Process(t *testing.T) { }, "empty_structured_metadata": { LabelsConfig{ - SourceType: LabelsSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Values: map[string]*string{ "testLabel": &sourceName, }}, diff --git a/internal/component/loki/process/stages/truncate.go b/internal/component/loki/process/stages/truncate.go index 3025ab8fa74..102b5b9b435 100644 --- a/internal/component/loki/process/stages/truncate.go +++ b/internal/component/loki/process/stages/truncate.go @@ -1,9 +1,7 @@ package stages import ( - "encoding" "errors" - "fmt" "maps" "slices" "strings" @@ -36,46 +34,14 @@ type TruncateConfig struct { } type RuleConfig struct { - Limit units.Base2Bytes `alloy:"limit,attr"` - Suffix string `alloy:"suffix,attr,optional"` - Sources []string `alloy:"sources,attr,optional"` - SourceType TruncateSourceType `alloy:"source_type,attr,optional"` + Limit units.Base2Bytes `alloy:"limit,attr"` + Suffix string `alloy:"suffix,attr,optional"` + Sources []string `alloy:"sources,attr,optional"` + SourceType SourceType `alloy:"source_type,attr,optional"` effectiveLimit units.Base2Bytes } -var ( - _ encoding.TextMarshaler = TruncateSourceType("") - _ encoding.TextUnmarshaler = (*TruncateSourceType)(nil) -) - -type TruncateSourceType string - -// UnmarshalText implements encoding.TextUnmarshaler. -func (t *TruncateSourceType) UnmarshalText(text []byte) error { - str := string(text) - switch str { - case string(TruncateSourceLine), string(TruncateSourceLabel), string(TruncateSourceStructuredMetadata), string(TruncateSourceExtractedMap): - *t = TruncateSourceType(str) - default: - return fmt.Errorf("unknown source_type: %s", str) - } - - return nil -} - -// MarshalText implements encoding.TextMarshaler. -func (t TruncateSourceType) MarshalText() (text []byte, err error) { - return []byte(t), nil -} - -const ( - TruncateSourceLine TruncateSourceType = "line" - TruncateSourceLabel TruncateSourceType = "label" - TruncateSourceStructuredMetadata TruncateSourceType = "structured_metadata" - TruncateSourceExtractedMap TruncateSourceType = "extracted" -) - // validateTruncateConfig validates the TruncateConfig for the truncateStage func validateTruncateConfig(cfg *TruncateConfig) error { if len(cfg.Rules) == 0 { @@ -90,10 +56,10 @@ func validateTruncateConfig(cfg *TruncateConfig) error { } if r.SourceType == "" { - r.SourceType = TruncateSourceLine + r.SourceType = SourceTypeLine } - if r.SourceType == TruncateSourceLine && len(r.Sources) > 0 { + if r.SourceType == SourceTypeLine && len(r.Sources) > 0 { return errors.New(errSourcesForLine) } @@ -138,7 +104,7 @@ func (m *truncateStage) Run(in chan Entry) chan Entry { truncated := map[string]struct{}{} for _, r := range m.cfg.Rules { switch r.SourceType { - case TruncateSourceLine: + case SourceTypeLine: if len(e.Line) > int(r.effectiveLimit) { e.Line = e.Line[:r.effectiveLimit] + r.Suffix markTruncated(m.truncatedCount, truncated, truncateLineField) @@ -147,7 +113,7 @@ func (m *truncateStage) Run(in chan Entry) chan Entry { level.Debug(m.logger).Log("msg", "line has been truncated", "limit", r.effectiveLimit, "truncated_line", e.Line) } } - case TruncateSourceLabel: + case SourceTypeLabel: if len(r.Sources) > 0 { for _, source := range r.Sources { name := model.LabelName(source) @@ -160,7 +126,7 @@ func (m *truncateStage) Run(in chan Entry) chan Entry { m.tryTruncateLabel(r, e.Labels, k, v, truncated) } } - case TruncateSourceStructuredMetadata: + case SourceTypeStructuredMetadata: if len(r.Sources) > 0 { for i, v := range e.StructuredMetadata { if slices.Contains(r.Sources, v.Name) { @@ -174,7 +140,7 @@ func (m *truncateStage) Run(in chan Entry) chan Entry { e.StructuredMetadata[i] = m.tryTruncateStructuredMetadata(r, v, truncated) } } - case TruncateSourceExtractedMap: + case SourceTypeExtractedMap: if len(r.Sources) > 0 { for _, source := range r.Sources { if v, ok := e.Extracted[source]; ok { diff --git a/internal/component/loki/process/stages/truncate_test.go b/internal/component/loki/process/stages/truncate_test.go index d617bdf887c..af9b631a38a 100644 --- a/internal/component/loki/process/stages/truncate_test.go +++ b/internal/component/loki/process/stages/truncate_test.go @@ -80,7 +80,7 @@ func Test_TruncateStage_Process(t *testing.T) { config: []*RuleConfig{ { Limit: 15, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Suffix: "[truncated]", }, }, @@ -96,7 +96,7 @@ func Test_TruncateStage_Process(t *testing.T) { config: []*RuleConfig{ { Limit: 15, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Suffix: "[truncated]", Sources: []string{"app"}, }, @@ -113,7 +113,7 @@ func Test_TruncateStage_Process(t *testing.T) { config: []*RuleConfig{ { Limit: 15, - SourceType: TruncateSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Suffix: "", }, }, @@ -132,7 +132,7 @@ func Test_TruncateStage_Process(t *testing.T) { config: []*RuleConfig{ { Limit: 15, - SourceType: TruncateSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Suffix: "", Sources: []string{"meta1"}, }, @@ -155,18 +155,18 @@ func Test_TruncateStage_Process(t *testing.T) { }, { Limit: 15, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Suffix: "[truncated]", Sources: []string{"app"}, }, { Limit: 15, - SourceType: TruncateSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Suffix: "", }, { Limit: 8, - SourceType: TruncateSourceExtractedMap, + SourceType: SourceTypeExtractedMap, Sources: []string{"field2"}, }, }, @@ -298,7 +298,7 @@ func Test_ValidateTruncateConfig(t *testing.T) { config: &TruncateConfig{ Rules: []*RuleConfig{{ Limit: 10, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Suffix: "...", }}, }, @@ -309,7 +309,7 @@ func Test_ValidateTruncateConfig(t *testing.T) { config: &TruncateConfig{ Rules: []*RuleConfig{{ Limit: 10, - SourceType: TruncateSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Suffix: "...", }}, }, @@ -321,7 +321,7 @@ func Test_ValidateTruncateConfig(t *testing.T) { Rules: []*RuleConfig{ { Limit: 10, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Sources: []string{"app"}, Suffix: "...", }, diff --git a/internal/component/loki/process/stages/types.go b/internal/component/loki/process/stages/types.go new file mode 100644 index 00000000000..f9749c7e1cd --- /dev/null +++ b/internal/component/loki/process/stages/types.go @@ -0,0 +1,38 @@ +package stages + +import ( + "encoding" + "fmt" +) + +type SourceType string + +const ( + SourceTypeLine SourceType = "line" + SourceTypeLabel SourceType = "label" + SourceTypeStructuredMetadata SourceType = "structured_metadata" + SourceTypeExtractedMap SourceType = "extracted" +) + +var ( + _ encoding.TextMarshaler = SourceType("") + _ encoding.TextUnmarshaler = (*SourceType)(nil) +) + +// UnmarshalText implements encoding.TextUnmarshaler. +func (t *SourceType) UnmarshalText(text []byte) error { + str := string(text) + switch str { + case string(SourceTypeLine), string(SourceTypeLabel), string(SourceTypeStructuredMetadata), string(SourceTypeExtractedMap): + *t = SourceType(str) + default: + return fmt.Errorf("unknown source_type: %s", str) + } + + return nil +} + +// MarshalText implements encoding.TextMarshaler. +func (t SourceType) MarshalText() (text []byte, err error) { + return []byte(t), nil +} From 5945375e30713082aa5a4b725e80ed8562d5e8e7 Mon Sep 17 00:00:00 2001 From: Matthias Baur Date: Wed, 18 Mar 2026 08:18:38 +0100 Subject: [PATCH 4/5] Revert back to validateLabelsConfig returning a map --- .../component/loki/process/stages/labels.go | 41 +++++++++++-------- .../loki/process/stages/labels_test.go | 22 +++++----- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/internal/component/loki/process/stages/labels.go b/internal/component/loki/process/stages/labels.go index 20b68c14000..b0cd4caf7cc 100644 --- a/internal/component/loki/process/stages/labels.go +++ b/internal/component/loki/process/stages/labels.go @@ -28,9 +28,9 @@ type LabelsConfig struct { } // validateLabelsConfig validates the Label stage configuration -func validateLabelsConfig(cfg *LabelsConfig) error { +func validateLabelsConfig(cfg *LabelsConfig) (map[string]string, error) { if cfg.Values == nil { - return errors.New(ErrEmptyLabelStageConfig) + return nil, errors.New(ErrEmptyLabelStageConfig) } if cfg.SourceType == "" { @@ -40,44 +40,49 @@ func validateLabelsConfig(cfg *LabelsConfig) error { switch cfg.SourceType { case SourceTypeExtractedMap, SourceTypeStructuredMetadata: default: - return fmt.Errorf(ErrInvalidSourceType, cfg.SourceType) + return nil, fmt.Errorf(ErrInvalidSourceType, cfg.SourceType) } // We must not mutate the c.Values, create a copy with changes we need. - returnValues := map[string]*string{} + ret := map[string]string{} + if cfg.Values == nil { + return nil, errors.New(ErrEmptyLabelStageConfig) + } for labelName, labelSrc := range cfg.Values { // TODO: add support for different validation schemes. //nolint:staticcheck if !model.LabelName(labelName).IsValid() { - return fmt.Errorf(ErrInvalidLabelName, labelName) + return nil, fmt.Errorf(ErrInvalidLabelName, labelName) } // If no label source was specified, use the key name if labelSrc == nil || *labelSrc == "" { - returnValues[labelName] = &labelName + ret[labelName] = labelName } else { - returnValues[labelName] = labelSrc + ret[labelName] = *labelSrc } } - cfg.Values = returnValues - return nil + + return ret, nil } // newLabelStage creates a new label stage to set labels from extracted data func newLabelStage(logger log.Logger, configs LabelsConfig) (Stage, error) { - err := validateLabelsConfig(&configs) + labelsConfig, err := validateLabelsConfig(&configs) if err != nil { return nil, err } return &labelStage{ - cfg: &configs, - logger: logger, + cfg: &configs, + labelsConfig: labelsConfig, + logger: logger, }, nil } // labelStage sets labels from extracted data type labelStage struct { - cfg *LabelsConfig - logger log.Logger + cfg *LabelsConfig + labelsConfig map[string]string + logger log.Logger } // Run implements Stage @@ -99,8 +104,8 @@ func (l *labelStage) Run(in chan Entry) chan Entry { } func (l *labelStage) addLabelFromExtractedMap(labels model.LabelSet, extracted map[string]any) { - for lName, lSrc := range l.cfg.Values { - if lValue, ok := extracted[*lSrc]; ok { + for lName, lSrc := range l.labelsConfig { + if lValue, ok := extracted[lSrc]; ok { s, err := getString(lValue) if err != nil { if Debug { @@ -122,9 +127,9 @@ func (l *labelStage) addLabelFromExtractedMap(labels model.LabelSet, extracted m } func (l *labelStage) addLabelsFromStructuredMetadata(labels model.LabelSet, metadata push.LabelsAdapter) { - for lName, lSrc := range l.cfg.Values { + for lName, lSrc := range l.labelsConfig { for _, kv := range metadata { - if kv.Name != *lSrc { + if kv.Name != lSrc { continue } diff --git a/internal/component/loki/process/stages/labels_test.go b/internal/component/loki/process/stages/labels_test.go index ddb3ff09e35..a3690ac089c 100644 --- a/internal/component/loki/process/stages/labels_test.go +++ b/internal/component/loki/process/stages/labels_test.go @@ -118,7 +118,7 @@ func TestLabels(t *testing.T) { tests := map[string]struct { config LabelsConfig err error - expectedCfgs map[string]*string + expectedCfgs map[string]string }{ "missing config": { config: emptyLabelsConfig, @@ -149,10 +149,10 @@ func TestLabels(t *testing.T) { "l3": &lv3, }}, err: nil, - expectedCfgs: map[string]*string{ - "l1": &lv1, - "l2": ptr("l2"), - "l3": ptr("l3"), + expectedCfgs: map[string]string{ + "l1": lv1, + "l2": "l2", + "l3": "l3", }, }, "label value is set from name for structured_metadata": { @@ -164,10 +164,10 @@ func TestLabels(t *testing.T) { "l3": &lv3, }}, err: nil, - expectedCfgs: map[string]*string{ - "l1": &lv1, - "l2": ptr("l2"), - "l3": ptr("l3"), + expectedCfgs: map[string]string{ + "l1": lv1, + "l2": "l2", + "l3": "l3", }, }, } @@ -175,7 +175,7 @@ func TestLabels(t *testing.T) { test := test t.Run(name, func(t *testing.T) { t.Parallel() - err := validateLabelsConfig(&test.config) + actual, err := validateLabelsConfig(&test.config) if (err != nil) != (test.err != nil) { t.Errorf("validateLabelsConfig() expected error = %v, actual error = %v", test.err, err) return @@ -185,7 +185,7 @@ func TestLabels(t *testing.T) { return } if test.expectedCfgs != nil { - assert.Equal(t, test.expectedCfgs, test.config.Values) + assert.Equal(t, test.expectedCfgs, actual) } }) } From 49c2cb35f2116fb071df6637687ec2711d7d0678 Mon Sep 17 00:00:00 2001 From: Matthias Baur Date: Thu, 19 Mar 2026 17:34:45 +0100 Subject: [PATCH 5/5] Use centralized SourceType in limits_config.go as well --- .../internal/promtailconvert/internal/build/limits_config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/converter/internal/promtailconvert/internal/build/limits_config.go b/internal/converter/internal/promtailconvert/internal/build/limits_config.go index 8b018935803..b1632b2911c 100644 --- a/internal/converter/internal/promtailconvert/internal/build/limits_config.go +++ b/internal/converter/internal/promtailconvert/internal/build/limits_config.go @@ -44,7 +44,7 @@ func buildLimitsConfigStages(cfg limit.Config) []stages.StageConfig { Rules: []*stages.RuleConfig{ { Limit: lineSizeBytes, - SourceType: stages.TruncateSourceLine, + SourceType: stages.SourceTypeLine, }, }, },