Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/command.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/common.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/files.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 75 additions & 1 deletion internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ type (
isConnected *atomic.Bool
subscribeCancel context.CancelFunc
subscribeChannel chan *mpi.ManagementPlaneRequest
instances []*mpi.Instance
subscribeMutex sync.Mutex
subscribeClientMutex sync.Mutex
instancesMutex sync.Mutex
}
)

Expand All @@ -60,6 +62,7 @@ func NewCommandService(
agentConfig: agentConfig,
isConnected: isConnected,
subscribeChannel: subscribeChannel,
instances: []*mpi.Instance{},
}

var subscribeCtx context.Context
Expand Down Expand Up @@ -126,6 +129,10 @@ func (cs *CommandService) UpdateDataPlaneStatus(
}
slog.DebugContext(ctx, "UpdateDataPlaneStatus response", "response", response)

cs.instancesMutex.Lock()
defer cs.instancesMutex.Unlock()
cs.instances = resource.GetInstances()

return err
}

Expand Down Expand Up @@ -246,6 +253,10 @@ func (cs *CommandService) CreateConnection(

cs.isConnected.Store(true)

cs.instancesMutex.Lock()
defer cs.instancesMutex.Unlock()
cs.instances = resource.GetInstances()

return response, nil
}

Expand Down Expand Up @@ -335,12 +346,75 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
return recvError
}

cs.subscribeChannel <- request
if cs.isValidRequest(ctx, request) {
cs.subscribeChannel <- request
}

return nil
}
}

func (cs *CommandService) isValidRequest(ctx context.Context, request *mpi.ManagementPlaneRequest) bool {
var validRequest bool

switch request.GetRequest().(type) {
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
requestInstanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
validRequest = cs.checkIfInstanceExists(ctx, request, requestInstanceID)
case *mpi.ManagementPlaneRequest_ConfigUploadRequest:
requestInstanceID := request.GetConfigUploadRequest().GetOverview().GetConfigVersion().GetInstanceId()
validRequest = cs.checkIfInstanceExists(ctx, request, requestInstanceID)
case *mpi.ManagementPlaneRequest_ActionRequest:
requestInstanceID := request.GetActionRequest().GetInstanceId()
validRequest = cs.checkIfInstanceExists(ctx, request, requestInstanceID)
default:
validRequest = true
}

return validRequest
}

func (cs *CommandService) checkIfInstanceExists(
ctx context.Context,
request *mpi.ManagementPlaneRequest,
requestInstanceID string,
) bool {
instanceFound := false

cs.instancesMutex.Lock()
for _, instance := range cs.instances {
if instance.GetInstanceMeta().GetInstanceId() == requestInstanceID {
instanceFound = true
}
}
cs.instancesMutex.Unlock()

if !instanceFound {
slog.WarnContext(
ctx,
"Unable to handle request, instance not found",
"instance", requestInstanceID,
"request", request,
)

response := &mpi.DataPlaneResponse{
MessageMeta: request.GetMessageMeta(),
CommandResponse: &mpi.CommandResponse{
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
Message: "Unable to handle request",
Error: "Instance ID not found",
},
InstanceId: requestInstanceID,
}
err := cs.SendDataPlaneResponse(ctx, response)
if err != nil {
slog.ErrorContext(ctx, "Failed to send data plane response", "error", err)
}
}

return instanceFound
}

// Retry callback for establishing the connection between the Management Plane and the Agent.
func (cs *CommandService) connectCallback(
ctx context.Context,
Expand Down
137 changes: 137 additions & 0 deletions internal/command/command_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,140 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) {

require.NoError(t, err)
}

func TestCommandService_isValidRequest(t *testing.T) {
ctx := context.Background()
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
subscribeClient := &FakeSubscribeClient{}

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
make(chan *mpi.ManagementPlaneRequest),
)

commandService.subscribeClientMutex.Lock()
commandService.subscribeClient = subscribeClient
commandService.subscribeClientMutex.Unlock()

nginxInstance := protos.GetNginxOssInstance([]string{})

commandService.instances = append(commandService.instances, nginxInstance)

testCases := []struct {
req *mpi.ManagementPlaneRequest
name string
result bool
}{
{
name: "Test 1: valid health request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_HealthRequest{HealthRequest: &mpi.HealthRequest{}},
},
result: true,
},
{
name: "Test 2: valid config apply request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: protos.CreateConfigApplyRequest(&mpi.FileOverview{
Files: make([]*mpi.File, 0),
ConfigVersion: &mpi.ConfigVersion{
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
Version: "e23brbei3u2bru93",
},
}),
},
},
result: true,
},
{
name: "Test 3: invalid config apply request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: protos.CreateConfigApplyRequest(&mpi.FileOverview{
Files: make([]*mpi.File, 0),
ConfigVersion: &mpi.ConfigVersion{
InstanceId: "unknown-id",
Version: "e23brbei3u2bru93",
},
}),
},
},
result: false,
},
{
name: "Test 4: valid config upload request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{
ConfigUploadRequest: &mpi.ConfigUploadRequest{
Overview: &mpi.FileOverview{
Files: make([]*mpi.File, 0),
ConfigVersion: &mpi.ConfigVersion{
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
Version: "e23brbei3u2bru93",
},
},
},
},
},
result: true,
},
{
name: "Test 5: invalid config upload request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{
ConfigUploadRequest: &mpi.ConfigUploadRequest{
Overview: &mpi.FileOverview{
Files: make([]*mpi.File, 0),
ConfigVersion: &mpi.ConfigVersion{
InstanceId: "unknown-id",
Version: "e23brbei3u2bru93",
},
},
},
},
},
result: false,
},
{
name: "Test 6: valid action request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ActionRequest{
ActionRequest: &mpi.APIActionRequest{
InstanceId: nginxInstance.GetInstanceMeta().GetInstanceId(),
Action: nil,
},
},
},
result: true,
},
{
name: "Test 7: invalid action request",
req: &mpi.ManagementPlaneRequest{
MessageMeta: protos.CreateMessageMeta(),
Request: &mpi.ManagementPlaneRequest_ActionRequest{
ActionRequest: &mpi.APIActionRequest{
InstanceId: "unknown-id",
Action: nil,
},
},
},
result: false,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
result := commandService.isValidRequest(ctx, testCase.req)
assert.Equal(t, testCase.result, result)
})
}
}
4 changes: 4 additions & 0 deletions internal/resource/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) er
var instance *mpi.Instance
operator := r.instanceOperators[instanceID]

if operator == nil {
return fmt.Errorf("instance %s not found", instanceID)
}

for _, resourceInstance := range r.resource.GetInstances() {
if resourceInstance.GetInstanceMeta().GetInstanceId() == instanceID {
instance = resourceInstance
Expand Down
14 changes: 12 additions & 2 deletions internal/resource/resource_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,40 @@ func TestResourceService_ApplyConfig(t *testing.T) {
ctx := context.Background()

tests := []struct {
instanceID string
reloadErr error
validateErr error
expected error
name string
}{
{
name: "Test 1: Successful reload",
instanceID: protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(),
reloadErr: nil,
validateErr: nil,
expected: nil,
},
{
name: "Test 2: Failed reload",
instanceID: protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(),
reloadErr: fmt.Errorf("something went wrong"),
validateErr: nil,
expected: fmt.Errorf("failed to reload NGINX %w", fmt.Errorf("something went wrong")),
},
{
name: "Test 3: Failed validate",
instanceID: protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(),
reloadErr: nil,
validateErr: fmt.Errorf("something went wrong"),
expected: fmt.Errorf("failed validating config %w", fmt.Errorf("something went wrong")),
},
{
name: "Test 4: Unknown instance ID",
instanceID: "unknown",
reloadErr: nil,
validateErr: nil,
expected: fmt.Errorf("instance unknown not found"),
},
}

for _, test := range tests {
Expand All @@ -283,8 +294,7 @@ func TestResourceService_ApplyConfig(t *testing.T) {
}
resourceService.resource.Instances = instances

reloadError := resourceService.ApplyConfig(ctx,
protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId())
reloadError := resourceService.ApplyConfig(ctx, test.instanceID)
assert.Equal(t, test.expected, reloadError)
})
}
Expand Down
Loading