@@ -16,17 +16,19 @@ import (
1616 models "github.com/nginx/agent/sdk/v2/proto/events"
1717 "github.com/nginx/agent/v2/src/core"
1818 "strings"
19+ "sync"
1920
2021 log "github.com/sirupsen/logrus"
2122 "go.uber.org/atomic"
2223)
2324
2425type MetricsSender struct {
25- reporter client.MetricReporter
26- pipeline core.MessagePipeInterface
27- ctx context.Context
28- started * atomic.Bool
29- readyToSend * atomic.Bool
26+ reporter client.MetricReporter
27+ pipeline core.MessagePipeInterface
28+ ctx context.Context
29+ started * atomic.Bool
30+ readyToSend * atomic.Bool
31+ readyToSendMu sync.RWMutex
3032}
3133
3234func NewMetricsSender (reporter client.MetricReporter ) * MetricsSender {
@@ -49,21 +51,23 @@ func (r *MetricsSender) Init(pipeline core.MessagePipeInterface) {
4951
5052func (r * MetricsSender ) Close () {
5153 log .Info ("MetricsSender is wrapping up" )
54+ r .readyToSendMu .Lock ()
5255 r .started .Store (false )
5356 r .readyToSend .Store (false )
57+ defer r .readyToSendMu .Unlock ()
5458}
5559
5660func (r * MetricsSender ) Info () * core.Info {
5761 return core .NewInfo (agent_config .FeatureMetricsSender , "v0.0.1" )
5862}
5963
6064func (r * MetricsSender ) Process (msg * core.Message ) {
61- if msg .Exact (core .AgentConnected ) {
62- log .Debugf ("MetricsSender AgentConnected Before: %v" , r .readyToSend )
63- r .readyToSend .Store (true )
64- log .Debugf ("MetricsSender AgentConnected After %v" , r .readyToSend )
65- return
66- }
65+ // if msg.Exact(core.AgentConnected) {
66+ // log.Debugf("MetricsSender AgentConnected Before: %v", r.readyToSend)
67+ // r.readyToSend.Store(true)
68+ // log.Debugf("MetricsSender AgentConnected After %v", r.readyToSend)
69+ // return
70+ // }
6771
6872 if msg .Exact (core .CommMetrics ) {
6973 payloads , ok := msg .Data ().([]core.Payload )
@@ -115,11 +119,14 @@ func (r *MetricsSender) Process(msg *core.Message) {
115119func (r * MetricsSender ) metricSenderBackoff (agentConfig * proto.AgentConfig ) {
116120 log .Debugf ("update metric reporter client configuration to %+v" , agentConfig )
117121 if agentConfig .Details .Features != nil {
118- for _ , feature := range agentConfig .Details .Features {
119- if feature == agent_config .FeatureMetricsSender {
120- r .readyToSend .Store (true )
121- break
122- }
122+ if r .isFeatureEnabled (agentConfig ) {
123+ r .readyToSendMu .Lock ()
124+ r .readyToSend .Store (true )
125+ r .readyToSendMu .Unlock ()
126+ } else {
127+ r .readyToSendMu .Lock ()
128+ r .readyToSend .Store (false )
129+ r .readyToSendMu .Unlock ()
123130 }
124131 }
125132 if agentConfig .GetDetails () == nil || agentConfig .GetDetails ().GetServer () == nil || agentConfig .GetDetails ().GetServer ().GetBackoff () == nil {
@@ -134,3 +141,16 @@ func (r *MetricsSender) metricSenderBackoff(agentConfig *proto.AgentConfig) {
134141func (r * MetricsSender ) Subscriptions () []string {
135142 return []string {core .CommMetrics , core .AgentConnected , core .AgentConfigChanged }
136143}
144+
145+ func (r * MetricsSender ) isFeatureEnabled (agentConfig * proto.AgentConfig ) bool {
146+ var isFeatureEnabled bool
147+ if agentConfig .Details .Features != nil {
148+ for _ , feature := range agentConfig .Details .Features {
149+ if feature == agent_config .FeatureMetricsSender {
150+ isFeatureEnabled = true
151+ break
152+ }
153+ }
154+ }
155+ return isFeatureEnabled
156+ }
0 commit comments