Skip to content

Commit 6cd2a29

Browse files
authored
Refactor agent logging (#1447)
1 parent a403571 commit 6cd2a29

27 files changed

+754
-358
lines changed

api/grpc/mpi/v1/command.pb.go

Lines changed: 219 additions & 134 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/grpc/mpi/v1/command.pb.validate.go

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/grpc/mpi/v1/command.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,25 @@ message UpdateDataPlaneHealthResponse {}
152152

153153
// Reports the status of an associated command. This may be in response to a ManagementPlaneRequest
154154
message DataPlaneResponse {
155+
enum RequestType {
156+
UNSPECIFIED_REQUEST = 0;
157+
CONFIG_APPLY_REQUEST = 1;
158+
CONFIG_UPLOAD_REQUEST = 2;
159+
HEALTH_REQUEST = 3;
160+
STATUS_REQUEST = 4;
161+
API_ACTION_REQUEST = 5;
162+
COMMAND_STATUS_REQUEST = 6;
163+
UPDATE_AGENT_CONFIG_REQUEST = 7;
164+
}
165+
155166
// Meta-information associated with a message
156167
mpi.v1.MessageMeta message_meta = 1;
157168
// The command response with the associated request
158169
mpi.v1.CommandResponse command_response = 2;
159170
// The instance identifier, if applicable, for this response
160171
string instance_id = 3;
172+
// The management plane request type that is being responded to
173+
RequestType request_type = 4;
161174
}
162175

163176
// A Management Plane request for information, triggers an associated rpc on the Data Plane

docs/proto/protos.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
- [UpdateHTTPUpstreamServers](#mpi-v1-UpdateHTTPUpstreamServers)
8686
- [UpdateStreamServers](#mpi-v1-UpdateStreamServers)
8787

88+
- [DataPlaneResponse.RequestType](#mpi-v1-DataPlaneResponse-RequestType)
8889
- [InstanceHealth.InstanceHealthStatus](#mpi-v1-InstanceHealth-InstanceHealthStatus)
8990
- [InstanceMeta.InstanceType](#mpi-v1-InstanceMeta-InstanceType)
9091
- [Log.LogLevel](#mpi-v1-Log-LogLevel)
@@ -862,6 +863,7 @@ Reports the status of an associated command. This may be in response to a Manage
862863
| message_meta | [MessageMeta](#mpi-v1-MessageMeta) | | Meta-information associated with a message |
863864
| command_response | [CommandResponse](#mpi-v1-CommandResponse) | | The command response with the associated request |
864865
| instance_id | [string](#string) | | The instance identifier, if applicable, for this response |
866+
| request_type | [DataPlaneResponse.RequestType](#mpi-v1-DataPlaneResponse-RequestType) | | The management plane request type that is being responded to |
865867

866868

867869

@@ -1326,6 +1328,24 @@ Update Upstream Stream Servers for an instance
13261328

13271329

13281330

1331+
<a name="mpi-v1-DataPlaneResponse-RequestType"></a>
1332+
1333+
### DataPlaneResponse.RequestType
1334+
1335+
1336+
| Name | Number | Description |
1337+
| ---- | ------ | ----------- |
1338+
| UNSPECIFIED_REQUEST | 0 | |
1339+
| CONFIG_APPLY_REQUEST | 1 | |
1340+
| CONFIG_UPLOAD_REQUEST | 2 | |
1341+
| HEALTH_REQUEST | 3 | |
1342+
| STATUS_REQUEST | 4 | |
1343+
| API_ACTION_REQUEST | 5 | |
1344+
| COMMAND_STATUS_REQUEST | 6 | |
1345+
| UPDATE_AGENT_CONFIG_REQUEST | 7 | |
1346+
1347+
1348+
13291349
<a name="mpi-v1-InstanceHealth-InstanceHealthStatus"></a>
13301350

13311351
### InstanceHealth.InstanceHealthStatus

internal/bus/message_pipe.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,13 @@ func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentCon
206206

207207
// If the agent update was received from a create connection request, no data plane response needs to be sent
208208
if topic == AgentConfigUpdateTopic {
209-
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
210-
"Failed to update agent config", reconfigureError.Error())
209+
response := p.createDataPlaneResponse(
210+
correlationID,
211+
mpi.CommandResponse_COMMAND_STATUS_FAILURE,
212+
mpi.DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST,
213+
"Failed to update agent config",
214+
reconfigureError.Error(),
215+
)
211216
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
212217
}
213218

@@ -222,8 +227,13 @@ func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentCon
222227

223228
slog.InfoContext(ctx, "Finished reconfiguring plugins", "plugins", p.plugins)
224229
if topic == AgentConfigUpdateTopic {
225-
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
226-
"Successfully updated agent config", "")
230+
response := p.createDataPlaneResponse(
231+
correlationID,
232+
mpi.CommandResponse_COMMAND_STATUS_OK,
233+
mpi.DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST,
234+
"Successfully updated agent config",
235+
"",
236+
)
227237
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
228238
}
229239
}
@@ -339,7 +349,10 @@ func (p *MessagePipe) initPlugins(ctx context.Context) {
339349
}
340350
}
341351

342-
func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
352+
func (p *MessagePipe) createDataPlaneResponse(
353+
correlationID string,
354+
status mpi.CommandResponse_CommandStatus,
355+
requestType mpi.DataPlaneResponse_RequestType,
343356
message, err string,
344357
) *mpi.DataPlaneResponse {
345358
return &mpi.DataPlaneResponse{
@@ -353,6 +366,7 @@ func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.C
353366
Message: message,
354367
Error: err,
355368
},
369+
RequestType: requestType,
356370
}
357371
}
358372

internal/collector/otel_collector_plugin.go

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro
143143
return errors.New("OTel collector already running")
144144
}
145145

146-
slog.InfoContext(ctx, "Starting OTel collector")
147146
bootErr := oc.bootup(runCtx)
148147
if bootErr != nil {
149148
slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr)
@@ -163,30 +162,7 @@ func (oc *Collector) Info() *bus.Info {
163162
func (oc *Collector) Close(ctx context.Context) error {
164163
slog.InfoContext(ctx, "Closing OTel Collector plugin")
165164

166-
if !oc.stopped {
167-
slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState())
168-
oc.service.Shutdown()
169-
oc.cancel()
170-
171-
settings := *oc.config.Client.Backoff
172-
settings.MaxElapsedTime = maxTimeToWaitForShutdown
173-
err := backoff.WaitUntil(ctx, &settings, func() error {
174-
if oc.service.GetState() == otelcol.StateClosed {
175-
return nil
176-
}
177-
178-
return errors.New("OTel Collector not in a closed state yet")
179-
})
180-
181-
if err != nil {
182-
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState())
183-
} else {
184-
slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState())
185-
oc.stopped = true
186-
}
187-
}
188-
189-
return nil
165+
return oc.shutdownCollector(ctx)
190166
}
191167

192168
// Process an incoming Message Bus message in the plugin
@@ -236,6 +212,33 @@ func (oc *Collector) Reconfigure(ctx context.Context, agentConfig *config.Config
236212
return nil
237213
}
238214

215+
func (oc *Collector) shutdownCollector(ctx context.Context) error {
216+
if !oc.stopped {
217+
slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState())
218+
oc.service.Shutdown()
219+
oc.cancel()
220+
221+
settings := *oc.config.Client.Backoff
222+
settings.MaxElapsedTime = maxTimeToWaitForShutdown
223+
err := backoff.WaitUntil(ctx, &settings, func() error {
224+
if oc.service.GetState() == otelcol.StateClosed {
225+
return nil
226+
}
227+
228+
return errors.New("OTel Collector not in a closed state yet")
229+
})
230+
231+
if err != nil {
232+
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState())
233+
} else {
234+
slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState())
235+
oc.stopped = true
236+
}
237+
}
238+
239+
return nil
240+
}
241+
239242
// Process receivers and log warning for sub-optimal configurations
240243
func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) {
241244
for _, receiver := range receivers {
@@ -275,11 +278,12 @@ func (oc *Collector) bootup(ctx context.Context) error {
275278
oc.setProxyIfNeeded(ctx)
276279
}
277280

281+
slog.InfoContext(ctx, "Starting OTel collector")
278282
appErr := oc.service.Run(ctx)
279283
if appErr != nil {
280284
errChan <- appErr
281285
}
282-
slog.InfoContext(ctx, "OTel collector run finished")
286+
slog.InfoContext(ctx, "OTel collector has stopped running")
283287
}()
284288

285289
for {
@@ -453,7 +457,7 @@ func (oc *Collector) writeRunningConfig(ctx context.Context, settings otelcol.Co
453457
}
454458

455459
func (oc *Collector) restartCollector(ctx context.Context) {
456-
err := oc.Close(ctx)
460+
err := oc.shutdownCollector(ctx)
457461
if err != nil {
458462
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err)
459463
return

internal/collector/settings.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,20 +96,20 @@ func createFile(confPath string) error {
9696

9797
// Generates an OTel Collector config file by injecting the Metrics Config into a Go template.
9898
func writeCollectorConfig(conf *config.Collector) error {
99+
confPath := filepath.Clean(conf.ConfigPath)
100+
101+
slog.Info("Writing OTel collector config", "config_path", confPath)
102+
99103
if conf.Processors.Resource["default"] != nil {
100104
addDefaultResourceProcessor(conf.Pipelines.Metrics)
101105
addDefaultResourceProcessor(conf.Pipelines.Logs)
102106
}
103107

104-
slog.Info("Writing OTel collector config")
105-
106108
otelcolTemplate, templateErr := template.New(otelTemplatePath).Parse(otelcolTemplate)
107109
if templateErr != nil {
108110
return templateErr
109111
}
110112

111-
confPath := filepath.Clean(conf.ConfigPath)
112-
113113
// Ensure file exists and has correct permissions
114114
if err := ensureFileExists(confPath); err != nil {
115115
return err

internal/command/command_plugin.go

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,24 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me
217217

218218
cp.processDataPlaneResponse(ctx, &bus.Message{
219219
Topic: bus.DataPlaneResponseTopic,
220-
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
221-
"Failed to send the health status update", err.Error()),
220+
Data: cp.createDataPlaneResponse(
221+
correlationID,
222+
mpi.CommandResponse_COMMAND_STATUS_FAILURE,
223+
mpi.DataPlaneResponse_HEALTH_REQUEST,
224+
"Failed to send the health status update",
225+
err.Error(),
226+
),
222227
})
223228
}
224229
cp.processDataPlaneResponse(ctx, &bus.Message{
225230
Topic: bus.DataPlaneResponseTopic,
226-
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
227-
"Successfully sent health status update", ""),
231+
Data: cp.createDataPlaneResponse(
232+
correlationID,
233+
mpi.CommandResponse_COMMAND_STATUS_OK,
234+
mpi.DataPlaneResponse_HEALTH_REQUEST,
235+
"Successfully sent health status update",
236+
"",
237+
),
228238
})
229239
}
230240
}
@@ -242,8 +252,25 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes
242252
func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.Message) {
243253
slog.DebugContext(ctx, "Command plugin received data plane response message")
244254
if response, ok := msg.Data.(*mpi.DataPlaneResponse); ok {
245-
slog.InfoContext(ctx, "Sending data plane response message", "message",
246-
response.GetCommandResponse().GetMessage(), "status", response.GetCommandResponse().GetStatus())
255+
// Responses to health requests are logged at debug level to prevent them from spamming the logs; all other responses are logged at info level
256+
if response.GetRequestType() != mpi.DataPlaneResponse_HEALTH_REQUEST {
257+
slog.InfoContext(
258+
ctx,
259+
"Sending data plane response message",
260+
"message", response.GetCommandResponse().GetMessage(),
261+
"status", response.GetCommandResponse().GetStatus(),
262+
"error", response.GetCommandResponse().GetError(),
263+
)
264+
} else {
265+
slog.DebugContext(
266+
ctx,
267+
"Sending data plane response message",
268+
"message", response.GetCommandResponse().GetMessage(),
269+
"status", response.GetCommandResponse().GetStatus(),
270+
"error", response.GetCommandResponse().GetError(),
271+
)
272+
}
273+
247274
err := cp.commandService.SendDataPlaneResponse(ctx, response)
248275
if err != nil {
249276
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
@@ -318,7 +345,8 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
318345
slog.InfoContext(ctx, "Received management plane config apply request")
319346
cp.handleConfigApplyRequest(newCtx, message)
320347
case *mpi.ManagementPlaneRequest_HealthRequest:
321-
slog.InfoContext(ctx, "Received management plane health request")
348+
// To prevent this type of request from spamming the logs too much, we use debug level
349+
slog.DebugContext(ctx, "Received management plane health request")
322350
cp.handleHealthRequest(newCtx)
323351
case *mpi.ManagementPlaneRequest_ActionRequest:
324352
if cp.commandServerType != model.Command {
@@ -445,7 +473,10 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context,
445473
}
446474
}
447475

448-
func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
476+
func (cp *CommandPlugin) createDataPlaneResponse(
477+
correlationID string,
478+
status mpi.CommandResponse_CommandStatus,
479+
requestType mpi.DataPlaneResponse_RequestType,
449480
message, err string,
450481
) *mpi.DataPlaneResponse {
451482
return &mpi.DataPlaneResponse{
@@ -459,5 +490,6 @@ func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mp
459490
Message: message,
460491
Error: err,
461492
},
493+
RequestType: requestType,
462494
}
463495
}

internal/command/command_plugin_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ func Test_createDataPlaneResponse(t *testing.T) {
434434
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
435435
result := commandPlugin.createDataPlaneResponse(expected.GetMessageMeta().GetCorrelationId(),
436436
expected.GetCommandResponse().GetStatus(),
437+
expected.GetRequestType(),
437438
expected.GetCommandResponse().GetMessage(), expected.GetCommandResponse().GetError())
438439

439440
assert.Equal(t, expected.GetCommandResponse(), result.GetCommandResponse())

internal/command/command_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, e
524524
return nil
525525
}
526526

527-
slog.ErrorContext(ctx, "Failed to"+errorMsg, "error", err)
527+
slog.ErrorContext(ctx, "Failed to "+errorMsg, "error", err)
528528

529529
return err
530530
}

0 commit comments

Comments
 (0)