diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index cf26f176853..3e38f9cd0df 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -582,15 +582,16 @@ stage.label_keep { ### `stage.labels` -The `stage.labels` inner block configures a labels processing stage that can read data from the extracted values map and set new labels on incoming log entries. +The `stage.labels` inner block configures a labels processing stage that can read data from the extracted values map or structured metadata and set new labels on incoming log entries. For labels that are static, refer to [`stage.static_labels`][stage.static_labels] The following arguments are supported: -| Name | Type | Description | Default | Required | -| -------- | ------------- | --------------------------------------- | ------- | -------- | -| `values` | `map(string)` | Configures a `labels` processing stage. | `{}` | no | +| Name | Type | Description | Default | Required | +| ------------- | ------------- | -------------------------------------------------------------------------------------------------------- | ------------- | -------- | +| `values` | `map(string)` | Configures a `labels` processing stage. | `{}` | no | +| `source_type` | `string` | Where to retrieve the data from. Allowed values are `"extracted"` (default) or `"structured_metadata"`. | `"extracted"` | no | In a labels stage, the map's keys define the label to set and the values are how to look them up. If the value is empty, it's inferred to be the same as the key. @@ -604,6 +605,21 @@ stage.labels { } ``` +```alloy +stage.labels { + source_type = "structured_metadata" + values = { + env = "", // Sets up an 'env' label, based on the 'env' structured metadata value. + user = "username", // Sets up a 'user' label, based on the 'username' structured metadata value. + } +} + +// Drop the converted structured metadata +stage.structured_metadata_drop { + values = [ "env", "username" ] +} +``` + ### `stage.limit` The `stage.limit` inner block configures a rate-limiting stage that throttles logs based on several options. diff --git a/internal/component/loki/process/stages/labels.go b/internal/component/loki/process/stages/labels.go index 18ecd06af98..b0cd4caf7cc 100644 --- a/internal/component/loki/process/stages/labels.go +++ b/internal/component/loki/process/stages/labels.go @@ -4,32 +4,51 @@ import ( "errors" "fmt" "reflect" - "time" "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/loki/pkg/push" ) const ( ErrEmptyLabelStageConfig = "label stage config cannot be empty" ErrInvalidLabelName = "invalid label name: %s" + ErrInvalidSourceType = "invalid labels source_type: %s. Can only be 'extracted' or 'structured_metadata'" + + LabelsSourceStructuredMetadata string = "structured_metadata" + LabelsSourceExtractedMap string = "extracted" ) // LabelsConfig is a set of labels to be extracted type LabelsConfig struct { - Values map[string]*string `alloy:"values,attr"` + Values map[string]*string `alloy:"values,attr"` + SourceType SourceType `alloy:"source_type,attr,optional"` } // validateLabelsConfig validates the Label stage configuration -func validateLabelsConfig(c map[string]*string) (map[string]string, error) { +func validateLabelsConfig(cfg *LabelsConfig) (map[string]string, error) { + if cfg.Values == nil { + return nil, errors.New(ErrEmptyLabelStageConfig) + } + + if cfg.SourceType == "" { + cfg.SourceType = SourceTypeExtractedMap + } + + switch cfg.SourceType { + case SourceTypeExtractedMap, SourceTypeStructuredMetadata: + default: + return nil, fmt.Errorf(ErrInvalidSourceType, cfg.SourceType) + } + // We must not mutate the c.Values, create a copy with changes we need. ret := map[string]string{} - if c == nil { + if cfg.Values == nil { return nil, errors.New(ErrEmptyLabelStageConfig) } - for labelName, labelSrc := range c { + for labelName, labelSrc := range cfg.Values { // TODO: add support for different validation schemes. //nolint:staticcheck if !model.LabelName(labelName).IsValid() { @@ -42,54 +61,93 @@ func validateLabelsConfig(c map[string]*string) (map[string]string, error) { ret[labelName] = *labelSrc } } + return ret, nil } // newLabelStage creates a new label stage to set labels from extracted data func newLabelStage(logger log.Logger, configs LabelsConfig) (Stage, error) { - labelsConfig, err := validateLabelsConfig(configs.Values) + labelsConfig, err := validateLabelsConfig(&configs) if err != nil { return nil, err } - return toStage(&labelStage{ + return &labelStage{ + cfg: &configs, labelsConfig: labelsConfig, logger: logger, - }), nil + }, nil } // labelStage sets labels from extracted data type labelStage struct { + cfg *LabelsConfig labelsConfig map[string]string logger log.Logger } -// Process implements Stage -func (l *labelStage) Process(labels model.LabelSet, extracted map[string]any, _ *time.Time, _ *string) { - processLabelsConfigs(l.logger, extracted, l.labelsConfig, func(labelName model.LabelName, labelValue model.LabelValue) { - labels[labelName] = labelValue - }) +// Run implements Stage +func (l *labelStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range in { + switch l.cfg.SourceType { + case SourceTypeExtractedMap: + l.addLabelFromExtractedMap(e.Labels, e.Extracted) + case SourceTypeStructuredMetadata: + l.addLabelsFromStructuredMetadata(e.Labels, e.StructuredMetadata) + } + out <- e + } + }() + return out } -type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue) - -func processLabelsConfigs(logger log.Logger, extracted map[string]any, labelsConfig map[string]string, consumer labelsConsumer) { - for lName, lSrc := range labelsConfig { +func (l *labelStage) addLabelFromExtractedMap(labels model.LabelSet, extracted map[string]any) { + for lName, lSrc := range l.labelsConfig { if lValue, ok := extracted[lSrc]; ok { s, err := getString(lValue) if err != nil { if Debug { - level.Debug(logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) + level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) } continue } labelValue := model.LabelValue(s) if !labelValue.IsValid() { if Debug { - level.Debug(logger).Log("msg", "invalid label value parsed", "value", labelValue) + level.Debug(l.logger).Log("msg", "invalid label value parsed", "value", labelValue) } continue } - consumer(model.LabelName(lName), labelValue) + + labels[model.LabelName(lName)] = labelValue } } } + +func (l *labelStage) addLabelsFromStructuredMetadata(labels model.LabelSet, metadata push.LabelsAdapter) { + for lName, lSrc := range l.labelsConfig { + for _, kv := range metadata { + if kv.Name != lSrc { + continue + } + + labelValue := model.LabelValue(kv.Value) + if !labelValue.IsValid() { + if Debug { + level.Debug(l.logger).Log("msg", "invalid structured metadata label value", "label", lName, "value", labelValue) + } + break + } + + labels[model.LabelName(lName)] = labelValue + break + } + } +} + +// Cleanup implements Stage. +func (*labelStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/labels_test.go b/internal/component/loki/process/stages/labels_test.go index 1ef78aad723..a3690ac089c 100644 --- a/internal/component/loki/process/stages/labels_test.go +++ b/internal/component/loki/process/stages/labels_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/loki/pkg/push" ) var testLabelsYaml = ` stage.json { @@ -39,7 +40,29 @@ var testLabelsLogLineWithMissingKey = ` } ` -func TestLabelsPipeline_Labels(t *testing.T) { +var testLabelsStrucuturedMetadataYaml = ` +// Create strucutured metadata +stage.static_labels { + values = { + "foo" = "bar", + } +} +stage.structured_metadata { + values = { + "baz" = "foo", + } +} + +// Create label from structured metadata +stage.labels { + source_type = "structured_metadata" + values = { + "from_structured" = "baz", + } +} +` + +func TestLabelsPipeline_LabelsFromExtracted(t *testing.T) { pl, err := NewPipeline(log.NewNopLogger(), loadConfig(testLabelsYaml), prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) if err != nil { t.Fatal(err) @@ -53,6 +76,19 @@ func TestLabelsPipeline_Labels(t *testing.T) { assert.Equal(t, expectedLbls, out.Labels) } +func TestLabelsPipeline_LabelsFromStructuredMetadata(t *testing.T) { + pl, err := NewPipeline(log.NewNopLogger(), loadConfig(testLabelsStrucuturedMetadataYaml), prometheus.DefaultRegisterer, featuregate.StabilityGenerallyAvailable) + if err != nil { + t.Fatal(err) + } + expectedLbls := model.LabelSet{ + "from_structured": "bar", + } + + out := processEntries(pl, newEntry(nil, nil, "", time.Now()))[0] + assert.Equal(t, expectedLbls, out.Labels) +} + func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) { var buf bytes.Buffer w := log.NewSyncWriter(&buf) @@ -76,7 +112,7 @@ var ( lv3 = "" ) -var emptyLabelsConfig = LabelsConfig{nil} +var emptyLabelsConfig = LabelsConfig{nil, ""} func TestLabels(t *testing.T) { tests := map[string]struct { @@ -96,12 +132,37 @@ func TestLabels(t *testing.T) { err: fmt.Errorf(ErrInvalidLabelName, "\xfd"), expectedCfgs: nil, }, - "label value is set from name": { - config: LabelsConfig{Values: map[string]*string{ - "l1": &lv1, - "l2": nil, - "l3": &lv3, - }}, + "invalid source type": { + config: LabelsConfig{ + Values: map[string]*string{"l1": ptr("")}, + SourceType: "invalid_source_type", + }, + err: fmt.Errorf("invalid labels source_type: %s. Can only be 'extracted' or 'structured_metadata'", "invalid_source_type"), + expectedCfgs: nil, + }, + "label value is set from name for extracted": { + config: LabelsConfig{ + SourceType: SourceTypeExtractedMap, + Values: map[string]*string{ + "l1": &lv1, + "l2": nil, + "l3": &lv3, + }}, + err: nil, + expectedCfgs: map[string]string{ + "l1": lv1, + "l2": "l2", + "l3": "l3", + }, + }, + "label value is set from name for structured_metadata": { + config: LabelsConfig{ + SourceType: SourceTypeStructuredMetadata, + Values: map[string]*string{ + "l1": &lv1, + "l2": nil, + "l3": &lv3, + }}, err: nil, expectedCfgs: map[string]string{ "l1": lv1, @@ -114,7 +175,7 @@ func TestLabels(t *testing.T) { test := test t.Run(name, func(t *testing.T) { t.Parallel() - actual, err := validateLabelsConfig(test.config.Values) + actual, err := validateLabelsConfig(&test.config) if (err != nil) != (test.err != nil) { t.Errorf("validateLabelsConfig() expected error = %v, actual error = %v", test.err, err) return @@ -130,33 +191,66 @@ func TestLabels(t *testing.T) { } } -func TestLabelStage_Process(t *testing.T) { +func TestLabelsStage_Process(t *testing.T) { sourceName := "diff_source" tests := map[string]struct { - config LabelsConfig - extractedData map[string]any - inputLabels model.LabelSet - expectedLabels model.LabelSet + config LabelsConfig + extractedData map[string]any + strcturedMetadata push.LabelsAdapter + inputLabels model.LabelSet + expectedLabels model.LabelSet }{ - "extract_success": { + "extract_success_extracted": { LabelsConfig{Values: map[string]*string{ "testLabel": nil, }}, map[string]any{ "testLabel": "testValue", }, + push.LabelsAdapter{}, model.LabelSet{}, model.LabelSet{ "testLabel": "testValue", }, }, - "different_source_name": { + "extract_success_structured_metadata": { + LabelsConfig{ + SourceType: SourceTypeStructuredMetadata, + Values: map[string]*string{ + "testLabel": ptr("testStrucuturedMetadata"), + }}, + map[string]any{}, + push.LabelsAdapter{ + push.LabelAdapter{Name: "testStrucuturedMetadata", Value: "testValue"}, + }, + model.LabelSet{}, + model.LabelSet{ + "testLabel": "testValue", + }, + }, + "different_source_name_extracted": { LabelsConfig{Values: map[string]*string{ "testLabel": &sourceName, }}, map[string]any{ sourceName: "testValue", }, + push.LabelsAdapter{}, + model.LabelSet{}, + model.LabelSet{ + "testLabel": "testValue", + }, + }, + "different_source_name_structured_metadata": { + LabelsConfig{ + SourceType: SourceTypeStructuredMetadata, + Values: map[string]*string{ + "testLabel": &sourceName, + }}, + map[string]any{}, + push.LabelsAdapter{ + push.LabelAdapter{Name: sourceName, Value: "testValue"}, + }, model.LabelSet{}, model.LabelSet{ "testLabel": "testValue", @@ -167,6 +261,18 @@ func TestLabelStage_Process(t *testing.T) { "testLabel": &sourceName, }}, map[string]any{}, + push.LabelsAdapter{}, + model.LabelSet{}, + model.LabelSet{}, + }, + "empty_structured_metadata": { + LabelsConfig{ + SourceType: SourceTypeStructuredMetadata, + Values: map[string]*string{ + "testLabel": &sourceName, + }}, + map[string]any{}, + push.LabelsAdapter{}, model.LabelSet{}, model.LabelSet{}, }, @@ -180,7 +286,9 @@ func TestLabelStage_Process(t *testing.T) { t.Fatal(err) } - out := processEntries(st, newEntry(test.extractedData, test.inputLabels, "", time.Time{}))[0] + entry := newEntry(test.extractedData, test.inputLabels, "", time.Time{}) + entry.StructuredMetadata = test.strcturedMetadata + out := processEntries(st, entry)[0] assert.Equal(t, test.expectedLabels, out.Labels) }) } diff --git a/internal/component/loki/process/stages/structured_metadata.go b/internal/component/loki/process/stages/structured_metadata.go index 03d0576fb1f..008f936d776 100644 --- a/internal/component/loki/process/stages/structured_metadata.go +++ b/internal/component/loki/process/stages/structured_metadata.go @@ -1,6 +1,8 @@ package stages import ( + "errors" + "fmt" "reflect" "regexp" @@ -15,12 +17,35 @@ type StructuredMetadataConfig struct { Regex string `alloy:"regex,attr,optional"` } +// validateStructuredMetadataConfig validates the structured metadata stage config. +func validateStructuredMetadataConfig(c map[string]*string) (map[string]string, error) { + // We must not mutate the c.Values, create a copy with changes we need. + ret := map[string]string{} + if c == nil { + return nil, errors.New(ErrEmptyLabelStageConfig) + } + for labelName, labelSrc := range c { + // TODO: add support for different validation schemes. + //nolint:staticcheck + if !model.LabelName(labelName).IsValid() { + return nil, fmt.Errorf(ErrInvalidLabelName, labelName) + } + // If no label source was specified, use the key name + if labelSrc == nil || *labelSrc == "" { + ret[labelName] = labelName + } else { + ret[labelName] = *labelSrc + } + } + return ret, nil +} + func newStructuredMetadataStage(logger log.Logger, configs StructuredMetadataConfig) (Stage, error) { var validatedLabelsConfig map[string]string var err error if len(configs.Values) > 0 { - validatedLabelsConfig, err = validateLabelsConfig(configs.Values) + validatedLabelsConfig, err = validateStructuredMetadataConfig(configs.Values) if err != nil { return nil, err } @@ -81,6 +106,30 @@ func (s *structuredMetadataStage) Run(in chan Entry) chan Entry { }) } +type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue) + +func processLabelsConfigs(logger log.Logger, extracted map[string]any, labelsConfig map[string]string, consumer labelsConsumer) { + for lName, lSrc := range labelsConfig { + if lValue, ok := extracted[lSrc]; ok { + s, err := getString(lValue) + if err != nil { + if Debug { + level.Debug(logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) + } + continue + } + labelValue := model.LabelValue(s) + if !labelValue.IsValid() { + if Debug { + level.Debug(logger).Log("msg", "invalid label value parsed", "value", labelValue) + } + continue + } + consumer(model.LabelName(lName), labelValue) + } + } +} + func (s *structuredMetadataStage) extractFromLabels(e Entry) Entry { labels := e.Labels foundLabels := []model.LabelName{} diff --git a/internal/component/loki/process/stages/truncate.go b/internal/component/loki/process/stages/truncate.go index 3025ab8fa74..102b5b9b435 100644 --- a/internal/component/loki/process/stages/truncate.go +++ b/internal/component/loki/process/stages/truncate.go @@ -1,9 +1,7 @@ package stages import ( - "encoding" "errors" - "fmt" "maps" "slices" "strings" @@ -36,46 +34,14 @@ type TruncateConfig struct { } type RuleConfig struct { - Limit units.Base2Bytes `alloy:"limit,attr"` - Suffix string `alloy:"suffix,attr,optional"` - Sources []string `alloy:"sources,attr,optional"` - SourceType TruncateSourceType `alloy:"source_type,attr,optional"` + Limit units.Base2Bytes `alloy:"limit,attr"` + Suffix string `alloy:"suffix,attr,optional"` + Sources []string `alloy:"sources,attr,optional"` + SourceType SourceType `alloy:"source_type,attr,optional"` effectiveLimit units.Base2Bytes } -var ( - _ encoding.TextMarshaler = TruncateSourceType("") - _ encoding.TextUnmarshaler = (*TruncateSourceType)(nil) -) - -type TruncateSourceType string - -// UnmarshalText implements encoding.TextUnmarshaler. -func (t *TruncateSourceType) UnmarshalText(text []byte) error { - str := string(text) - switch str { - case string(TruncateSourceLine), string(TruncateSourceLabel), string(TruncateSourceStructuredMetadata), string(TruncateSourceExtractedMap): - *t = TruncateSourceType(str) - default: - return fmt.Errorf("unknown source_type: %s", str) - } - - return nil -} - -// MarshalText implements encoding.TextMarshaler. -func (t TruncateSourceType) MarshalText() (text []byte, err error) { - return []byte(t), nil -} - -const ( - TruncateSourceLine TruncateSourceType = "line" - TruncateSourceLabel TruncateSourceType = "label" - TruncateSourceStructuredMetadata TruncateSourceType = "structured_metadata" - TruncateSourceExtractedMap TruncateSourceType = "extracted" -) - // validateTruncateConfig validates the TruncateConfig for the truncateStage func validateTruncateConfig(cfg *TruncateConfig) error { if len(cfg.Rules) == 0 { @@ -90,10 +56,10 @@ func validateTruncateConfig(cfg *TruncateConfig) error { } if r.SourceType == "" { - r.SourceType = TruncateSourceLine + r.SourceType = SourceTypeLine } - if r.SourceType == TruncateSourceLine && len(r.Sources) > 0 { + if r.SourceType == SourceTypeLine && len(r.Sources) > 0 { return errors.New(errSourcesForLine) } @@ -138,7 +104,7 @@ func (m *truncateStage) Run(in chan Entry) chan Entry { truncated := map[string]struct{}{} for _, r := range m.cfg.Rules { switch r.SourceType { - case TruncateSourceLine: + case SourceTypeLine: if len(e.Line) > int(r.effectiveLimit) { e.Line = e.Line[:r.effectiveLimit] + r.Suffix markTruncated(m.truncatedCount, truncated, truncateLineField) @@ -147,7 +113,7 @@ func (m *truncateStage) Run(in chan Entry) chan Entry { level.Debug(m.logger).Log("msg", "line has been truncated", "limit", r.effectiveLimit, "truncated_line", e.Line) } } - case TruncateSourceLabel: + case SourceTypeLabel: if len(r.Sources) > 0 { for _, source := range r.Sources { name := model.LabelName(source) @@ -160,7 +126,7 @@ func (m *truncateStage) Run(in chan Entry) chan Entry { m.tryTruncateLabel(r, e.Labels, k, v, truncated) } } - case TruncateSourceStructuredMetadata: + case SourceTypeStructuredMetadata: if len(r.Sources) > 0 { for i, v := range e.StructuredMetadata { if slices.Contains(r.Sources, v.Name) { @@ -174,7 +140,7 @@ func (m *truncateStage) Run(in chan Entry) chan Entry { e.StructuredMetadata[i] = m.tryTruncateStructuredMetadata(r, v, truncated) } } - case TruncateSourceExtractedMap: + case SourceTypeExtractedMap: if len(r.Sources) > 0 { for _, source := range r.Sources { if v, ok := e.Extracted[source]; ok { diff --git a/internal/component/loki/process/stages/truncate_test.go b/internal/component/loki/process/stages/truncate_test.go index d617bdf887c..af9b631a38a 100644 --- a/internal/component/loki/process/stages/truncate_test.go +++ b/internal/component/loki/process/stages/truncate_test.go @@ -80,7 +80,7 @@ func Test_TruncateStage_Process(t *testing.T) { config: []*RuleConfig{ { Limit: 15, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Suffix: "[truncated]", }, }, @@ -96,7 +96,7 @@ func Test_TruncateStage_Process(t *testing.T) { config: []*RuleConfig{ { Limit: 15, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Suffix: "[truncated]", Sources: []string{"app"}, }, @@ -113,7 +113,7 @@ func Test_TruncateStage_Process(t *testing.T) { config: []*RuleConfig{ { Limit: 15, - SourceType: TruncateSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Suffix: "", }, }, @@ -132,7 +132,7 @@ func Test_TruncateStage_Process(t *testing.T) { config: []*RuleConfig{ { Limit: 15, - SourceType: TruncateSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Suffix: "", Sources: []string{"meta1"}, }, @@ -155,18 +155,18 @@ func Test_TruncateStage_Process(t *testing.T) { }, { Limit: 15, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Suffix: "[truncated]", Sources: []string{"app"}, }, { Limit: 15, - SourceType: TruncateSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Suffix: "", }, { Limit: 8, - SourceType: TruncateSourceExtractedMap, + SourceType: SourceTypeExtractedMap, Sources: []string{"field2"}, }, }, @@ -298,7 +298,7 @@ func Test_ValidateTruncateConfig(t *testing.T) { config: &TruncateConfig{ Rules: []*RuleConfig{{ Limit: 10, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Suffix: "...", }}, }, @@ -309,7 +309,7 @@ func Test_ValidateTruncateConfig(t *testing.T) { config: &TruncateConfig{ Rules: []*RuleConfig{{ Limit: 10, - SourceType: TruncateSourceStructuredMetadata, + SourceType: SourceTypeStructuredMetadata, Suffix: "...", }}, }, @@ -321,7 +321,7 @@ func Test_ValidateTruncateConfig(t *testing.T) { Rules: []*RuleConfig{ { Limit: 10, - SourceType: TruncateSourceLabel, + SourceType: SourceTypeLabel, Sources: []string{"app"}, Suffix: "...", }, diff --git a/internal/component/loki/process/stages/types.go b/internal/component/loki/process/stages/types.go new file mode 100644 index 00000000000..f9749c7e1cd --- /dev/null +++ b/internal/component/loki/process/stages/types.go @@ -0,0 +1,38 @@ +package stages + +import ( + "encoding" + "fmt" +) + +type SourceType string + +const ( + SourceTypeLine SourceType = "line" + SourceTypeLabel SourceType = "label" + SourceTypeStructuredMetadata SourceType = "structured_metadata" + SourceTypeExtractedMap SourceType = "extracted" +) + +var ( + _ encoding.TextMarshaler = SourceType("") + _ encoding.TextUnmarshaler = (*SourceType)(nil) +) + +// UnmarshalText implements encoding.TextUnmarshaler. +func (t *SourceType) UnmarshalText(text []byte) error { + str := string(text) + switch str { + case string(SourceTypeLine), string(SourceTypeLabel), string(SourceTypeStructuredMetadata), string(SourceTypeExtractedMap): + *t = SourceType(str) + default: + return fmt.Errorf("unknown source_type: %s", str) + } + + return nil +} + +// MarshalText implements encoding.TextMarshaler. +func (t SourceType) MarshalText() (text []byte, err error) { + return []byte(t), nil +} diff --git a/internal/converter/internal/promtailconvert/internal/build/limits_config.go b/internal/converter/internal/promtailconvert/internal/build/limits_config.go index 8b018935803..b1632b2911c 100644 --- a/internal/converter/internal/promtailconvert/internal/build/limits_config.go +++ b/internal/converter/internal/promtailconvert/internal/build/limits_config.go @@ -44,7 +44,7 @@ func buildLimitsConfigStages(cfg limit.Config) []stages.StageConfig { Rules: []*stages.RuleConfig{ { Limit: lineSizeBytes, - SourceType: stages.TruncateSourceLine, + SourceType: stages.SourceTypeLine, }, }, },