@@ -25,6 +25,17 @@ var _ bus.Plugin = (*CommandPlugin)(nil)
2525
2626//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate
2727//counterfeiter:generate . commandService
28+ type ServerType int
29+
30+ const (
31+ Command ServerType = iota
32+ Auxiliary
33+ )
34+
35+ var serverType = map [ServerType ]string {
36+ Command : "command" ,
37+ Auxiliary : "auxiliary" ,
38+ }
2839
2940type (
3041 commandService interface {
@@ -44,13 +55,13 @@ type (
4455 conn grpc.GrpcConnectionInterface
4556 commandService commandService
4657 subscribeChannel chan * mpi.ManagementPlaneRequest
47- commandServerType string
58+ commandServerType ServerType
4859 subscribeMutex sync.Mutex
4960 }
5061)
5162
5263func NewCommandPlugin (agentConfig * config.Config , grpcConnection grpc.GrpcConnectionInterface ,
53- commandServerType string ,
64+ commandServerType ServerType ,
5465) * CommandPlugin {
5566 return & CommandPlugin {
5667 config : agentConfig ,
@@ -61,22 +72,22 @@ func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnec
6172}
6273
6374func (cp * CommandPlugin ) Init (ctx context.Context , messagePipe bus.MessagePipeInterface ) error {
64- slog .DebugContext (ctx , "Starting command plugin" , "command_server_type" , cp .commandServerType )
75+ newCtx := context .WithValue (
76+ ctx ,
77+ logger .ServerTypeContextKey , slog .Any (logger .ServerTypeKey , cp .commandServerType .String ()),
78+ )
79+ slog .DebugContext (newCtx , "Starting command plugin" , "command_server_type" , cp .commandServerType .String ())
6580
6681 cp .messagePipe = messagePipe
6782 cp .commandService = NewCommandService (cp .conn .CommandServiceClient (), cp .config , cp .subscribeChannel )
6883
69- newCtx := context .WithValue (
70- ctx ,
71- logger .ServerTypeContextKey , slog .Any (logger .ServerTypeKey , cp .commandServerType ),
72- )
7384 go cp .monitorSubscribeChannel (newCtx )
7485
7586 return nil
7687}
7788
7889func (cp * CommandPlugin ) Close (ctx context.Context ) error {
79- slog .InfoContext (ctx , "Closing command plugin" , "command_server_type" , cp .commandServerType )
90+ slog .InfoContext (ctx , "Closing command plugin" , "command_server_type" , cp .commandServerType . String () )
8091
8192 cp .subscribeMutex .Lock ()
8293 if cp .subscribeCancel != nil {
@@ -89,14 +100,14 @@ func (cp *CommandPlugin) Close(ctx context.Context) error {
89100
90101func (cp * CommandPlugin ) Info () * bus.Info {
91102 return & bus.Info {
92- Name : cp .commandServerType ,
103+ Name : cp .commandServerType . String () ,
93104 }
94105}
95106
96107func (cp * CommandPlugin ) Process (ctx context.Context , msg * bus.Message ) {
97108 slog .DebugContext (ctx , "Processing command" , "command_server_type" , logger .ServerType (ctx ))
98109
99- if logger .ServerType (ctx ) == cp .commandServerType || logger .ServerType (ctx ) == "" {
110+ if logger .ServerType (ctx ) == cp .commandServerType . String () || logger .ServerType (ctx ) == "" {
100111 switch msg .Topic {
101112 case bus .ConnectionResetTopic :
102113 cp .processConnectionReset (ctx , msg )
@@ -111,9 +122,6 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
111122 default :
112123 slog .DebugContext (ctx , "Command plugin received unknown topic" , "topic" , msg .Topic )
113124 }
114- } else {
115- slog .Info ("Sever type is not right ignoring message" , "command_server_type" ,
116- logger .ServerType (ctx ), "topic" , msg .Topic )
117125 }
118126}
119127
@@ -123,7 +131,7 @@ func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Mes
123131 if ! cp .commandService .IsConnected () {
124132 newCtx := context .WithValue (
125133 ctx ,
126- logger .ServerTypeContextKey , slog .Any (logger .ServerTypeKey , cp .commandServerType ),
134+ logger .ServerTypeContextKey , slog .Any (logger .ServerTypeKey , cp .commandServerType . String () ),
127135 )
128136 cp .createConnection (newCtx , resource )
129137 } else {
@@ -185,7 +193,7 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes
185193 err := cp .commandService .UpdateDataPlaneHealth (ctx , instances )
186194 if err != nil {
187195 slog .ErrorContext (ctx , "Unable to update data plane health" , "error" , err ,
188- "command_server_type" , cp .commandServerType )
196+ "command_server_type" , cp .commandServerType . String () )
189197 }
190198 }
191199}
@@ -248,9 +256,10 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
248256 slog .InfoContext (ctx , "Received management plane config upload request" )
249257 cp .handleConfigUploadRequest (newCtx , message )
250258 case * mpi.ManagementPlaneRequest_ConfigApplyRequest :
251- if cp .commandServerType != "command" {
259+ if cp .commandServerType != Command {
252260 slog .WarnContext (newCtx , "Auxiliary command server can not perform config apply" ,
253- "command_server_type" , cp .commandServerType )
261+ "command_server_type" , cp .commandServerType .String ())
262+ cp .handleInvalidRequest (newCtx , message )
254263
255264 return
256265 }
@@ -260,9 +269,10 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
260269 slog .InfoContext (ctx , "Received management plane health request" )
261270 cp .handleHealthRequest (newCtx )
262271 case * mpi.ManagementPlaneRequest_ActionRequest :
263- if cp .commandServerType != "command" {
272+ if cp .commandServerType != Command {
264273 slog .WarnContext (newCtx , "Auxiliary command server can not perform api action" ,
265- "command_server_type" , cp .commandServerType )
274+ "command_server_type" , cp .commandServerType .String ())
275+ cp .handleInvalidRequest (newCtx , message )
266276
267277 return
268278 }
@@ -354,6 +364,21 @@ func (cp *CommandPlugin) handleHealthRequest(newCtx context.Context) {
354364 cp .messagePipe .Process (newCtx , & bus.Message {Topic : bus .DataPlaneHealthRequestTopic })
355365}
356366
367+ func (cp * CommandPlugin ) handleInvalidRequest (ctx context.Context , message * mpi.ManagementPlaneRequest ) {
368+ err := cp .commandService .SendDataPlaneResponse (ctx , & mpi.DataPlaneResponse {
369+ MessageMeta : message .GetMessageMeta (),
370+ CommandResponse : & mpi.CommandResponse {
371+ Status : mpi .CommandResponse_COMMAND_STATUS_FAILURE ,
372+ Message : "Can not perform write action as auxiliary command server" ,
373+ Error : "request not allowed" ,
374+ },
375+ InstanceId : message .GetActionRequest ().GetInstanceId (),
376+ })
377+ if err != nil {
378+ slog .ErrorContext (ctx , "Unable to send data plane response" , "error" , err )
379+ }
380+ }
381+
357382func (cp * CommandPlugin ) createDataPlaneResponse (correlationID string , status mpi.CommandResponse_CommandStatus ,
358383 message , err string ,
359384) * mpi.DataPlaneResponse {
@@ -370,3 +395,7 @@ func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mp
370395 },
371396 }
372397}
398+
399+ func (s ServerType ) String () string {
400+ return serverType [s ]
401+ }
0 commit comments