Skip to content

Commit e07bf64

Browse files
committed
allow multiple management planes
1 parent c42a042 commit e07bf64

File tree

14 files changed

+365
-101
lines changed

14 files changed

+365
-101
lines changed

internal/command/command_plugin.go

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -38,37 +38,45 @@ type (
3838
}
3939

4040
CommandPlugin struct {
41-
messagePipe bus.MessagePipeInterface
42-
config *config.Config
43-
subscribeCancel context.CancelFunc
44-
conn grpc.GrpcConnectionInterface
45-
commandService commandService
46-
subscribeChannel chan *mpi.ManagementPlaneRequest
47-
subscribeMutex sync.Mutex
41+
messagePipe bus.MessagePipeInterface
42+
config *config.Config
43+
subscribeCancel context.CancelFunc
44+
conn grpc.GrpcConnectionInterface
45+
commandService commandService
46+
subscribeChannel chan *mpi.ManagementPlaneRequest
47+
commandServerType string
48+
subscribeMutex sync.Mutex
4849
}
4950
)
5051

51-
func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface) *CommandPlugin {
52+
func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface,
53+
commandServerType string,
54+
) *CommandPlugin {
5255
return &CommandPlugin{
53-
config: agentConfig,
54-
conn: grpcConnection,
55-
subscribeChannel: make(chan *mpi.ManagementPlaneRequest),
56+
config: agentConfig,
57+
conn: grpcConnection,
58+
subscribeChannel: make(chan *mpi.ManagementPlaneRequest),
59+
commandServerType: commandServerType,
5660
}
5761
}
5862

5963
func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error {
60-
slog.DebugContext(ctx, "Starting command plugin")
64+
slog.DebugContext(ctx, "Starting command plugin", "command_server_type", cp.commandServerType)
6165

6266
cp.messagePipe = messagePipe
6367
cp.commandService = NewCommandService(cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)
6468

65-
go cp.monitorSubscribeChannel(ctx)
69+
newCtx := context.WithValue(
70+
ctx,
71+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType),
72+
)
73+
go cp.monitorSubscribeChannel(newCtx)
6674

6775
return nil
6876
}
6977

7078
func (cp *CommandPlugin) Close(ctx context.Context) error {
71-
slog.InfoContext(ctx, "Closing command plugin")
79+
slog.InfoContext(ctx, "Closing command plugin", "command_server_type", cp.commandServerType)
7280

7381
cp.subscribeMutex.Lock()
7482
if cp.subscribeCancel != nil {
@@ -81,32 +89,43 @@ func (cp *CommandPlugin) Close(ctx context.Context) error {
8189

8290
func (cp *CommandPlugin) Info() *bus.Info {
8391
return &bus.Info{
84-
Name: "command",
92+
Name: cp.commandServerType,
8593
}
8694
}
8795

8896
func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
89-
switch msg.Topic {
90-
case bus.ConnectionResetTopic:
91-
cp.processConnectionReset(ctx, msg)
92-
case bus.ResourceUpdateTopic:
93-
cp.processResourceUpdate(ctx, msg)
94-
case bus.InstanceHealthTopic:
95-
cp.processInstanceHealth(ctx, msg)
96-
case bus.DataPlaneHealthResponseTopic:
97-
cp.processDataPlaneHealth(ctx, msg)
98-
case bus.DataPlaneResponseTopic:
99-
cp.processDataPlaneResponse(ctx, msg)
100-
default:
101-
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
97+
slog.DebugContext(ctx, "Processing command", "command_server_type", logger.ServerType(ctx))
98+
99+
if logger.ServerType(ctx) == cp.commandServerType || logger.ServerType(ctx) == "" {
100+
switch msg.Topic {
101+
case bus.ConnectionResetTopic:
102+
cp.processConnectionReset(ctx, msg)
103+
case bus.ResourceUpdateTopic:
104+
cp.processResourceUpdate(ctx, msg)
105+
case bus.InstanceHealthTopic:
106+
cp.processInstanceHealth(ctx, msg)
107+
case bus.DataPlaneHealthResponseTopic:
108+
cp.processDataPlaneHealth(ctx, msg)
109+
case bus.DataPlaneResponseTopic:
110+
cp.processDataPlaneResponse(ctx, msg)
111+
default:
112+
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
113+
}
114+
} else {
115+
slog.Info("Sever type is not right ignoring message", "command_server_type",
116+
logger.ServerType(ctx), "topic", msg.Topic)
102117
}
103118
}
104119

105120
func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Message) {
106121
slog.DebugContext(ctx, "Command plugin received resource update message")
107122
if resource, ok := msg.Data.(*mpi.Resource); ok {
108123
if !cp.commandService.IsConnected() {
109-
cp.createConnection(ctx, resource)
124+
newCtx := context.WithValue(
125+
ctx,
126+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType),
127+
)
128+
cp.createConnection(newCtx, resource)
110129
} else {
111130
statusErr := cp.commandService.UpdateDataPlaneStatus(ctx, resource)
112131
if statusErr != nil {
@@ -145,13 +164,14 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me
145164
correlationID := logger.CorrelationID(ctx)
146165
if err != nil {
147166
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
148-
cp.messagePipe.Process(ctx, &bus.Message{
167+
168+
cp.processDataPlaneResponse(ctx, &bus.Message{
149169
Topic: bus.DataPlaneResponseTopic,
150170
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
151171
"Failed to send the health status update", err.Error()),
152172
})
153173
}
154-
cp.messagePipe.Process(ctx, &bus.Message{
174+
cp.processDataPlaneResponse(ctx, &bus.Message{
155175
Topic: bus.DataPlaneResponseTopic,
156176
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
157177
"Successfully sent health status update", ""),
@@ -164,7 +184,8 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes
164184
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
165185
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
166186
if err != nil {
167-
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
187+
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err,
188+
"command_server_type", cp.commandServerType)
168189
}
169190
}
170191
}
@@ -208,6 +229,7 @@ func (cp *CommandPlugin) Subscriptions() []string {
208229
}
209230
}
210231

232+
// nolint: revive, cyclop
211233
func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
212234
for {
213235
select {
@@ -226,12 +248,24 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
226248
slog.InfoContext(ctx, "Received management plane config upload request")
227249
cp.handleConfigUploadRequest(newCtx, message)
228250
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
251+
if cp.commandServerType != "command" {
252+
slog.WarnContext(newCtx, "Auxiliary command server can not perform config apply",
253+
"command_server_type", cp.commandServerType)
254+
255+
return
256+
}
229257
slog.InfoContext(ctx, "Received management plane config apply request")
230258
cp.handleConfigApplyRequest(newCtx, message)
231259
case *mpi.ManagementPlaneRequest_HealthRequest:
232260
slog.InfoContext(ctx, "Received management plane health request")
233261
cp.handleHealthRequest(newCtx)
234262
case *mpi.ManagementPlaneRequest_ActionRequest:
263+
if cp.commandServerType != "command" {
264+
slog.WarnContext(newCtx, "Auxiliary command server can not perform api action",
265+
"command_server_type", cp.commandServerType)
266+
267+
return
268+
}
235269
slog.InfoContext(ctx, "Received management plane action request")
236270
cp.handleAPIActionRequest(newCtx, message)
237271
default:

internal/command/command_plugin_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,17 @@ import (
3131
"github.com/stretchr/testify/require"
3232
)
3333

34+
const serverType = "command"
35+
3436
func TestCommandPlugin_Info(t *testing.T) {
35-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
37+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
3638
info := commandPlugin.Info()
3739

3840
assert.Equal(t, "command", info.Name)
3941
}
4042

4143
func TestCommandPlugin_Subscriptions(t *testing.T) {
42-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
44+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
4345
subscriptions := commandPlugin.Subscriptions()
4446

4547
assert.Equal(
@@ -60,7 +62,7 @@ func TestCommandPlugin_Init(t *testing.T) {
6062
messagePipe := busfakes.NewFakeMessagePipe()
6163
fakeCommandService := &commandfakes.FakeCommandService{}
6264

63-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
65+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
6466
err := commandPlugin.Init(ctx, messagePipe)
6567
require.NoError(t, err)
6668

@@ -79,7 +81,7 @@ func TestCommandPlugin_createConnection(t *testing.T) {
7981
commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil)
8082
messagePipe := busfakes.NewFakeMessagePipe()
8183

82-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
84+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
8385
err := commandPlugin.Init(ctx, messagePipe)
8486
commandPlugin.commandService = commandService
8587
require.NoError(t, err)
@@ -111,7 +113,7 @@ func TestCommandPlugin_Process(t *testing.T) {
111113
messagePipe := busfakes.NewFakeMessagePipe()
112114
fakeCommandService := &commandfakes.FakeCommandService{}
113115

114-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
116+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
115117
err := commandPlugin.Init(ctx, messagePipe)
116118
require.NoError(t, err)
117119
defer commandPlugin.Close(ctx)
@@ -219,7 +221,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {
219221

220222
agentConfig := types.AgentConfig()
221223
agentConfig.Features = test.configFeatures
222-
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{})
224+
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
223225
err := commandPlugin.Init(ctx, messagePipe)
224226
require.NoError(tt, err)
225227
defer commandPlugin.Close(ctx)
@@ -319,7 +321,7 @@ func TestCommandPlugin_FeatureDisabled(t *testing.T) {
319321

320322
agentConfig.Features = test.configFeatures
321323

322-
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{})
324+
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
323325
err := commandPlugin.Init(ctx, messagePipe)
324326
commandPlugin.commandService = fakeCommandService
325327
require.NoError(tt, err)
@@ -344,7 +346,7 @@ func TestMonitorSubscribeChannel(t *testing.T) {
344346
logBuf := &bytes.Buffer{}
345347
stub.StubLoggerWith(logBuf)
346348

347-
cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
349+
cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
348350
cp.subscribeCancel = cncl
349351

350352
message := protos.CreateManagementPlaneRequest()
@@ -383,7 +385,7 @@ func Test_createDataPlaneResponse(t *testing.T) {
383385
Error: "",
384386
},
385387
}
386-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
388+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, serverType)
387389
result := commandPlugin.createDataPlaneResponse(expected.GetMessageMeta().GetCorrelationId(),
388390
expected.GetCommandResponse().GetStatus(),
389391
expected.GetCommandResponse().GetMessage(), expected.GetCommandResponse().GetError())

internal/command/command_service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ func (cs *CommandService) UpdateClient(ctx context.Context, client mpi.CommandSe
261261
cs.subscribeClientMutex.Unlock()
262262

263263
cs.isConnected.Store(false)
264+
// Will this have the sever type ?
264265
resp, err := cs.CreateConnection(ctx, cs.resource)
265266
if err != nil {
266267
return err

0 commit comments

Comments
 (0)