-
Notifications
You must be signed in to change notification settings - Fork 579
feat(loki.process): Support structured metadata as source type of stage.labels for loki.process #5055
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(loki.process): Support structured metadata as source type of stage.labels for loki.process #5055
Changes from all commits
4e6c224
712bf39
9164d17
5945375
49c2cb3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,32 +4,51 @@ import ( | |
| "errors" | ||
| "fmt" | ||
| "reflect" | ||
| "time" | ||
|
|
||
| "github.com/go-kit/log" | ||
| "github.com/prometheus/common/model" | ||
|
|
||
| "github.com/grafana/alloy/internal/runtime/logging/level" | ||
| "github.com/grafana/loki/pkg/push" | ||
| ) | ||
|
|
||
| const ( | ||
| ErrEmptyLabelStageConfig = "label stage config cannot be empty" | ||
| ErrInvalidLabelName = "invalid label name: %s" | ||
| ErrInvalidSourceType = "invalid labels source_type: %s. Can only be 'extracted' or 'structured_metadata'" | ||
|
|
||
| LabelsSourceStructuredMetadata string = "structured_metadata" | ||
| LabelsSourceExtractedMap string = "extracted" | ||
| ) | ||
|
|
||
| // LabelsConfig is a set of labels to be extracted | ||
| type LabelsConfig struct { | ||
| Values map[string]*string `alloy:"values,attr"` | ||
| Values map[string]*string `alloy:"values,attr"` | ||
| SourceType SourceType `alloy:"source_type,attr,optional"` | ||
| } | ||
|
|
||
| // validateLabelsConfig validates the Label stage configuration | ||
| func validateLabelsConfig(c map[string]*string) (map[string]string, error) { | ||
| func validateLabelsConfig(cfg *LabelsConfig) (map[string]string, error) { | ||
| if cfg.Values == nil { | ||
| return nil, errors.New(ErrEmptyLabelStageConfig) | ||
| } | ||
|
|
||
| if cfg.SourceType == "" { | ||
| cfg.SourceType = SourceTypeExtractedMap | ||
| } | ||
|
|
||
| switch cfg.SourceType { | ||
| case SourceTypeExtractedMap, SourceTypeStructuredMetadata: | ||
| default: | ||
| return nil, fmt.Errorf(ErrInvalidSourceType, cfg.SourceType) | ||
| } | ||
|
|
||
| // We must not mutate the c.Values, create a copy with changes we need. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have this comment but then we still change c.Values to the new map, why was this change necessary?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was implemented to get access to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure but we should still not set new values for c.Values but return a map like we did before this change. We can keep both in labelStage
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright, will change it! Does this mean I'm not supposed to change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally not.
Ideally we use these but I have noticed that a lot of stage configs don't do it but I don't want to block this work further so lets just create the map like it did before and return it
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the clarification! :) I've reverted the changes from |
||
| ret := map[string]string{} | ||
| if c == nil { | ||
| if cfg.Values == nil { | ||
| return nil, errors.New(ErrEmptyLabelStageConfig) | ||
| } | ||
| for labelName, labelSrc := range c { | ||
| for labelName, labelSrc := range cfg.Values { | ||
| // TODO: add support for different validation schemes. | ||
| //nolint:staticcheck | ||
| if !model.LabelName(labelName).IsValid() { | ||
|
|
@@ -42,54 +61,93 @@ func validateLabelsConfig(c map[string]*string) (map[string]string, error) { | |
| ret[labelName] = *labelSrc | ||
| } | ||
| } | ||
|
|
||
| return ret, nil | ||
| } | ||
|
|
||
| // newLabelStage creates a new label stage to set labels from extracted data | ||
| func newLabelStage(logger log.Logger, configs LabelsConfig) (Stage, error) { | ||
| labelsConfig, err := validateLabelsConfig(configs.Values) | ||
| labelsConfig, err := validateLabelsConfig(&configs) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return toStage(&labelStage{ | ||
| return &labelStage{ | ||
| cfg: &configs, | ||
| labelsConfig: labelsConfig, | ||
| logger: logger, | ||
| }), nil | ||
| }, nil | ||
| } | ||
|
|
||
| // labelStage sets labels from extracted data | ||
| type labelStage struct { | ||
| cfg *LabelsConfig | ||
| labelsConfig map[string]string | ||
| logger log.Logger | ||
| } | ||
|
|
||
| // Process implements Stage | ||
| func (l *labelStage) Process(labels model.LabelSet, extracted map[string]any, _ *time.Time, _ *string) { | ||
| processLabelsConfigs(l.logger, extracted, l.labelsConfig, func(labelName model.LabelName, labelValue model.LabelValue) { | ||
| labels[labelName] = labelValue | ||
| }) | ||
| // Run implements Stage | ||
| func (l *labelStage) Run(in chan Entry) chan Entry { | ||
|
kalleep marked this conversation as resolved.
|
||
| out := make(chan Entry) | ||
| go func() { | ||
| defer close(out) | ||
| for e := range in { | ||
| switch l.cfg.SourceType { | ||
| case SourceTypeExtractedMap: | ||
| l.addLabelFromExtractedMap(e.Labels, e.Extracted) | ||
| case SourceTypeStructuredMetadata: | ||
| l.addLabelsFromStructuredMetadata(e.Labels, e.StructuredMetadata) | ||
|
kalleep marked this conversation as resolved.
|
||
| } | ||
| out <- e | ||
| } | ||
| }() | ||
| return out | ||
| } | ||
|
|
||
| type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue) | ||
|
|
||
| func processLabelsConfigs(logger log.Logger, extracted map[string]any, labelsConfig map[string]string, consumer labelsConsumer) { | ||
| for lName, lSrc := range labelsConfig { | ||
| func (l *labelStage) addLabelFromExtractedMap(labels model.LabelSet, extracted map[string]any) { | ||
| for lName, lSrc := range l.labelsConfig { | ||
| if lValue, ok := extracted[lSrc]; ok { | ||
| s, err := getString(lValue) | ||
| if err != nil { | ||
| if Debug { | ||
| level.Debug(logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) | ||
| level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) | ||
| } | ||
| continue | ||
| } | ||
| labelValue := model.LabelValue(s) | ||
| if !labelValue.IsValid() { | ||
| if Debug { | ||
| level.Debug(logger).Log("msg", "invalid label value parsed", "value", labelValue) | ||
| level.Debug(l.logger).Log("msg", "invalid label value parsed", "value", labelValue) | ||
| } | ||
| continue | ||
| } | ||
| consumer(model.LabelName(lName), labelValue) | ||
|
|
||
| labels[model.LabelName(lName)] = labelValue | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (l *labelStage) addLabelsFromStructuredMetadata(labels model.LabelSet, metadata push.LabelsAdapter) { | ||
| for lName, lSrc := range l.labelsConfig { | ||
| for _, kv := range metadata { | ||
| if kv.Name != lSrc { | ||
| continue | ||
| } | ||
|
|
||
| labelValue := model.LabelValue(kv.Value) | ||
| if !labelValue.IsValid() { | ||
| if Debug { | ||
| level.Debug(l.logger).Log("msg", "invalid structured metadata label value", "label", lName, "value", labelValue) | ||
| } | ||
| break | ||
| } | ||
|
|
||
| labels[model.LabelName(lName)] = labelValue | ||
| break | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Cleanup implements Stage. | ||
| func (*labelStage) Cleanup() { | ||
| // no-op | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example should include the recommendation to strip the moved attributes with a stage.structured_metadata_drop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.