Skip to content

Commit a8cb72b

Browse files
committed
Add support for remotely configuring the log level and path
1 parent ed33c6a commit a8cb72b

17 files changed

+260
-120
lines changed

internal/collector/otel_collector_plugin.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ func (oc *Collector) Process(ctx context.Context, msg *bus.Message) {
192192
oc.handleNginxConfigUpdate(ctx, msg)
193193
case bus.ResourceUpdateTopic:
194194
oc.handleResourceUpdate(ctx, msg)
195+
case bus.AgentConfigUpdateTopic:
196+
oc.handleAgentConfigUpdate(ctx, msg)
195197
default:
196198
slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic)
197199
}
@@ -202,6 +204,7 @@ func (oc *Collector) Subscriptions() []string {
202204
return []string{
203205
bus.ResourceUpdateTopic,
204206
bus.NginxConfigUpdateTopic,
207+
bus.AgentConfigUpdateTopic,
205208
}
206209
}
207210

@@ -329,6 +332,17 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)
329332
}
330333
}
331334

335+
func (oc *Collector) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) {
336+
slog.DebugContext(ctx, "OTel collector plugin received agent config update message")
337+
agentConfig, ok := msg.Data.(*config.Config)
338+
if !ok {
339+
slog.ErrorContext(ctx, "Unable to cast message payload to *config.Config", "payload", msg.Data)
340+
return
341+
}
342+
343+
oc.config = agentConfig
344+
}
345+
332346
func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource) bool {
333347
resourceProcessorUpdated := false
334348

internal/command/command_plugin.go

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type (
3737
Subscribe(ctx context.Context)
3838
IsConnected() bool
3939
CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error)
40-
UpdateAgentConfiguration(ctx context.Context, request *mpi.AgentConfig) error
40+
UpdateAgentConfig(ctx context.Context, request *mpi.AgentConfig) (*config.Config, error)
4141
}
4242

4343
CommandPlugin struct {
@@ -128,8 +128,6 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
128128
cp.processDataPlaneHealth(ctxWithMetadata, msg)
129129
case bus.DataPlaneResponseTopic:
130130
cp.processDataPlaneResponse(ctxWithMetadata, msg)
131-
case bus.AgentConfigUpdateTopic:
132-
cp.processAgentConfigUpdate(ctxWithMetadata, msg)
133131
default:
134132
slog.DebugContext(ctxWithMetadata, "Command plugin received unknown topic", "topic", msg.Topic)
135133
}
@@ -143,7 +141,6 @@ func (cp *CommandPlugin) Subscriptions() []string {
143141
bus.InstanceHealthTopic,
144142
bus.DataPlaneHealthResponseTopic,
145143
bus.DataPlaneResponseTopic,
146-
bus.AgentConfigUpdateTopic,
147144
}
148145
}
149146

@@ -185,12 +182,23 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res
185182
Data: createConnectionResponse,
186183
})
187184

188-
// update agent configuration after connection is created, and notify other plugins
189-
_ = cp.commandService.UpdateAgentConfiguration(ctx, createConnectionResponse.AgentConfig)
190-
cp.messagePipe.Process(ctx, &bus.Message{
191-
Topic: bus.AgentConfigUpdateTopic,
192-
Data: createConnectionResponse.AgentConfig,
193-
})
185+
if createConnectionResponse.GetAgentConfig() != nil {
186+
newAgentConfig, updateConfigError := cp.commandService.UpdateAgentConfig(
187+
ctx,
188+
createConnectionResponse.GetAgentConfig(),
189+
)
190+
if updateConfigError != nil {
191+
slog.ErrorContext(ctx, "Unable to update agent configuration", "error", updateConfigError)
192+
} else {
193+
slog.DebugContext(
194+
ctx, "Notifying other plugins of agent configuration update from create connection response",
195+
)
196+
cp.messagePipe.Process(ctx, &bus.Message{
197+
Topic: bus.AgentConfigUpdateTopic,
198+
Data: newAgentConfig,
199+
})
200+
}
201+
}
194202
}
195203
}
196204

@@ -273,19 +281,6 @@ func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Me
273281
}
274282
}
275283

276-
func (cp *CommandPlugin) processAgentConfigUpdate(ctx context.Context, msg *bus.Message) {
277-
slog.DebugContext(ctx, "Command plugin received agent config update message", "data", msg.Data)
278-
//
279-
if mpiConf, ok := msg.Data.(*mpi.AgentConfig); ok {
280-
err := cp.commandService.UpdateAgentConfiguration(ctx, mpiConf)
281-
if err != nil {
282-
slog.ErrorContext(ctx, "Unable to update agent configuration", "error", err)
283-
}
284-
} else {
285-
slog.ErrorContext(ctx, "Invalid data for agent config update message", "data", msg.Data)
286-
}
287-
}
288-
289284
//nolint:revive // cognitive complexity is 14
290285
func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
291286
for {
@@ -436,8 +431,47 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context,
436431
}
437432

438433
func (cp *CommandPlugin) handleAgentConfigUpdateRequest(ctx context.Context, request *mpi.ManagementPlaneRequest) {
439-
// notify plugins about the agent config update request
440-
cp.Process(ctx, &bus.Message{Topic: bus.AgentConfigUpdateTopic, Data: request})
434+
newAgentConfig, err := cp.commandService.UpdateAgentConfig(
435+
ctx,
436+
request.GetUpdateNginxAgentConfigurationRequest().GetAgentConfig(),
437+
)
438+
if err != nil {
439+
slog.ErrorContext(ctx, "Command service was unable to update agent configuration", "error", err)
440+
441+
responseError := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{
442+
MessageMeta: &mpi.MessageMeta{
443+
MessageId: id.GenerateMessageID(),
444+
CorrelationId: request.GetMessageMeta().GetCorrelationId(),
445+
Timestamp: timestamppb.Now(),
446+
},
447+
CommandResponse: &mpi.CommandResponse{
448+
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
449+
Message: "Failed to update agent configuration",
450+
},
451+
})
452+
453+
if responseError != nil {
454+
slog.ErrorContext(ctx, "Unable to send data plane response", "error", responseError)
455+
}
456+
} else {
457+
cp.Process(ctx, &bus.Message{Topic: bus.AgentConfigUpdateTopic, Data: newAgentConfig})
458+
459+
responseError := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{
460+
MessageMeta: &mpi.MessageMeta{
461+
MessageId: id.GenerateMessageID(),
462+
CorrelationId: request.GetMessageMeta().GetCorrelationId(),
463+
Timestamp: timestamppb.Now(),
464+
},
465+
CommandResponse: &mpi.CommandResponse{
466+
Status: mpi.CommandResponse_COMMAND_STATUS_OK,
467+
Message: "Successfully updated agent configuration",
468+
},
469+
})
470+
471+
if responseError != nil {
472+
slog.ErrorContext(ctx, "Unable to send data plane response", "error", responseError)
473+
}
474+
}
441475
}
442476

443477
func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,

internal/command/command_plugin_test.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,20 @@ func TestCommandPlugin_Init(t *testing.T) {
7777

7878
func TestCommandPlugin_createConnection(t *testing.T) {
7979
ctx := context.Background()
80+
response := &mpi.CreateConnectionResponse{
81+
Response: &mpi.CommandResponse{
82+
Status: mpi.CommandResponse_COMMAND_STATUS_OK,
83+
Message: "Connection created successfully",
84+
},
85+
AgentConfig: &mpi.AgentConfig{
86+
Log: &mpi.Log{
87+
LogLevel: mpi.Log_LOG_LEVEL_DEBUG,
88+
},
89+
},
90+
}
91+
8092
commandService := &commandfakes.FakeCommandService{}
81-
commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil)
93+
commandService.CreateConnectionReturns(response, nil)
8294
messagePipe := busfakes.NewFakeMessagePipe()
8395

8496
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
@@ -98,14 +110,15 @@ func TestCommandPlugin_createConnection(t *testing.T) {
98110

99111
assert.Eventually(
100112
t,
101-
func() bool { return len(messagePipe.Messages()) == 1 },
113+
func() bool { return len(messagePipe.Messages()) == 2 },
102114
2*time.Second,
103115
10*time.Millisecond,
104116
)
105117

106118
messages := messagePipe.Messages()
107-
assert.Len(t, messages, 1)
119+
assert.Len(t, messages, 2)
108120
assert.Equal(t, bus.ConnectionCreatedTopic, messages[0].Topic)
121+
assert.Equal(t, bus.AgentConfigUpdateTopic, messages[1].Topic)
109122
}
110123

111124
func TestCommandPlugin_Process(t *testing.T) {
@@ -151,17 +164,6 @@ func TestCommandPlugin_Process(t *testing.T) {
151164
Data: commandPlugin.conn,
152165
})
153166
require.Equal(t, 1, fakeCommandService.UpdateClientCallCount())
154-
155-
commandPlugin.Process(ctx, &bus.Message{
156-
Topic: bus.AgentConfigUpdateTopic,
157-
Data: mpi.AgentConfig{
158-
Log: &mpi.Log{
159-
LogLevel: mpi.Log_LOG_LEVEL_DEBUG,
160-
LogPath: "somewhere",
161-
},
162-
},
163-
})
164-
require.Equal(t, 1, fakeCommandService.UpdateAgentConfigurationCallCount())
165167
}
166168

167169
func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {

internal/command/command_service.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"errors"
1111
"fmt"
1212
"log/slog"
13+
"reflect"
1314
"sync"
1415
"sync/atomic"
1516

@@ -182,29 +183,29 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m
182183
)
183184
}
184185

185-
func (cs *CommandService) UpdateAgentConfiguration(ctx context.Context, mpiConfig *mpi.AgentConfig) error {
186+
func (cs *CommandService) UpdateAgentConfig(
187+
ctx context.Context,
188+
mpiConfig *mpi.AgentConfig,
189+
) (*config.Config, error) {
186190
if !cs.isConnected.Load() {
187-
return errors.New("command service client not connected yet")
191+
return nil, errors.New("command service client not connected yet")
188192
}
189-
slog.DebugContext(ctx, "Updating agent configuration", "config", mpiConfig)
193+
slog.InfoContext(ctx, "Updating agent configuration", "config", mpiConfig)
190194

191-
if mpiConfig.Log != nil {
192-
slog.DebugContext(ctx, "Updating log configuration", "log", mpiConfig.Log)
193-
logConf := config.Log{
194-
Level: config.MapConfigLogLevelToSlogLevel(mpiConfig.Log.LogLevel),
195-
Path: mpiConfig.Log.LogPath,
196-
}
197-
cs.agentConfig.Log = &logConf
195+
updatedLog := config.FromAgentConfigLogProto(mpiConfig.GetLog())
196+
197+
if mpiConfig.GetLog() != nil && !reflect.DeepEqual(cs.agentConfig.Log, updatedLog) {
198+
slog.InfoContext(ctx, "Updating log configuration", "log", updatedLog)
199+
cs.agentConfig.Log = updatedLog
198200

199-
// Reinitialize logger with new configuration
200201
slogger := logger.New(
201202
cs.agentConfig.Log.Path,
202203
cs.agentConfig.Log.Level,
203204
)
204205
slog.SetDefault(slogger)
205206
}
206207

207-
return nil
208+
return cs.agentConfig, nil
208209
}
209210

210211
// Subscribe to the Management Plane for incoming commands.

internal/command/command_service_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,11 +400,13 @@ func TestCommandService_UpdateAgentConfiguration(t *testing.T) {
400400
LogPath: "/etc/nginx-agent",
401401
},
402402
}
403-
404-
err := commandService.UpdateAgentConfiguration(ctx, updatedConfig)
403+
404+
newAgentConfig, err := commandService.UpdateAgentConfig(ctx, updatedConfig)
405405
require.NoError(t, err)
406406
require.Equal(t, "DEBUG", commandService.agentConfig.Log.Level)
407407
require.Equal(t, "/etc/nginx-agent", commandService.agentConfig.Log.Path)
408+
require.Equal(t, "DEBUG", newAgentConfig.Log.Level)
409+
require.Equal(t, "/etc/nginx-agent", newAgentConfig.Log.Path)
408410

409411
updatedLogger := slog.Default()
410412
require.NotEqual(t, originalLogger, updatedLogger)

0 commit comments

Comments
 (0)