Skip to content

Commit 2bf562d

Browse files
committed
Fix the metrics feature so that it can be enabled and disabled remotely
1 parent 906b54a commit 2bf562d

File tree

13 files changed

+112
-59
lines changed

13 files changed

+112
-59
lines changed

src/core/environment.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,6 @@ func (env *EnvironmentType) IsContainer() bool {
342342
res, err, _ := singleflightGroup.Do(IsContainerKey, func() (interface{}, error) {
343343
for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} {
344344
if _, err := os.Stat(filename); err == nil {
345-
log.Debugf("Is a container because (%s) exists", filename)
346345
return true, nil
347346
}
348347
}

src/plugins/config_reader.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig)
152152
}
153153

154154
if synchronizeFeatures {
155+
log.Debugf("Agent config features changed, synchronizing features")
155156
r.synchronizeFeatures(payloadAgentConfig)
157+
r.config.Features = payloadAgentConfig.Details.Features
156158
}
157159

158160
r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig))
@@ -164,6 +166,7 @@ func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) {
164166
r.detailsMu.RLock()
165167
for _, feature := range r.config.Features {
166168
if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync {
169+
log.Debugf("Deregistering the feature %s", feature)
167170
r.deRegisterPlugin(feature)
168171
}
169172
}
@@ -177,12 +180,15 @@ func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) {
177180

178181
func (r *ConfigReader) deRegisterPlugin(data string) {
179182
if data == agent_config.FeatureFileWatcher {
180-
181183
err := r.messagePipeline.DeRegister([]string{agent_config.FeatureFileWatcher, agent_config.FeatureFileWatcherThrottle})
182184
if err != nil {
183185
log.Warnf("Error De-registering %v Plugin: %v", data, err)
184186
}
185-
187+
} else if data == agent_config.FeatureMetrics {
188+
err := r.messagePipeline.DeRegister([]string{agent_config.FeatureMetrics, agent_config.FeatureMetricsThrottle, agent_config.FeatureMetricsSender})
189+
if err != nil {
190+
log.Warnf("Error De-registering %v Plugin: %v", data, err)
191+
}
186192
} else {
187193
err := r.messagePipeline.DeRegister([]string{data})
188194
if err != nil {

src/plugins/dataplane_status.go

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,26 @@ import (
2323
)
2424

2525
type DataPlaneStatus struct {
26-
messagePipeline core.MessagePipeInterface
27-
ctx context.Context
28-
sendStatus chan bool
29-
healthTicker *time.Ticker
30-
interval time.Duration
31-
meta *proto.Metadata
32-
binary core.NginxBinary
33-
env core.Environment
34-
version string
35-
tags *[]string
36-
configDirs string
37-
lastSendDetails time.Time
38-
envHostInfo *proto.HostInfo
39-
reportInterval time.Duration
40-
softwareDetails map[string]*proto.DataplaneSoftwareDetails
41-
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
42-
softwareDetailsMutex sync.RWMutex
43-
structMu sync.RWMutex
44-
processes []*core.Process
26+
messagePipeline core.MessagePipeInterface
27+
ctx context.Context
28+
sendStatus chan bool
29+
healthTicker *time.Ticker
30+
interval time.Duration
31+
meta *proto.Metadata
32+
binary core.NginxBinary
33+
env core.Environment
34+
version string
35+
tags *[]string
36+
configDirs string
37+
lastSendDetails time.Time
38+
envHostInfo *proto.HostInfo
39+
reportInterval time.Duration
40+
softwareDetails map[string]*proto.DataplaneSoftwareDetails
41+
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
42+
nginxConfigActivityStatusesMutex sync.RWMutex
43+
softwareDetailsMutex sync.RWMutex
44+
structMu sync.RWMutex
45+
processes []*core.Process
4546
}
4647

4748
const (
@@ -81,7 +82,9 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) {
8182

8283
func (dps *DataPlaneStatus) Close() {
8384
log.Info("DataPlaneStatus is wrapping up")
85+
dps.nginxConfigActivityStatusesMutex.Lock()
8486
dps.nginxConfigActivityStatuses = nil
87+
dps.nginxConfigActivityStatusesMutex.Unlock()
8588
dps.softwareDetailsMutex.Lock()
8689
dps.softwareDetails = nil
8790
dps.softwareDetailsMutex.Unlock()
@@ -144,8 +147,10 @@ func (dps *DataPlaneStatus) Subscriptions() []string {
144147

145148
func (dps *DataPlaneStatus) updateNginxConfigActivityStatuses(newAgentActivityStatus *proto.AgentActivityStatus) {
146149
log.Tracef("DataplaneStatus: Updating nginxConfigActivityStatuses with %v", newAgentActivityStatus)
147-
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); ok {
150+
if _, ok := newAgentActivityStatus.GetStatus().(*proto.AgentActivityStatus_NginxConfigStatus); dps.nginxConfigActivityStatuses != nil && ok {
151+
dps.nginxConfigActivityStatusesMutex.Lock()
148152
dps.nginxConfigActivityStatuses[newAgentActivityStatus.GetNginxConfigStatus().GetNginxId()] = newAgentActivityStatus
153+
dps.nginxConfigActivityStatusesMutex.Unlock()
149154
}
150155
}
151156

@@ -184,6 +189,8 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface)
184189
func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus {
185190
forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails)
186191

192+
dps.nginxConfigActivityStatusesMutex.Lock()
193+
defer dps.nginxConfigActivityStatusesMutex.Unlock()
187194
agentActivityStatuses := []*proto.AgentActivityStatus{}
188195
for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses {
189196
agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus)

src/plugins/features.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (f *Features) Process(msg *core.Message) {
135135

136136
func (f *Features) enableMetricsFeature(_ string) []core.Plugin {
137137
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) {
138-
138+
log.Debugf("Enabling metrics feature")
139139
conf, err := config.GetConfig(f.conf.ClientID)
140140
if err != nil {
141141
log.Warnf("Unable to get agent config, %v", err)
@@ -154,7 +154,7 @@ func (f *Features) enableMetricsFeature(_ string) []core.Plugin {
154154
func (f *Features) enableMetricsCollectionFeature(_ string) []core.Plugin {
155155
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
156156
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) {
157-
157+
log.Debugf("Enabling metrics-collection feature")
158158
conf, err := config.GetConfig(f.conf.ClientID)
159159
if err != nil {
160160
log.Warnf("Unable to get agent config, %v", err)
@@ -171,7 +171,7 @@ func (f *Features) enableMetricsCollectionFeature(_ string) []core.Plugin {
171171
func (f *Features) enableMetricsThrottleFeature(_ string) []core.Plugin {
172172
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
173173
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) {
174-
174+
log.Debugf("Enabling metrics-throttle feature")
175175
conf, err := config.GetConfig(f.conf.ClientID)
176176
if err != nil {
177177
log.Warnf("Unable to get agent config, %v", err)
@@ -188,7 +188,7 @@ func (f *Features) enableMetricsThrottleFeature(_ string) []core.Plugin {
188188
func (f *Features) enableMetricsSenderFeature(_ string) []core.Plugin {
189189
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
190190
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) {
191-
191+
log.Debugf("Enabling metrics-sender feature")
192192
conf, err := config.GetConfig(f.conf.ClientID)
193193
if err != nil {
194194
log.Warnf("Unable to get agent config, %v", err)
@@ -205,6 +205,7 @@ func (f *Features) enableMetricsSenderFeature(_ string) []core.Plugin {
205205
func (f *Features) enableAgentAPIFeature(_ string) []core.Plugin {
206206
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) {
207207
conf, err := config.GetConfig(f.conf.ClientID)
208+
log.Debugf("Enabling agent-api feature")
208209
if err != nil {
209210
log.Warnf("Unable to get agent config, %v", err)
210211
}
@@ -219,6 +220,7 @@ func (f *Features) enableAgentAPIFeature(_ string) []core.Plugin {
219220

220221
func (f *Features) enableRegistrationFeature(_ string) []core.Plugin {
221222
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) {
223+
log.Debugf("Enabling registration feature")
222224
conf, err := config.GetConfig(f.conf.ClientID)
223225
if err != nil {
224226
log.Warnf("Unable to get agent config, %v", err)
@@ -234,6 +236,7 @@ func (f *Features) enableRegistrationFeature(_ string) []core.Plugin {
234236

235237
func (f *Features) enableDataPlaneStatusFeature(_ string) []core.Plugin {
236238
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureDataPlaneStatus) {
239+
log.Debugf("Enabling dataplane-status feature")
237240
conf, err := config.GetConfig(f.conf.ClientID)
238241
if err != nil {
239242
log.Warnf("Unable to get agent config, %v", err)
@@ -249,6 +252,7 @@ func (f *Features) enableDataPlaneStatusFeature(_ string) []core.Plugin {
249252

250253
func (f *Features) enableProcessWatcherFeature(_ string) []core.Plugin {
251254
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureProcessWatcher) {
255+
log.Debugf("Enabling process-watcher feature")
252256
conf, err := config.GetConfig(f.conf.ClientID)
253257
if err != nil {
254258
log.Warnf("Unable to get agent config, %v", err)
@@ -264,6 +268,7 @@ func (f *Features) enableProcessWatcherFeature(_ string) []core.Plugin {
264268

265269
func (f *Features) enableActivityEventsFeature(_ string) []core.Plugin {
266270
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureActivityEvents) {
271+
log.Debugf("Enabling activity-events feature")
267272
conf, err := config.GetConfig(f.conf.ClientID)
268273
if err != nil {
269274
log.Warnf("Unable to get agent config, %v", err)
@@ -279,6 +284,7 @@ func (f *Features) enableActivityEventsFeature(_ string) []core.Plugin {
279284

280285
func (f *Features) enableFileWatcherFeature(_ string) []core.Plugin {
281286
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureFileWatcher) {
287+
log.Debugf("Enabling file-watcher feature")
282288
conf, err := config.GetConfig(f.conf.ClientID)
283289
if err != nil {
284290
log.Warnf("Unable to get agent config, %v", err)
@@ -297,6 +303,7 @@ func (f *Features) enableNginxCountingFeature(_ string) []core.Plugin {
297303
countingPlugins := []core.Plugin{}
298304
if len(f.conf.Nginx.NginxCountingSocket) > 0 {
299305
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureNginxCounting) {
306+
log.Debugf("Enabling nginx-counting feature")
300307
conf, err := config.GetConfig(f.conf.ClientID)
301308
if err != nil {
302309
log.Warnf("Unable to get agent config, %v", err)

src/plugins/metrics.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Metrics struct {
4141
binary core.NginxBinary
4242
processesMutex sync.RWMutex
4343
processes []*core.Process
44+
cancel context.CancelFunc
4445
}
4546

4647
func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary, processes []*core.Process) *Metrics {
@@ -65,13 +66,14 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi
6566
func (m *Metrics) Init(pipeline core.MessagePipeInterface) {
6667
log.Info("Metrics initializing")
6768
m.pipeline = pipeline
68-
m.ctx = pipeline.Context()
69+
m.ctx, m.cancel = context.WithCancel(pipeline.Context())
6970
go m.metricsGoroutine()
7071
go m.drainBuffer(m.ctx)
7172
}
7273

7374
func (m *Metrics) Close() {
7475
m.collectorsMutex.Lock()
76+
m.cancel()
7577
m.collectors = nil
7678
m.collectorsMutex.Unlock()
7779
log.Info("Metrics is wrapping up")
@@ -179,6 +181,7 @@ func (m *Metrics) metricsGoroutine() {
179181
}
180182
return
181183
case <-m.ticker.C:
184+
log.Debug("Collecting metrics")
182185
m.collectStats()
183186

184187
if m.collectorsUpdate.Load() {

src/plugins/metrics_sender.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ func (r *MetricsSender) Info() *core.Info {
6060

6161
func (r *MetricsSender) Process(msg *core.Message) {
6262
if msg.Exact(core.AgentConnected) {
63-
r.readyToSend.Toggle()
63+
log.Debug("Metrics sender received agent connected message")
64+
r.readyToSend.Store(true)
6465
return
6566
}
6667

@@ -82,6 +83,7 @@ func (r *MetricsSender) Process(msg *core.Message) {
8283
if err != nil {
8384
log.Errorf("Failed to send MetricsReport: %v", err)
8485
} else {
86+
log.Debug("Metrics sender sent metrics report")
8587
r.pipeline.Process(core.NewMessage(core.MetricReportSent, nil))
8688
}
8789
case *models.EventReport:
@@ -99,9 +101,12 @@ func (r *MetricsSender) Process(msg *core.Message) {
99101
}
100102
}
101103
} else if msg.Exact(core.AgentConfigChanged) {
104+
log.Debug("Metrics sender received agent config changed message")
102105
switch config := msg.Data().(type) {
103106
case *proto.AgentConfig:
104107
r.metricSenderBackoff(config)
108+
// If metrics sender feature is re-enabled remotely then we need to set readyToSend to true
109+
r.readyToSend.Store(true)
105110
default:
106111
log.Warnf("metrics sender expected %T type, but got: %T", &proto.AgentConfig{}, msg.Data())
107112
}

test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go

Lines changed: 8 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)