Skip to content

Commit 145571a

Browse files
committed
Merge branch 'v3' into add-nginx-plus-api-actions
2 parents 5b594b5 + 2b427d0 commit 145571a

File tree

7 files changed

+276
-24
lines changed

7 files changed

+276
-24
lines changed

internal/command/command_service.go

Lines changed: 98 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@ type (
4040
subscribeCancel context.CancelFunc
4141
subscribeChannel chan *mpi.ManagementPlaneRequest
4242
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
43+
instances []*mpi.Instance
4344
subscribeMutex sync.Mutex
4445
subscribeClientMutex sync.Mutex
4546
configApplyRequestQueueMutex sync.Mutex
47+
instancesMutex sync.Mutex
4648
}
4749
)
4850

@@ -61,6 +63,7 @@ func NewCommandService(
6163
isConnected: isConnected,
6264
subscribeChannel: subscribeChannel,
6365
configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
66+
instances: []*mpi.Instance{},
6467
}
6568

6669
var subscribeCtx context.Context
@@ -127,6 +130,10 @@ func (cs *CommandService) UpdateDataPlaneStatus(
127130
}
128131
slog.DebugContext(ctx, "UpdateDataPlaneStatus response", "response", response)
129132

133+
cs.instancesMutex.Lock()
134+
defer cs.instancesMutex.Unlock()
135+
cs.instances = resource.GetInstances()
136+
130137
return err
131138
}
132139

@@ -252,6 +259,10 @@ func (cs *CommandService) CreateConnection(
252259

253260
cs.isConnected.Store(true)
254261

262+
cs.instancesMutex.Lock()
263+
defer cs.instancesMutex.Unlock()
264+
cs.instances = resource.GetInstances()
265+
255266
return response, nil
256267
}
257268

@@ -416,30 +427,101 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
416427
return recvError
417428
}
418429

419-
switch request.GetRequest().(type) {
420-
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
421-
cs.configApplyRequestQueueMutex.Lock()
422-
defer cs.configApplyRequestQueueMutex.Unlock()
423-
424-
instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
425-
cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request)
426-
if len(cs.configApplyRequestQueue[instanceID]) == 1 {
430+
if cs.isValidRequest(ctx, request) {
431+
switch request.GetRequest().(type) {
432+
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
433+
cs.queueConfigApplyRequests(ctx, request)
434+
default:
427435
cs.subscribeChannel <- request
428-
} else {
429-
slog.DebugContext(
430-
ctx,
431-
"Config apply request is already in progress, queuing new config apply request",
432-
"request", request,
433-
)
434436
}
435-
default:
436-
cs.subscribeChannel <- request
437437
}
438438

439439
return nil
440440
}
441441
}
442442

443+
func (cs *CommandService) queueConfigApplyRequests(ctx context.Context, request *mpi.ManagementPlaneRequest) {
444+
cs.configApplyRequestQueueMutex.Lock()
445+
defer cs.configApplyRequestQueueMutex.Unlock()
446+
447+
instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
448+
cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request)
449+
if len(cs.configApplyRequestQueue[instanceID]) == 1 {
450+
cs.subscribeChannel <- request
451+
} else {
452+
slog.DebugContext(
453+
ctx,
454+
"Config apply request is already in progress, queuing new config apply request",
455+
"request", request,
456+
)
457+
}
458+
}
459+
460+
func (cs *CommandService) isValidRequest(ctx context.Context, request *mpi.ManagementPlaneRequest) bool {
461+
var validRequest bool
462+
463+
switch request.GetRequest().(type) {
464+
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
465+
requestInstanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
466+
validRequest = cs.checkIfInstanceExists(ctx, request, requestInstanceID)
467+
case *mpi.ManagementPlaneRequest_ConfigUploadRequest:
468+
requestInstanceID := request.GetConfigUploadRequest().GetOverview().GetConfigVersion().GetInstanceId()
469+
validRequest = cs.checkIfInstanceExists(ctx, request, requestInstanceID)
470+
case *mpi.ManagementPlaneRequest_ActionRequest:
471+
requestInstanceID := request.GetActionRequest().GetInstanceId()
472+
validRequest = cs.checkIfInstanceExists(ctx, request, requestInstanceID)
473+
default:
474+
validRequest = true
475+
}
476+
477+
return validRequest
478+
}
479+
480+
func (cs *CommandService) checkIfInstanceExists(
481+
ctx context.Context,
482+
request *mpi.ManagementPlaneRequest,
483+
requestInstanceID string,
484+
) bool {
485+
instanceFound := false
486+
487+
cs.instancesMutex.Lock()
488+
for _, instance := range cs.instances {
489+
if instance.GetInstanceMeta().GetInstanceId() == requestInstanceID {
490+
instanceFound = true
491+
}
492+
}
493+
cs.instancesMutex.Unlock()
494+
495+
if !instanceFound {
496+
slog.WarnContext(
497+
ctx,
498+
"Unable to handle request, instance not found",
499+
"instance", requestInstanceID,
500+
"request", request,
501+
)
502+
503+
response := &mpi.DataPlaneResponse{
504+
MessageMeta: &mpi.MessageMeta{
505+
MessageId: proto.GenerateMessageID(),
506+
CorrelationId: request.GetMessageMeta().GetCorrelationId(),
507+
Timestamp: timestamppb.Now(),
508+
},
509+
CommandResponse: &mpi.CommandResponse{
510+
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
511+
Message: "Unable to handle request",
512+
Error: "Instance ID not found",
513+
},
514+
InstanceId: requestInstanceID,
515+
}
516+
err := cs.SendDataPlaneResponse(ctx, response)
517+
if err != nil {
518+
slog.ErrorContext(ctx, "Failed to send data plane response", "error", err)
519+
}
520+
}
521+
522+
return instanceFound
523+
}
524+
443525
// Retry callback for establishing the connection between the Management Plane and the Agent.
444526
func (cs *CommandService) connectCallback(
445527
ctx context.Context,

internal/command/command_service_test.go

Lines changed: 147 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ func (*FakeConfigApplySubscribeClient) Send(*mpi.DataPlaneResponse) error {
5656

5757
// nolint: nilnil
5858
func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) {
59-
protos.CreateManagementPlaneRequest()
59+
nginxInstance := protos.GetNginxOssInstance([]string{})
60+
6061
return &mpi.ManagementPlaneRequest{
6162
MessageMeta: &mpi.MessageMeta{
6263
MessageId: "1",
@@ -67,7 +68,7 @@ func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, erro
6768
ConfigApplyRequest: &mpi.ConfigApplyRequest{
6869
Overview: &mpi.FileOverview{
6970
ConfigVersion: &mpi.ConfigVersion{
70-
InstanceId: "12314",
71+
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
7172
Version: "4215432",
7273
},
7374
},
@@ -113,6 +114,11 @@ func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
113114
subscribeChannel,
114115
)
115116

117+
nginxInstance := protos.GetNginxOssInstance([]string{})
118+
commandService.instancesMutex.Lock()
119+
commandService.instances = append(commandService.instances, nginxInstance)
120+
commandService.instancesMutex.Unlock()
121+
116122
defer commandService.CancelSubscription(ctx)
117123

118124
var wg sync.WaitGroup
@@ -389,3 +395,142 @@ func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) {
389395
assert.Equal(t, request3, commandService.configApplyRequestQueue["12314"][0])
390396
wg.Wait()
391397
}
398+
399+
func TestCommandService_isValidRequest(t *testing.T) {
400+
ctx := context.Background()
401+
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
402+
subscribeClient := &FakeSubscribeClient{}
403+
404+
commandService := NewCommandService(
405+
ctx,
406+
commandServiceClient,
407+
types.AgentConfig(),
408+
make(chan *mpi.ManagementPlaneRequest),
409+
)
410+
411+
commandService.subscribeClientMutex.Lock()
412+
commandService.subscribeClient = subscribeClient
413+
commandService.subscribeClientMutex.Unlock()
414+
415+
nginxInstance := protos.GetNginxOssInstance([]string{})
416+
417+
commandService.instancesMutex.Lock()
418+
commandService.instances = append(commandService.instances, nginxInstance)
419+
commandService.instancesMutex.Unlock()
420+
421+
testCases := []struct {
422+
req *mpi.ManagementPlaneRequest
423+
name string
424+
result bool
425+
}{
426+
{
427+
name: "Test 1: valid health request",
428+
req: &mpi.ManagementPlaneRequest{
429+
MessageMeta: protos.CreateMessageMeta(),
430+
Request: &mpi.ManagementPlaneRequest_HealthRequest{HealthRequest: &mpi.HealthRequest{}},
431+
},
432+
result: true,
433+
},
434+
{
435+
name: "Test 2: valid config apply request",
436+
req: &mpi.ManagementPlaneRequest{
437+
MessageMeta: protos.CreateMessageMeta(),
438+
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
439+
ConfigApplyRequest: protos.CreateConfigApplyRequest(&mpi.FileOverview{
440+
Files: make([]*mpi.File, 0),
441+
ConfigVersion: &mpi.ConfigVersion{
442+
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
443+
Version: "e23brbei3u2bru93",
444+
},
445+
}),
446+
},
447+
},
448+
result: true,
449+
},
450+
{
451+
name: "Test 3: invalid config apply request",
452+
req: &mpi.ManagementPlaneRequest{
453+
MessageMeta: protos.CreateMessageMeta(),
454+
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
455+
ConfigApplyRequest: protos.CreateConfigApplyRequest(&mpi.FileOverview{
456+
Files: make([]*mpi.File, 0),
457+
ConfigVersion: &mpi.ConfigVersion{
458+
InstanceId: "unknown-id",
459+
Version: "e23brbei3u2bru93",
460+
},
461+
}),
462+
},
463+
},
464+
result: false,
465+
},
466+
{
467+
name: "Test 4: valid config upload request",
468+
req: &mpi.ManagementPlaneRequest{
469+
MessageMeta: protos.CreateMessageMeta(),
470+
Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{
471+
ConfigUploadRequest: &mpi.ConfigUploadRequest{
472+
Overview: &mpi.FileOverview{
473+
Files: make([]*mpi.File, 0),
474+
ConfigVersion: &mpi.ConfigVersion{
475+
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
476+
Version: "e23brbei3u2bru93",
477+
},
478+
},
479+
},
480+
},
481+
},
482+
result: true,
483+
},
484+
{
485+
name: "Test 5: invalid config upload request",
486+
req: &mpi.ManagementPlaneRequest{
487+
MessageMeta: protos.CreateMessageMeta(),
488+
Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{
489+
ConfigUploadRequest: &mpi.ConfigUploadRequest{
490+
Overview: &mpi.FileOverview{
491+
Files: make([]*mpi.File, 0),
492+
ConfigVersion: &mpi.ConfigVersion{
493+
InstanceId: "unknown-id",
494+
Version: "e23brbei3u2bru93",
495+
},
496+
},
497+
},
498+
},
499+
},
500+
result: false,
501+
},
502+
{
503+
name: "Test 6: valid action request",
504+
req: &mpi.ManagementPlaneRequest{
505+
MessageMeta: protos.CreateMessageMeta(),
506+
Request: &mpi.ManagementPlaneRequest_ActionRequest{
507+
ActionRequest: &mpi.APIActionRequest{
508+
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
509+
Action: nil,
510+
},
511+
},
512+
},
513+
result: true,
514+
},
515+
{
516+
name: "Test 7: invalid action request",
517+
req: &mpi.ManagementPlaneRequest{
518+
MessageMeta: protos.CreateMessageMeta(),
519+
Request: &mpi.ManagementPlaneRequest_ActionRequest{
520+
ActionRequest: &mpi.APIActionRequest{
521+
InstanceId: "unknown-id",
522+
Action: nil,
523+
},
524+
},
525+
},
526+
result: false,
527+
},
528+
}
529+
530+
for _, testCase := range testCases {
531+
t.Run(testCase.name, func(t *testing.T) {
532+
result := commandService.isValidRequest(ctx, testCase.req)
533+
assert.Equal(t, testCase.result, result)
534+
})
535+
}
536+
}

internal/resource/resource_service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) er
163163
var instance *mpi.Instance
164164
operator := r.instanceOperators[instanceID]
165165

166+
if operator == nil {
167+
return fmt.Errorf("instance %s not found", instanceID)
168+
}
169+
166170
for _, resourceInstance := range r.resource.GetInstances() {
167171
if resourceInstance.GetInstanceMeta().GetInstanceId() == instanceID {
168172
instance = resourceInstance

internal/resource/resource_service_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,29 +287,40 @@ func TestResourceService_ApplyConfig(t *testing.T) {
287287
ctx := context.Background()
288288

289289
tests := []struct {
290+
instanceID string
290291
reloadErr error
291292
validateErr error
292293
expected error
293294
name string
294295
}{
295296
{
296297
name: "Test 1: Successful reload",
298+
instanceID: protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(),
297299
reloadErr: nil,
298300
validateErr: nil,
299301
expected: nil,
300302
},
301303
{
302304
name: "Test 2: Failed reload",
305+
instanceID: protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(),
303306
reloadErr: fmt.Errorf("something went wrong"),
304307
validateErr: nil,
305308
expected: fmt.Errorf("failed to reload NGINX %w", fmt.Errorf("something went wrong")),
306309
},
307310
{
308311
name: "Test 3: Failed validate",
312+
instanceID: protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(),
309313
reloadErr: nil,
310314
validateErr: fmt.Errorf("something went wrong"),
311315
expected: fmt.Errorf("failed validating config %w", fmt.Errorf("something went wrong")),
312316
},
317+
{
318+
name: "Test 4: Unknown instance ID",
319+
instanceID: "unknown",
320+
reloadErr: nil,
321+
validateErr: nil,
322+
expected: fmt.Errorf("instance unknown not found"),
323+
},
313324
}
314325

315326
for _, test := range tests {
@@ -330,8 +341,7 @@ func TestResourceService_ApplyConfig(t *testing.T) {
330341
}
331342
resourceService.resource.Instances = instances
332343

333-
reloadError := resourceService.ApplyConfig(ctx,
334-
protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId())
344+
reloadError := resourceService.ApplyConfig(ctx, test.instanceID)
335345
assert.Equal(t, test.expected, reloadError)
336346
})
337347
}

0 commit comments

Comments
 (0)