@@ -16,6 +16,7 @@ import (
1616 "log"
1717 "os"
1818 "slices"
19+ "sync"
1920
2021 "github.com/openconfig/gnmic/pkg/api/types"
2122 "github.com/openconfig/gnmic/pkg/api/utils"
@@ -38,6 +39,7 @@ type valueTag struct {
3839 Debug bool `mapstructure:"debug,omitempty" json:"debug,omitempty"`
3940 logger * log.Logger
4041
42+ m * sync.RWMutex
4143 applyRules []map [uint64 ]* applyRule
4244}
4345
@@ -49,7 +51,7 @@ type rule struct {
4951
5052func init () {
5153 formatters .Register (processorType , func () formatters.EventProcessor {
52- return & valueTag {logger : log .New (io .Discard , "" , 0 )}
54+ return & valueTag {m : new (sync. RWMutex ), logger : log .New (io .Discard , "" , 0 )}
5355 })
5456}
5557
@@ -69,7 +71,7 @@ func (vt *valueTag) Init(cfg interface{}, opts ...formatters.Option) error {
6971
7072 vt .applyRules = make ([]map [uint64 ]* applyRule , len (vt .Rules ))
7173 for i := range vt .applyRules {
72- vt .applyRules [i ] = make (map [uint64 ]* applyRule )
74+ vt .applyRules [i ] = make (map [uint64 ]* applyRule , 0 )
7375 }
7476
7577 if vt .logger .Writer () != io .Discard {
@@ -93,16 +95,29 @@ type applyRule struct {
9395}
9496
9597func (vt * valueTag ) Apply (evs ... * formatters.EventMsg ) []* formatters.EventMsg {
96- vt .updateApplyRules (evs )
97- for i , arSet := range vt .applyRules {
98- for _ , ar := range arSet {
99- for _ , ev := range evs {
98+ vt .m .Lock ()
99+ defer vt .m .Unlock ()
100+
101+ for _ , ev := range evs {
102+ for i , r := range vt .Rules {
103+ if v , ok := ev .Values [r .ValueName ]; ok {
104+ // calculate apply rule Key
105+ k := vt .applyRuleKey (ev .Tags , r )
106+ vt.applyRules [i ][k ] = & applyRule {
107+ tags : copyTags (ev .Tags ), // copy map
108+ value : v ,
109+ }
110+ if r .Consume {
111+ delete (ev .Values , r .ValueName )
112+ }
113+ }
114+ for _ , ar := range vt .applyRules [i ] {
100115 if includedIn (ar .tags , ev .Tags ) {
101116 switch v := ar .value .(type ) {
102117 case string :
103- ev .Tags [vt . Rules [ i ] .TagName ] = v
118+ ev .Tags [r .TagName ] = v
104119 default :
105- ev .Tags [vt . Rules [ i ] .TagName ] = fmt .Sprint (ar .value )
120+ ev .Tags [r .TagName ] = fmt .Sprint (ar .value )
106121 }
107122 }
108123 }
@@ -139,24 +154,6 @@ func includedIn(a, b map[string]string) bool {
139154
140155func (vt * valueTag ) WithProcessors (procs map [string ]map [string ]any ) {}
141156
142- func (vt * valueTag ) updateApplyRules (evs []* formatters.EventMsg ) {
143- for i , r := range vt .Rules {
144- for _ , ev := range evs {
145- if v , ok := ev .Values [r .ValueName ]; ok {
146- // calculate apply rule Key
147- k := vt .applyRuleKey (ev .Tags , r )
148- vt.applyRules [i ][k ] = & applyRule {
149- tags : copyTags (ev .Tags ), // copy map
150- value : v ,
151- }
152- if r .Consume {
153- delete (ev .Values , r .ValueName )
154- }
155- }
156- }
157- }
158- }
159-
160157// the apply rule key is a hash of the valueName and the event msg tags
161158func (vt * valueTag ) applyRuleKey (m map [string ]string , r * rule ) uint64 {
162159 keys := make ([]string , 0 , len (m ))
0 commit comments