@@ -4,92 +4,145 @@ import (
44 "errors"
55 "fmt"
66 "reflect"
7- "time"
87
98 "github.com/go-kit/log"
109 "github.com/prometheus/common/model"
1110
1211 "github.com/grafana/alloy/internal/runtime/logging/level"
12+ "github.com/grafana/loki/pkg/push"
1313)
1414
1515const (
1616 ErrEmptyLabelStageConfig = "label stage config cannot be empty"
1717 ErrInvalidLabelName = "invalid label name: %s"
18+ ErrInvalidSourceType = "invalid labels source_type: %s. Can only be 'extracted' or 'structured_metadata'"
19+
20+ LabelsSourceStructuredMetadata string = "structured_metadata"
21+ LabelsSourceExtractedMap string = "extracted"
1822)
1923
2024// LabelsConfig is a set of labels to be extracted
2125type LabelsConfig struct {
22- Values map [string ]* string `alloy:"values,attr"`
26+ Values map [string ]* string `alloy:"values,attr"`
27+ SourceType string `alloy:"source_type,attr,optional"`
2328}
2429
2530// validateLabelsConfig validates the Label stage configuration
26- func validateLabelsConfig (c map [string ]* string ) (map [string ]string , error ) {
27- // We must not mutate the c.Values, create a copy with changes we need.
28- ret := map [string ]string {}
29- if c == nil {
30- return nil , errors .New (ErrEmptyLabelStageConfig )
31+ func validateLabelsConfig (cfg * LabelsConfig ) error {
32+ if cfg .Values == nil {
33+ return errors .New (ErrEmptyLabelStageConfig )
34+ }
35+
36+ if cfg .SourceType == "" {
37+ cfg .SourceType = LabelsSourceExtractedMap
3138 }
32- for labelName , labelSrc := range c {
39+
40+ switch cfg .SourceType {
41+ case LabelsSourceExtractedMap , LabelsSourceStructuredMetadata :
42+ default :
43+ return fmt .Errorf (ErrInvalidSourceType , cfg .SourceType )
44+ }
45+
46+ // We must not mutate the c.Values, create a copy with changes we need.
47+ returnValues := map [string ]* string {}
48+ for labelName , labelSrc := range cfg .Values {
3349 // TODO: add support for different validation schemes.
3450 //nolint:staticcheck
3551 if ! model .LabelName (labelName ).IsValid () {
36- return nil , fmt .Errorf (ErrInvalidLabelName , labelName )
52+ return fmt .Errorf (ErrInvalidLabelName , labelName )
3753 }
3854 // If no label source was specified, use the key name
3955 if labelSrc == nil || * labelSrc == "" {
40- ret [labelName ] = labelName
56+ returnValues [labelName ] = & labelName
4157 } else {
42- ret [labelName ] = * labelSrc
58+ returnValues [labelName ] = labelSrc
4359 }
4460 }
45- return ret , nil
61+ cfg .Values = returnValues
62+ return nil
4663}
4764
4865// newLabelStage creates a new label stage to set labels from extracted data
4966func newLabelStage (logger log.Logger , configs LabelsConfig ) (Stage , error ) {
50- labelsConfig , err := validateLabelsConfig (configs . Values )
67+ err := validateLabelsConfig (& configs )
5168 if err != nil {
5269 return nil , err
5370 }
54- return toStage ( & labelStage {
55- labelsConfig : labelsConfig ,
56- logger : logger ,
57- }) , nil
71+ return & labelStage {
72+ cfg : & configs ,
73+ logger : logger ,
74+ }, nil
5875}
5976
6077// labelStage sets labels from extracted data
6178type labelStage struct {
62- labelsConfig map [ string ] string
63- logger log.Logger
79+ cfg * LabelsConfig
80+ logger log.Logger
6481}
6582
66- // Process implements Stage
67- func (l * labelStage ) Process (labels model.LabelSet , extracted map [string ]any , _ * time.Time , _ * string ) {
68- processLabelsConfigs (l .logger , extracted , l .labelsConfig , func (labelName model.LabelName , labelValue model.LabelValue ) {
69- labels [labelName ] = labelValue
70- })
83+ // Run implements Stage
84+ func (l * labelStage ) Run (in chan Entry ) chan Entry {
85+ out := make (chan Entry )
86+ go func () {
87+ defer close (out )
88+ for e := range in {
89+ switch l .cfg .SourceType {
90+ case LabelsSourceExtractedMap :
91+ l .addLabelFromExtractedMap (e .Labels , e .Extracted )
92+ case LabelsSourceStructuredMetadata :
93+ l .addLabelsFromStructuredMetadata (e .Labels , e .StructuredMetadata )
94+ }
95+ out <- e
96+ }
97+ }()
98+ return out
7199}
72100
73- type labelsConsumer func (labelName model.LabelName , labelValue model.LabelValue )
74-
75- func processLabelsConfigs (logger log.Logger , extracted map [string ]any , labelsConfig map [string ]string , consumer labelsConsumer ) {
76- for lName , lSrc := range labelsConfig {
77- if lValue , ok := extracted [lSrc ]; ok {
101+ func (l * labelStage ) addLabelFromExtractedMap (labels model.LabelSet , extracted map [string ]any ) {
102+ for lName , lSrc := range l .cfg .Values {
103+ if lValue , ok := extracted [* lSrc ]; ok {
78104 s , err := getString (lValue )
79105 if err != nil {
80106 if Debug {
81- level .Debug (logger ).Log ("msg" , "failed to convert extracted label value to string" , "err" , err , "type" , reflect .TypeOf (lValue ))
107+ level .Debug (l . logger ).Log ("msg" , "failed to convert extracted label value to string" , "err" , err , "type" , reflect .TypeOf (lValue ))
82108 }
83109 continue
84110 }
85111 labelValue := model .LabelValue (s )
86112 if ! labelValue .IsValid () {
87113 if Debug {
88- level .Debug (logger ).Log ("msg" , "invalid label value parsed" , "value" , labelValue )
114+ level .Debug (l . logger ).Log ("msg" , "invalid label value parsed" , "value" , labelValue )
89115 }
90116 continue
91117 }
92- consumer (model .LabelName (lName ), labelValue )
118+
119+ labels [model .LabelName (lName )] = labelValue
120+ }
121+ }
122+ }
123+
124+ func (l * labelStage ) addLabelsFromStructuredMetadata (labels model.LabelSet , metadata push.LabelsAdapter ) {
125+ for lName , lSrc := range l .cfg .Values {
126+ for _ , kv := range metadata {
127+ if kv .Name != * lSrc {
128+ continue
129+ }
130+
131+ labelValue := model .LabelValue (kv .Value )
132+ if ! labelValue .IsValid () {
133+ if Debug {
134+ level .Debug (l .logger ).Log ("msg" , "invalid structured metadata label value" , "label" , lName , "value" , labelValue )
135+ }
136+ break
137+ }
138+
139+ labels [model .LabelName (lName )] = labelValue
140+ break
93141 }
94142 }
95143}
144+
145+ // Cleanup implements Stage.
146+ func (* labelStage ) Cleanup () {
147+ // no-op
148+ }
0 commit comments