Skip to content

Commit 8c422cc

Browse files
committed
Merge branch 'add-auxiliary-command-server-to-agent-config' into add-read-only-file-plugin
2 parents a6b6e52 + 650a0d9 commit 8c422cc

File tree

5 files changed

+62
-34
lines changed

5 files changed

+62
-34
lines changed

internal/command/command_plugin.go

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ var _ bus.Plugin = (*CommandPlugin)(nil)
2525

2626
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate
2727
//counterfeiter:generate . commandService
28+
type ServerType int
29+
30+
const (
31+
Command ServerType = iota
32+
Auxiliary
33+
)
34+
35+
var serverType = map[ServerType]string{
36+
Command: "command",
37+
Auxiliary: "auxiliary",
38+
}
2839

2940
type (
3041
commandService interface {
@@ -44,13 +55,13 @@ type (
4455
conn grpc.GrpcConnectionInterface
4556
commandService commandService
4657
subscribeChannel chan *mpi.ManagementPlaneRequest
47-
commandServerType string
58+
commandServerType ServerType
4859
subscribeMutex sync.Mutex
4960
}
5061
)
5162

5263
func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface,
53-
commandServerType string,
64+
commandServerType ServerType,
5465
) *CommandPlugin {
5566
return &CommandPlugin{
5667
config: agentConfig,
@@ -61,22 +72,22 @@ func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnec
6172
}
6273

6374
func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error {
64-
slog.DebugContext(ctx, "Starting command plugin", "command_server_type", cp.commandServerType)
75+
newCtx := context.WithValue(
76+
ctx,
77+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
78+
)
79+
slog.DebugContext(newCtx, "Starting command plugin", "command_server_type", cp.commandServerType.String())
6580

6681
cp.messagePipe = messagePipe
6782
cp.commandService = NewCommandService(cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)
6883

69-
newCtx := context.WithValue(
70-
ctx,
71-
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType),
72-
)
7384
go cp.monitorSubscribeChannel(newCtx)
7485

7586
return nil
7687
}
7788

7889
func (cp *CommandPlugin) Close(ctx context.Context) error {
79-
slog.InfoContext(ctx, "Closing command plugin", "command_server_type", cp.commandServerType)
90+
slog.InfoContext(ctx, "Closing command plugin", "command_server_type", cp.commandServerType.String())
8091

8192
cp.subscribeMutex.Lock()
8293
if cp.subscribeCancel != nil {
@@ -89,14 +100,14 @@ func (cp *CommandPlugin) Close(ctx context.Context) error {
89100

90101
func (cp *CommandPlugin) Info() *bus.Info {
91102
return &bus.Info{
92-
Name: cp.commandServerType,
103+
Name: cp.commandServerType.String(),
93104
}
94105
}
95106

96107
func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
97108
slog.DebugContext(ctx, "Processing command", "command_server_type", logger.ServerType(ctx))
98109

99-
if logger.ServerType(ctx) == cp.commandServerType || logger.ServerType(ctx) == "" {
110+
if logger.ServerType(ctx) == cp.commandServerType.String() || logger.ServerType(ctx) == "" {
100111
switch msg.Topic {
101112
case bus.ConnectionResetTopic:
102113
cp.processConnectionReset(ctx, msg)
@@ -111,9 +122,6 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
111122
default:
112123
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
113124
}
114-
} else {
115-
slog.Info("Sever type is not right ignoring message", "command_server_type",
116-
logger.ServerType(ctx), "topic", msg.Topic)
117125
}
118126
}
119127

@@ -123,7 +131,7 @@ func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Mes
123131
if !cp.commandService.IsConnected() {
124132
newCtx := context.WithValue(
125133
ctx,
126-
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType),
134+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
127135
)
128136
cp.createConnection(newCtx, resource)
129137
} else {
@@ -185,7 +193,7 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes
185193
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
186194
if err != nil {
187195
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err,
188-
"command_server_type", cp.commandServerType)
196+
"command_server_type", cp.commandServerType.String())
189197
}
190198
}
191199
}
@@ -248,9 +256,10 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
248256
slog.InfoContext(ctx, "Received management plane config upload request")
249257
cp.handleConfigUploadRequest(newCtx, message)
250258
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
251-
if cp.commandServerType != "command" {
259+
if cp.commandServerType != Command {
252260
slog.WarnContext(newCtx, "Auxiliary command server can not perform config apply",
253-
"command_server_type", cp.commandServerType)
261+
"command_server_type", cp.commandServerType.String())
262+
cp.handleInvalidRequest(newCtx, message)
254263

255264
return
256265
}
@@ -260,9 +269,10 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
260269
slog.InfoContext(ctx, "Received management plane health request")
261270
cp.handleHealthRequest(newCtx)
262271
case *mpi.ManagementPlaneRequest_ActionRequest:
263-
if cp.commandServerType != "command" {
272+
if cp.commandServerType != Command {
264273
slog.WarnContext(newCtx, "Auxiliary command server can not perform api action",
265-
"command_server_type", cp.commandServerType)
274+
"command_server_type", cp.commandServerType.String())
275+
cp.handleInvalidRequest(newCtx, message)
266276

267277
return
268278
}
@@ -354,6 +364,21 @@ func (cp *CommandPlugin) handleHealthRequest(newCtx context.Context) {
354364
cp.messagePipe.Process(newCtx, &bus.Message{Topic: bus.DataPlaneHealthRequestTopic})
355365
}
356366

367+
func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context, message *mpi.ManagementPlaneRequest) {
368+
err := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{
369+
MessageMeta: message.GetMessageMeta(),
370+
CommandResponse: &mpi.CommandResponse{
371+
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
372+
Message: "Can not perform write action as auxiliary command server",
373+
Error: "request not allowed",
374+
},
375+
InstanceId: message.GetActionRequest().GetInstanceId(),
376+
})
377+
if err != nil {
378+
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
379+
}
380+
}
381+
357382
func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
358383
message, err string,
359384
) *mpi.DataPlaneResponse {
@@ -370,3 +395,7 @@ func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mp
370395
},
371396
}
372397
}
398+
399+
func (s ServerType) String() string {
400+
return serverType[s]
401+
}

internal/command/command_plugin_test.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,15 @@ import (
3131
"github.com/stretchr/testify/require"
3232
)
3333

34-
const serverType = "command"
35-
3634
func TestCommandPlugin_Info(t *testing.T) {
37-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
35+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
3836
info := commandPlugin.Info()
3937

4038
assert.Equal(t, "command", info.Name)
4139
}
4240

4341
func TestCommandPlugin_Subscriptions(t *testing.T) {
44-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
42+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
4543
subscriptions := commandPlugin.Subscriptions()
4644

4745
assert.Equal(
@@ -62,7 +60,7 @@ func TestCommandPlugin_Init(t *testing.T) {
6260
messagePipe := busfakes.NewFakeMessagePipe()
6361
fakeCommandService := &commandfakes.FakeCommandService{}
6462

65-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
63+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
6664
err := commandPlugin.Init(ctx, messagePipe)
6765
require.NoError(t, err)
6866

@@ -81,7 +79,7 @@ func TestCommandPlugin_createConnection(t *testing.T) {
8179
commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil)
8280
messagePipe := busfakes.NewFakeMessagePipe()
8381

84-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
82+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
8583
err := commandPlugin.Init(ctx, messagePipe)
8684
commandPlugin.commandService = commandService
8785
require.NoError(t, err)
@@ -113,7 +111,7 @@ func TestCommandPlugin_Process(t *testing.T) {
113111
messagePipe := busfakes.NewFakeMessagePipe()
114112
fakeCommandService := &commandfakes.FakeCommandService{}
115113

116-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
114+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
117115
err := commandPlugin.Init(ctx, messagePipe)
118116
require.NoError(t, err)
119117
defer commandPlugin.Close(ctx)
@@ -221,7 +219,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {
221219

222220
agentConfig := types.AgentConfig()
223221
agentConfig.Features = test.configFeatures
224-
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
222+
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, Command)
225223
err := commandPlugin.Init(ctx, messagePipe)
226224
require.NoError(tt, err)
227225
defer commandPlugin.Close(ctx)
@@ -321,7 +319,7 @@ func TestCommandPlugin_FeatureDisabled(t *testing.T) {
321319

322320
agentConfig.Features = test.configFeatures
323321

324-
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
322+
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, Command)
325323
err := commandPlugin.Init(ctx, messagePipe)
326324
commandPlugin.commandService = fakeCommandService
327325
require.NoError(tt, err)
@@ -346,7 +344,7 @@ func TestMonitorSubscribeChannel(t *testing.T) {
346344
logBuf := &bytes.Buffer{}
347345
stub.StubLoggerWith(logBuf)
348346

349-
cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
347+
cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
350348
cp.subscribeCancel = cncl
351349

352350
message := protos.CreateManagementPlaneRequest()
@@ -385,7 +383,7 @@ func Test_createDataPlaneResponse(t *testing.T) {
385383
Error: "",
386384
},
387385
}
388-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
386+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
389387
result := commandPlugin.createDataPlaneResponse(expected.GetMessageMeta().GetCorrelationId(),
390388
expected.GetCommandResponse().GetStatus(),
391389
expected.GetCommandResponse().GetMessage(), expected.GetCommandResponse().GetError())

internal/command/command_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func (cs *CommandService) UpdateClient(ctx context.Context, client mpi.CommandSe
261261
cs.subscribeClientMutex.Unlock()
262262

263263
cs.isConnected.Store(false)
264-
// Will this have the sever type ?
264+
265265
resp, err := cs.CreateConnection(ctx, cs.resource)
266266
if err != nil {
267267
return err

internal/logger/logger.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func New(logPath, level string) *slog.Logger {
7575
contextHandler{
7676
handler, []any{
7777
CorrelationIDContextKey,
78+
ServerTypeContextKey,
7879
},
7980
})
8081
}

internal/plugin/plugin_manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo
4747
if err != nil {
4848
slog.WarnContext(ctx, "Failed to create gRPC connection for command server", "error", err)
4949
} else {
50-
commandPlugin := command.NewCommandPlugin(agentConfig, grpcConnection, "command")
50+
commandPlugin := command.NewCommandPlugin(agentConfig, grpcConnection, command.Command)
5151
plugins = append(plugins, commandPlugin)
5252
filePlugin := file.NewFilePlugin(agentConfig, grpcConnection)
5353
plugins = append(plugins, filePlugin)
@@ -68,13 +68,13 @@ func addAuxiliaryCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin
6868
if err != nil {
6969
slog.WarnContext(ctx, "Failed to create gRPC connection for auxiliary command server", "error", err)
7070
} else {
71-
auxCommandPlugin := command.NewCommandPlugin(agentConfig, auxGRPCConnection, "auxiliary")
71+
auxCommandPlugin := command.NewCommandPlugin(agentConfig, auxGRPCConnection, command.Auxiliary)
7272
plugins = append(plugins, auxCommandPlugin)
7373
readFilePlugin := file.NewReadFilePlugin(agentConfig, auxGRPCConnection)
7474
plugins = append(plugins, readFilePlugin)
7575
}
7676
} else {
77-
slog.InfoContext(ctx, "Agent is not connected to an auxiliary management plane. "+
77+
slog.DebugContext(ctx, "Agent is not connected to an auxiliary management plane. "+
7878
"Configure a auxiliary command server to establish a connection.")
7979
}
8080

0 commit comments

Comments
 (0)