Skip to content

Commit a36b48f

Browse files
committed
Fix the metrics feature so that it can be enabled and disabled remotely
1 parent 2bf562d commit a36b48f

39 files changed

+150
-102
lines changed

src/core/pipe.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex
7171
pluginsRegistered := []string{}
7272
extensionPluginsRegistered := []string{}
7373

74-
for _, plugin := range p.plugins {
74+
for _, plugin := range plugins {
7575
for _, subscription := range plugin.Subscriptions() {
7676
p.regMu.Lock()
7777
err := p.bus.Subscribe(subscription, plugin.Process)
@@ -83,7 +83,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex
8383
pluginsRegistered = append(pluginsRegistered, *plugin.Info().name)
8484
}
8585

86-
for _, plugin := range p.extensionPlugins {
86+
for _, plugin := range extensionPlugins {
8787
for _, subscription := range plugin.Subscriptions() {
8888
p.regMu.Lock()
8989
err := p.bus.Subscribe(subscription, plugin.Process)
@@ -94,6 +94,7 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex
9494
}
9595
extensionPluginsRegistered = append(extensionPluginsRegistered, *plugin.Info().name)
9696
}
97+
9798
log.Infof("The following core plugins have been registered: %q", pluginsRegistered)
9899
log.Infof("The following extension plugins have been registered: %q", extensionPluginsRegistered)
99100

@@ -127,6 +128,8 @@ func (p *MessagePipe) DeRegister(pluginNames []string) error {
127128
return err
128129
}
129130
}
131+
132+
plugin = nil
130133
}
131134
}
132135

src/plugins/commander.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,15 @@ func NewCommander(cmdr client.Commander, config *config.Config) *Commander {
3838
}
3939

4040
func (c *Commander) Init(pipeline core.MessagePipeInterface) {
41+
log.Info("Commander initializing")
4142
c.pipeline = pipeline
4243
c.ctx = pipeline.Context()
43-
log.Info("Commander initializing")
4444
go c.dispatchLoop()
4545
}
4646

4747
func (c *Commander) Close() {
4848
log.Info("Commander is wrapping up")
49+
log.Info("Commander is closed")
4950
}
5051

5152
func (c *Commander) Info() *core.Info {
@@ -158,7 +159,6 @@ func (c *Commander) dispatchLoop() {
158159
case *proto.Command_AgentConnectRequest, *proto.Command_AgentConnectResponse:
159160
topic = core.AgentConnected
160161
case *proto.Command_AgentConfigRequest, *proto.Command_AgentConfig:
161-
log.Debugf("agent config %T command data type received and ignored", cmd.GetData())
162162
topic = core.AgentConfig
163163
case *proto.Command_CmdStatus:
164164
data := cmd.GetData().(*proto.Command_CmdStatus)

src/plugins/common.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/nginx/agent/v2/src/core/config"
77
"github.com/nginx/agent/v2/src/extensions"
88
log "github.com/sirupsen/logrus"
9+
"go.uber.org/atomic"
910

1011
agent_config "github.com/nginx/agent/sdk/v2/agent/config"
1112
"github.com/nginx/agent/sdk/v2/agent/events"
@@ -36,15 +37,15 @@ func LoadPlugins(commander client.Commander, binary core.NginxBinary, env core.E
3637

3738
if (loadedConfig.IsFeatureEnabled(agent_config.FeatureMetrics) || loadedConfig.IsFeatureEnabled(agent_config.FeatureMetricsSender)) && reporter != nil {
3839
corePlugins = append(corePlugins,
39-
NewMetricsSender(reporter),
40+
NewMetricsSender(reporter, atomic.NewBool(false)),
4041
)
4142
}
4243

4344
corePlugins = append(corePlugins,
4445
NewConfigReader(loadedConfig),
4546
NewNginx(commander, binary, env, loadedConfig, processes),
4647
NewExtensions(loadedConfig, env),
47-
NewFeatures(commander, loadedConfig, env, binary, loadedConfig.Version, processes, agentEventsMeta),
48+
NewFeatures(commander, reporter, loadedConfig, env, binary, loadedConfig.Version, processes, agentEventsMeta),
4849
)
4950

5051
if loadedConfig.IsFeatureEnabled(agent_config.FeatureRegistration) {

src/plugins/config_reader.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func NewConfigReader(config *config.Config) *ConfigReader {
3737
}
3838

3939
func (r *ConfigReader) Init(pipeline core.MessagePipeInterface) {
40+
log.Info("ConfigReader initializing")
4041
r.messagePipeline = pipeline
4142
}
4243

@@ -46,6 +47,7 @@ func (r *ConfigReader) Info() *core.Info {
4647

4748
func (r *ConfigReader) Close() {
4849
log.Info("ConfigReader is wrapping up")
50+
log.Info("ConfigReader is closed")
4951
}
5052

5153
func (r *ConfigReader) Process(msg *core.Message) {
@@ -73,8 +75,10 @@ func (r *ConfigReader) Process(msg *core.Message) {
7375
// Update the agent config on disk
7476
switch commandData := cmd.Data.(type) {
7577
case *proto.Command_AgentConfig:
78+
log.Debugf("Config reader: AgentConfig message recevied: %v, topic: %v", commandData, msg.Topic())
7679
r.updateAgentConfig(commandData.AgentConfig)
7780
case *proto.Command_AgentConnectResponse:
81+
log.Debugf("Config reader: AgentConnectResponse message recevied: %v, topic: %v", commandData, msg.Topic())
7882
r.updateAgentConfig(commandData.AgentConnectResponse.AgentConfig)
7983
}
8084
}
@@ -152,7 +156,7 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig)
152156
}
153157

154158
if synchronizeFeatures {
155-
log.Debugf("Agent config features changed, synchronizing features")
159+
log.Info("Agent config features changed, synchronizing features")
156160
r.synchronizeFeatures(payloadAgentConfig)
157161
r.config.Features = payloadAgentConfig.Details.Features
158162
}

src/plugins/dataplane_status.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func (dps *DataPlaneStatus) Close() {
9090
dps.softwareDetailsMutex.Unlock()
9191
dps.healthTicker.Stop()
9292
dps.sendStatus <- true
93+
log.Info("DataPlaneStatus is closed")
9394
}
9495

9596
func (dps *DataPlaneStatus) Info() *core.Info {

src/plugins/events.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (a *Events) Init(pipeline core.MessagePipeInterface) {
6464

6565
func (a *Events) Close() {
6666
log.Info("Events is wrapping up")
67+
log.Info("Events is closed")
6768
}
6869

6970
func (a *Events) Process(msg *core.Message) {

src/plugins/extensions.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func (e *Extensions) Init(pipeline core.MessagePipeInterface) {
3737

3838
func (e *Extensions) Close() {
3939
log.Info("Extensions is wrapping up")
40+
log.Info("Extensions is closed")
4041
}
4142

4243
func (e *Extensions) Process(msg *core.Message) {

src/plugins/features.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ import (
1414
sdkGRPC "github.com/nginx/agent/sdk/v2/grpc"
1515
"github.com/nginx/agent/v2/src/core"
1616
"github.com/nginx/agent/v2/src/core/config"
17+
"go.uber.org/atomic"
1718

1819
"github.com/google/uuid"
1920
log "github.com/sirupsen/logrus"
2021
)
2122

2223
type Features struct {
2324
commander client.Commander
25+
metricReporter client.MetricReporter
2426
pipeline core.MessagePipeInterface
2527
conf *config.Config
2628
env core.Environment
@@ -33,6 +35,7 @@ type Features struct {
3335

3436
func NewFeatures(
3537
commander client.Commander,
38+
metricReporter client.MetricReporter,
3639
conf *config.Config,
3740
env core.Environment,
3841
binary core.NginxBinary,
@@ -42,6 +45,7 @@ func NewFeatures(
4245
) *Features {
4346
return &Features{
4447
commander: commander,
48+
metricReporter: metricReporter,
4549
conf: conf,
4650
env: env,
4751
binary: binary,
@@ -94,6 +98,7 @@ func (f *Features) Init(pipeline core.MessagePipeInterface) {
9498

9599
func (f *Features) Close() {
96100
log.Info("Features is wrapping up")
101+
log.Info("Features is closed")
97102
}
98103

99104
func (f *Features) Info() *core.Info {
@@ -135,7 +140,7 @@ func (f *Features) Process(msg *core.Message) {
135140

136141
func (f *Features) enableMetricsFeature(_ string) []core.Plugin {
137142
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) {
138-
log.Debugf("Enabling metrics feature")
143+
log.Debug("Enabling metrics feature")
139144
conf, err := config.GetConfig(f.conf.ClientID)
140145
if err != nil {
141146
log.Warnf("Unable to get agent config, %v", err)
@@ -144,7 +149,7 @@ func (f *Features) enableMetricsFeature(_ string) []core.Plugin {
144149

145150
metrics := NewMetrics(f.conf, f.env, f.binary, f.processes)
146151
metricsThrottle := NewMetricsThrottle(f.conf, f.env)
147-
metricsSender := NewMetricsSender(f.commander)
152+
metricsSender := NewMetricsSender(f.metricReporter, atomic.NewBool(true))
148153

149154
return []core.Plugin{metrics, metricsThrottle, metricsSender}
150155
}
@@ -154,7 +159,7 @@ func (f *Features) enableMetricsFeature(_ string) []core.Plugin {
154159
func (f *Features) enableMetricsCollectionFeature(_ string) []core.Plugin {
155160
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
156161
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsCollection) {
157-
log.Debugf("Enabling metrics-collection feature")
162+
log.Debug("Enabling metrics-collection feature")
158163
conf, err := config.GetConfig(f.conf.ClientID)
159164
if err != nil {
160165
log.Warnf("Unable to get agent config, %v", err)
@@ -171,7 +176,7 @@ func (f *Features) enableMetricsCollectionFeature(_ string) []core.Plugin {
171176
func (f *Features) enableMetricsThrottleFeature(_ string) []core.Plugin {
172177
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
173178
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsThrottle) {
174-
log.Debugf("Enabling metrics-throttle feature")
179+
log.Debug("Enabling metrics-throttle feature")
175180
conf, err := config.GetConfig(f.conf.ClientID)
176181
if err != nil {
177182
log.Warnf("Unable to get agent config, %v", err)
@@ -188,14 +193,14 @@ func (f *Features) enableMetricsThrottleFeature(_ string) []core.Plugin {
188193
func (f *Features) enableMetricsSenderFeature(_ string) []core.Plugin {
189194
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetrics) &&
190195
!f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureMetricsSender) {
191-
log.Debugf("Enabling metrics-sender feature")
196+
log.Debug("Enabling metrics-sender feature")
192197
conf, err := config.GetConfig(f.conf.ClientID)
193198
if err != nil {
194199
log.Warnf("Unable to get agent config, %v", err)
195200
}
196201
f.conf = conf
197202

198-
metricsSender := NewMetricsSender(f.commander)
203+
metricsSender := NewMetricsSender(f.metricReporter, atomic.NewBool(true))
199204

200205
return []core.Plugin{metricsSender}
201206
}
@@ -204,8 +209,8 @@ func (f *Features) enableMetricsSenderFeature(_ string) []core.Plugin {
204209

205210
func (f *Features) enableAgentAPIFeature(_ string) []core.Plugin {
206211
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureAgentAPI) {
212+
log.Debug("Enabling agent-api feature")
207213
conf, err := config.GetConfig(f.conf.ClientID)
208-
log.Debugf("Enabling agent-api feature")
209214
if err != nil {
210215
log.Warnf("Unable to get agent config, %v", err)
211216
}
@@ -220,7 +225,7 @@ func (f *Features) enableAgentAPIFeature(_ string) []core.Plugin {
220225

221226
func (f *Features) enableRegistrationFeature(_ string) []core.Plugin {
222227
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureRegistration) {
223-
log.Debugf("Enabling registration feature")
228+
log.Debug("Enabling registration feature")
224229
conf, err := config.GetConfig(f.conf.ClientID)
225230
if err != nil {
226231
log.Warnf("Unable to get agent config, %v", err)
@@ -236,7 +241,7 @@ func (f *Features) enableRegistrationFeature(_ string) []core.Plugin {
236241

237242
func (f *Features) enableDataPlaneStatusFeature(_ string) []core.Plugin {
238243
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureDataPlaneStatus) {
239-
log.Debugf("Enabling dataplane-status feature")
244+
log.Debug("Enabling dataplane-status feature")
240245
conf, err := config.GetConfig(f.conf.ClientID)
241246
if err != nil {
242247
log.Warnf("Unable to get agent config, %v", err)
@@ -252,7 +257,7 @@ func (f *Features) enableDataPlaneStatusFeature(_ string) []core.Plugin {
252257

253258
func (f *Features) enableProcessWatcherFeature(_ string) []core.Plugin {
254259
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureProcessWatcher) {
255-
log.Debugf("Enabling process-watcher feature")
260+
log.Debug("Enabling process-watcher feature")
256261
conf, err := config.GetConfig(f.conf.ClientID)
257262
if err != nil {
258263
log.Warnf("Unable to get agent config, %v", err)
@@ -268,23 +273,23 @@ func (f *Features) enableProcessWatcherFeature(_ string) []core.Plugin {
268273

269274
func (f *Features) enableActivityEventsFeature(_ string) []core.Plugin {
270275
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureActivityEvents) {
271-
log.Debugf("Enabling activity-events feature")
276+
log.Debug("Enabling activity-events feature")
272277
conf, err := config.GetConfig(f.conf.ClientID)
273278
if err != nil {
274279
log.Warnf("Unable to get agent config, %v", err)
275280
}
276281
f.conf = conf
277282

278-
events := NewEvents(f.conf, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.agentEventsMeta)
283+
eventsPlugin := NewEvents(f.conf, f.env, sdkGRPC.NewMessageMeta(uuid.NewString()), f.binary, f.agentEventsMeta)
279284

280-
return []core.Plugin{events}
285+
return []core.Plugin{eventsPlugin}
281286
}
282287
return []core.Plugin{}
283288
}
284289

285290
func (f *Features) enableFileWatcherFeature(_ string) []core.Plugin {
286291
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureFileWatcher) {
287-
log.Debugf("Enabling file-watcher feature")
292+
log.Debug("Enabling file-watcher feature")
288293
conf, err := config.GetConfig(f.conf.ClientID)
289294
if err != nil {
290295
log.Warnf("Unable to get agent config, %v", err)
@@ -303,7 +308,7 @@ func (f *Features) enableNginxCountingFeature(_ string) []core.Plugin {
303308
countingPlugins := []core.Plugin{}
304309
if len(f.conf.Nginx.NginxCountingSocket) > 0 {
305310
if !f.pipeline.IsPluginAlreadyRegistered(agent_config.FeatureNginxCounting) {
306-
log.Debugf("Enabling nginx-counting feature")
311+
log.Debug("Enabling nginx-counting feature")
307312
conf, err := config.GetConfig(f.conf.ClientID)
308313
if err != nil {
309314
log.Warnf("Unable to get agent config, %v", err)

src/plugins/features_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,11 @@ func TestFeatures_Process(t *testing.T) {
9393
binary.On("UpdateNginxDetailsFromProcesses", mock.Anything).Return()
9494

9595
cmdr := tutils.NewMockCommandClient()
96+
reporter := tutils.NewMockMetricsReportClient()
9697

9798
configuration, _ := config.GetConfig("1234")
9899

99-
pluginUnderTest := NewFeatures(cmdr, configuration, env, binary, "agentVersion", processes, events.NewAgentEventMeta(
100+
pluginUnderTest := NewFeatures(cmdr, reporter, configuration, env, binary, "agentVersion", processes, events.NewAgentEventMeta(
100101
config.MODULE,
101102
"v0.0.1",
102103
"75231",

src/plugins/file_watcher.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (fw *FileWatcher) Close() {
104104
fw.enabled = false
105105
fw.cancelFunction()
106106
fw.watcher.Close()
107+
log.Info("File Watcher is closed")
107108
}
108109

109110
func (fw *FileWatcher) Process(message *core.Message) {

0 commit comments

Comments
 (0)