@@ -17,6 +17,7 @@ import (
1717 "io"
1818 "log"
1919 "net"
20+ "slices"
2021 "strconv"
2122 "strings"
2223 "sync"
@@ -45,6 +46,8 @@ const (
4546 defaultActionTimeout = 30 * time .Second
4647)
4748
49+ var templateFunctions = template.FuncMap {"join" : strings .Join }
50+
4851func init () {
4952 loaders .Register (loaderType , func () loaders.TargetLoader {
5053 return & consulLoader {
@@ -140,6 +143,29 @@ func (c *consulLoader) Init(ctx context.Context, cfg map[string]interface{}, log
140143 se .tags [t ] = struct {}{}
141144 }
142145 }
146+ // parse tempaltes if present
147+ for i , se := range c .cfg .Services {
148+ if se .Config == nil {
149+ continue
150+ }
151+ if name , ok := se .Config ["name" ].(string ); ok {
152+ nameTemplate , err := template .New (fmt .Sprintf ("targetName-%d" , i )).Funcs (templateFunctions ).Option ("missingkey=zero" ).Parse (name )
153+ if err != nil {
154+ return err
155+ }
156+ se .targetNameTemplate = nameTemplate
157+ }
158+ if eventTags , ok := se .Config ["event-tags" ].(map [string ]any ); ok {
159+ se .targetTagsTemplate = make (map [string ]* template.Template )
160+ for tagName , tagTemplateString := range eventTags {
161+ tagTemplate , err := template .New (fmt .Sprintf ("tagTemplate-%s-%d" , tagName , i )).Funcs (templateFunctions ).Option ("missingkey=zero" ).Parse (fmt .Sprintf ("%v" , tagTemplateString ))
162+ if err != nil {
163+ return err
164+ }
165+ se .targetTagsTemplate [tagName ] = tagTemplate
166+ }
167+ }
168+ }
143169
144170 err = c .readVars (ctx )
145171 if err != nil {
@@ -222,53 +248,57 @@ CLIENT:
222248}
223249
224250func (c * consulLoader ) RunOnce (ctx context.Context ) (map [string ]* types.TargetConfig , error ) {
225- err := c .initClient ()
226- if err != nil {
251+ if err := c .initClient (); err != nil {
227252 return nil , err
228253 }
229254 result := make (map [string ]* types.TargetConfig )
230255 rsChan := make (chan * api.ServiceEntry )
231- m := new (sync.Mutex )
232256 wg := new (sync.WaitGroup )
233- wg .Add (len (c .cfg .Services ))
234257
258+ // fan-out queries
235259 for _ , s := range c .cfg .Services {
260+ wg .Add (1 )
236261 go func (s * serviceDef ) {
262+ defer wg .Done ()
237263 ses , _ , err := c .client .Health ().ServiceMultipleTags (s .Name , s .Tags , true , & api.QueryOptions {})
238264 if err != nil {
239265 c .logger .Printf ("failed to get service %q instances: %v" , s .Name , err )
240266 return
241267 }
242268 for _ , se := range ses {
243- rsChan <- se
269+ select {
270+ case rsChan <- se :
271+ case <- ctx .Done ():
272+ return
273+ }
244274 }
245275 }(s )
246276 }
247277
278+ // closer
248279 go func () {
249- m .Lock ()
250- defer m .Unlock ()
251- for {
252- select {
253- case se , ok := <- rsChan :
254- if ! ok {
255- return
256- }
257- tc , err := c .serviceEntryToTargetConfig (se )
258- if err != nil {
259- c .logger .Printf ("failed to convert service %+v to target config: %v" , se , err )
260- }
280+ wg .Wait ()
281+ close (rsChan )
282+ }()
283+
284+ for {
285+ select {
286+ case se , ok := <- rsChan :
287+ if ! ok {
288+ return result , nil
289+ }
290+ tc , err := c .serviceEntryToTargetConfig (se )
291+ if err != nil {
292+ c .logger .Printf ("failed to convert service %+v to target config: %v" , se , err )
293+ continue
294+ }
295+ if tc != nil {
261296 result [tc .Name ] = tc
262- case <- ctx .Done ():
263- return
264297 }
298+ case <- ctx .Done ():
299+ return result , ctx .Err ()
265300 }
266- }()
267- wg .Wait ()
268- close (rsChan )
269- m .Lock ()
270- defer m .Unlock ()
271- return result , nil
301+ }
272302}
273303
274304//
@@ -386,14 +416,7 @@ SRV:
386416 // match service tags
387417 if len (sd .tags ) > 0 {
388418 for requiredTag := range sd .tags {
389- found := false
390- for _ , serviceTag := range se .Service .Tags {
391- if serviceTag == requiredTag {
392- found = true
393- break
394- }
395- }
396- if ! found {
419+ if ! slices .Contains (se .Service .Tags , requiredTag ) {
397420 goto SRV
398421 }
399422 }
@@ -417,15 +440,9 @@ SRV:
417440
418441 tc .Name = se .Service .ID
419442
420- if configName , ok := sd .Config ["name" ].(string ); ok {
421- nameTemplate , err := template .New ("targetName" ).Option ("missingkey=zero" ).Parse (configName )
422- if err != nil {
423- c .logger .Println ("Could not parse nameTemplate" )
424- }
425- sd .targetNameTemplate = nameTemplate
426-
443+ if sd .targetNameTemplate != nil {
427444 buffer .Reset ()
428- err = sd .targetNameTemplate .Execute (& buffer , se .Service )
445+ err : = sd .targetNameTemplate .Execute (& buffer , se .Service )
429446 if err != nil {
430447 c .logger .Println ("Could not execute nameTemplate" )
431448 continue
@@ -434,20 +451,7 @@ SRV:
434451 }
435452
436453 // Create Event tags from Consul via templates
437- if configEventTags , ok := sd .Config ["event-tags" ].(map [string ]interface {}); ok {
438- // Allow to use join function in tags
439- templateFunctions := template.FuncMap {"join" : strings .Join }
440-
441- sd .targetTagsTemplate = make (map [string ]* template.Template )
442- for tagName , tagTemplateString := range configEventTags {
443- tagTemplate , err := template .New (tagName ).Funcs (templateFunctions ).Option ("missingkey=zero" ).Parse (fmt .Sprintf ("%v" , tagTemplateString ))
444- if err != nil {
445- c .logger .Println ("Could not parse tagTemplate:" , tagName )
446- continue
447- }
448- sd .targetTagsTemplate [tagName ] = tagTemplate
449- }
450-
454+ if len (sd .targetTagsTemplate ) > 0 {
451455 eventTags := make (map [string ]string )
452456 for tagName , tagTemplate := range sd .targetTagsTemplate {
453457 buffer .Reset ()
@@ -630,18 +634,14 @@ func (c *consulLoader) runActions(ctx context.Context, tcs map[string]*types.Tar
630634func (c * consulLoader ) runOnAddActions (ctx context.Context , tName string , tcs map [string ]* types.TargetConfig ) error {
631635 aCtx := & actions.Context {
632636 Input : tName ,
633- Env : make (map [string ]interface {} ),
637+ Env : make (map [string ]any ),
634638 Vars : c .vars ,
635639 Targets : tcs ,
636640 }
637641 for _ , act := range c .addActions {
638642 c .logger .Printf ("running action %q for target %q" , act .NName (), tName )
639643 res , err := act .Run (ctx , aCtx )
640644 if err != nil {
641- // delete target from known targets map
642- c .m .Lock ()
643- delete (c .lastTargets , tName )
644- c .m .Unlock ()
645645 return fmt .Errorf ("action %q for target %q failed: %v" , act .NName (), tName , err )
646646 }
647647
@@ -655,7 +655,7 @@ func (c *consulLoader) runOnAddActions(ctx context.Context, tName string, tcs ma
655655 return nil
656656}
657657
658- func (c * consulLoader ) runOnDeleteActions (ctx context.Context , tName string , tcs map [string ]* types.TargetConfig ) error {
658+ func (c * consulLoader ) runOnDeleteActions (ctx context.Context , tName string , _ map [string ]* types.TargetConfig ) error {
659659 env := make (map [string ]interface {})
660660 for _ , act := range c .delActions {
661661 res , err := act .Run (ctx , & actions.Context {Input : tName , Env : env , Vars : c .vars })
0 commit comments