diff --git a/internal/bus/topics.go b/internal/bus/topics.go index 521c44bfb..33372e465 100644 --- a/internal/bus/topics.go +++ b/internal/bus/topics.go @@ -17,11 +17,7 @@ const ( ConnectionResetTopic = "connection-reset" ConfigApplyRequestTopic = "config-apply-request" WriteConfigSuccessfulTopic = "write-config-successful" - ReloadSuccessfulTopic = "reload-successful" EnableWatchersTopic = "enable-watchers" - ConfigApplyFailedTopic = "config-apply-failed" - ConfigApplyCompleteTopic = "config-apply-complete" - RollbackWriteTopic = "rollback-write" DataPlaneHealthRequestTopic = "data-plane-health-request" DataPlaneHealthResponseTopic = "data-plane-health-response" APIActionRequestTopic = "api-action-request" diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index 5798b0976..2e58461da 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -31,7 +31,7 @@ import ( //counterfeiter:generate . fileOperator //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate -//counterfeiter:generate . fileManagerServiceInterface +//counterfeiter:generate . FileManagerServiceInterface //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate //counterfeiter:generate . fileServiceOperatorInterface @@ -86,7 +86,7 @@ type ( UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) } - fileManagerServiceInterface interface { + FileManagerServiceInterface interface { ConfigApply(ctx context.Context, configApplyRequest *mpi.ConfigApplyRequest) (writeStatus model.WriteStatus, err error) Rollback(ctx context.Context, instanceID string) error diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go deleted file mode 100644 index 58d4ace1a..000000000 --- a/internal/file/file_plugin.go +++ /dev/null @@ -1,484 +0,0 @@ -// Copyright (c) F5, Inc. -// -// This source code is licensed under the Apache License, Version 2.0 license found in the -// LICENSE file in the root directory of this source tree. - -package file - -import ( - "context" - "errors" - "fmt" - "log/slog" - "sync" - - "github.com/nginx/agent/v3/pkg/files" - "github.com/nginx/agent/v3/pkg/id" - - mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" - "github.com/nginx/agent/v3/internal/bus" - "github.com/nginx/agent/v3/internal/config" - "github.com/nginx/agent/v3/internal/grpc" - "github.com/nginx/agent/v3/internal/logger" - "github.com/nginx/agent/v3/internal/model" - "google.golang.org/protobuf/types/known/timestamppb" -) - -var _ bus.Plugin = (*FilePlugin)(nil) - -// The file plugin only writes, deletes and checks hashes of files -// the file plugin does not care about the instance type - -type FilePlugin struct { - manifestLock *sync.RWMutex - agentConfigMutex *sync.Mutex - messagePipe bus.MessagePipeInterface - config *config.Config - conn grpc.GrpcConnectionInterface - fileManagerService fileManagerServiceInterface - serverType model.ServerType -} - -func NewFilePlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface, - serverType model.ServerType, manifestLock *sync.RWMutex, -) *FilePlugin { - return &FilePlugin{ - config: agentConfig, - conn: grpcConnection, - serverType: serverType, - manifestLock: manifestLock, - agentConfigMutex: &sync.Mutex{}, - } -} - -func (fp *FilePlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { - ctx = context.WithValue( - ctx, - logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()), - ) - slog.DebugContext(ctx, "Starting file plugin") - - fp.messagePipe = messagePipe - fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config, fp.manifestLock) - - return nil -} - -func (fp *FilePlugin) Close(ctx context.Context) error { - ctx = context.WithValue( - ctx, - logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()), - ) - slog.InfoContext(ctx, "Closing file plugin") - - return fp.conn.Close(ctx) -} - -func (fp *FilePlugin) Info() *bus.Info { - name := "file" - if fp.serverType.String() == model.Auxiliary.String() { - name = "auxiliary-file" - } - - return &bus.Info{ - Name: name, - } -} - -//nolint:revive,cyclop // Cyclomatic complexity is acceptable for this function -func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) { - ctxWithMetadata := fp.config.NewContextWithLabels(ctx) - - if logger.ServerType(ctx) == "" { - ctxWithMetadata = context.WithValue( - ctxWithMetadata, - logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()), - ) - } - - if logger.ServerType(ctxWithMetadata) == fp.serverType.String() { - switch msg.Topic { - case bus.ConnectionResetTopic: - fp.handleConnectionReset(ctxWithMetadata, msg) - case bus.ConnectionCreatedTopic: - slog.DebugContext(ctxWithMetadata, "File plugin received connection created message") - fp.fileManagerService.SetIsConnected(true) - case bus.NginxConfigUpdateTopic: - fp.handleNginxConfigUpdate(ctxWithMetadata, msg) - case bus.ConfigUploadRequestTopic: - fp.handleConfigUploadRequest(ctxWithMetadata, msg) - case bus.ConfigApplyRequestTopic: - fp.handleConfigApplyRequest(ctxWithMetadata, msg) - case bus.ConfigApplyCompleteTopic: - fp.handleConfigApplyComplete(ctxWithMetadata, msg) - case bus.ReloadSuccessfulTopic: - fp.handleReloadSuccess(ctxWithMetadata, msg) - case bus.ConfigApplyFailedTopic: - fp.handleConfigApplyFailedRequest(ctxWithMetadata, msg) - case bus.AgentConfigUpdateTopic: - fp.handleAgentConfigUpdate(ctxWithMetadata, msg) - default: - slog.DebugContext(ctxWithMetadata, "File plugin received unknown topic", "topic", msg.Topic) - } - } -} - -func (fp *FilePlugin) Subscriptions() []string { - if fp.serverType == model.Auxiliary { - return []string{ - bus.ConnectionResetTopic, - bus.ConnectionCreatedTopic, - bus.NginxConfigUpdateTopic, - bus.ConfigUploadRequestTopic, - } - } - - return []string{ - bus.ConnectionResetTopic, - bus.ConnectionCreatedTopic, - bus.NginxConfigUpdateTopic, - bus.ConfigUploadRequestTopic, - bus.ConfigApplyRequestTopic, - bus.ConfigApplyFailedTopic, - bus.ReloadSuccessfulTopic, - bus.ConfigApplyCompleteTopic, - bus.AgentConfigUpdateTopic, - } -} - -func (fp *FilePlugin) Reconfigure(ctx context.Context, agentConfig *config.Config) error { - slog.DebugContext(ctx, "File plugin is reconfiguring to update agent configuration") - - fp.agentConfigMutex.Lock() - defer fp.agentConfigMutex.Unlock() - - fp.config = agentConfig - - return nil -} - -func (fp *FilePlugin) enableWatchers(ctx context.Context, - configContext *model.NginxConfigContext, - instanceID string, -) { - enableWatcher := &model.EnableWatchers{ - ConfigContext: configContext, - InstanceID: instanceID, - } - - fp.messagePipe.Process(ctx, &bus.Message{ - Data: enableWatcher, - Topic: bus.EnableWatchersTopic, - }) -} - -func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received connection reset message") - if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok { - var reconnect bool - err := fp.conn.Close(ctx) - if err != nil { - slog.ErrorContext(ctx, "File plugin: unable to close connection", "error", err) - } - fp.conn = newConnection - - reconnect = fp.fileManagerService.IsConnected() - fp.fileManagerService.ResetClient(ctx, fp.conn.FileServiceClient()) - fp.fileManagerService.SetIsConnected(reconnect) - - slog.DebugContext(ctx, "File manager service client reset successfully") - } -} - -func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received config apply complete message") - response, ok := msg.Data.(*mpi.DataPlaneResponse) - - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", "payload", msg.Data) - return - } - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) - fp.fileManagerService.ClearCache() - fp.enableWatchers(ctx, &model.NginxConfigContext{}, response.GetInstanceId()) -} - -func (fp *FilePlugin) handleReloadSuccess(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received reload success message", "data", msg.Data) - - successMessage, ok := msg.Data.(*model.ReloadSuccess) - - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ReloadSuccess", "payload", msg.Data) - return - } - - fp.fileManagerService.ClearCache() - fp.enableWatchers(ctx, successMessage.ConfigContext, successMessage.DataPlaneResponse.GetInstanceId()) - - if successMessage.ConfigContext.Files != nil { - slog.DebugContext(ctx, "Changes made during config apply, update files on disk") - updateError := fp.fileManagerService.UpdateCurrentFilesOnDisk( - ctx, - files.ConvertToMapOfFiles(successMessage.ConfigContext.Files), - true, - ) - if updateError != nil { - slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError) - } - } - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: successMessage.DataPlaneResponse}) -} - -func (fp *FilePlugin) handleConfigApplyFailedRequest(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received config failed message") - - data, ok := msg.Data.(*model.ConfigApplyMessage) - if data.InstanceID == "" || !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", - "payload", msg.Data) - fp.fileManagerService.ClearCache() - - return - } - - err := fp.fileManagerService.Rollback(ctx, data.InstanceID) - if err != nil { - rollbackResponse := fp.createDataPlaneResponse( - data.CorrelationID, - mpi.CommandResponse_COMMAND_STATUS_ERROR, - "Rollback failed", - data.InstanceID, - err.Error(), - ) - - applyResponse := fp.createDataPlaneResponse( - data.CorrelationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed, rollback failed", - data.InstanceID, - data.Error.Error(), - ) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse}) - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) - - return - } - - // Send RollbackWriteTopic with Correlation and Instance ID for use by resource plugin - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.RollbackWriteTopic, Data: data}) -} - -func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received config apply request message") - var response *mpi.DataPlaneResponse - correlationID := logger.CorrelationID(ctx) - - managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", - "payload", msg.Data) - - return - } - - request, requestOk := managementPlaneRequest.GetRequest().(*mpi.ManagementPlaneRequest_ConfigApplyRequest) - if !requestOk { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest_ConfigApplyRequest", - "payload", msg.Data) - - return - } - - configApplyRequest := request.ConfigApplyRequest - instanceID := configApplyRequest.GetOverview().GetConfigVersion().GetInstanceId() - - writeStatus, err := fp.fileManagerService.ConfigApply(ctx, configApplyRequest) - - switch writeStatus { - case model.NoChange: - slog.DebugContext(ctx, "No changes required for config apply request") - dpResponse := fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_OK, - "Config apply successful, no files to change", - instanceID, - "", - ) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: dpResponse}) - - return - case model.Error: - slog.ErrorContext( - ctx, - "Failed to apply config changes", - "instance_id", instanceID, - "error", err, - ) - response = fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed", - instanceID, - err.Error(), - ) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: response}) - - return - case model.RollbackRequired: - slog.ErrorContext( - ctx, - "Failed to apply config changes, rolling back", - "instance_id", instanceID, - "error", err, - ) - - response = fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_ERROR, - "Config apply failed, rolling back config", - instanceID, - err.Error(), - ) - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) - - rollbackErr := fp.fileManagerService.Rollback( - ctx, - instanceID, - ) - if rollbackErr != nil { - // include both the original apply error and the rollback error so the management plane - // receives actionable information about what failed during apply and what failed during rollback - applyErr := fmt.Errorf("config apply error: %w", err) - rbErr := fmt.Errorf("rollback error: %w", rollbackErr) - combinedErr := errors.Join(applyErr, rbErr) - - rollbackResponse := fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed, rollback failed", - instanceID, - combinedErr.Error(), - ) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: rollbackResponse}) - - return - } - - response = fp.createDataPlaneResponse( - correlationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed, rollback successful", - instanceID, - err.Error(), - ) - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: response}) - - return - case model.OK: - slog.DebugContext(ctx, "Changes required for config apply request") - // Send WriteConfigSuccessfulTopic with Correlation and Instance ID for use by resource plugin - data := &model.ConfigApplyMessage{ - CorrelationID: correlationID, - InstanceID: instanceID, - } - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.WriteConfigSuccessfulTopic, Data: data}) - } -} - -func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received nginx config update message") - nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data) - - return - } - - fp.fileManagerService.ConfigUpdate(ctx, nginxConfigContext) -} - -func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received config upload request message") - managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) - if !ok { - slog.ErrorContext( - ctx, - "Unable to cast message payload to *mpi.ManagementPlaneRequest", - "payload", msg.Data, - ) - - return - } - - configUploadRequest := managementPlaneRequest.GetConfigUploadRequest() - - correlationID := logger.CorrelationID(ctx) - - updatingFilesError := fp.fileManagerService.ConfigUpload(ctx, configUploadRequest) - - response := &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: correlationID, - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Successfully updated all files", - }, - InstanceId: configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), - RequestType: mpi.DataPlaneResponse_CONFIG_UPLOAD_REQUEST, - } - - if updatingFilesError != nil { - response.CommandResponse.Status = mpi.CommandResponse_COMMAND_STATUS_FAILURE - response.CommandResponse.Message = "Failed to update all files" - response.CommandResponse.Error = updatingFilesError.Error() - } - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) -} - -func (fp *FilePlugin) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received agent config update message") - - fp.agentConfigMutex.Lock() - defer fp.agentConfigMutex.Unlock() - - agentConfig, ok := msg.Data.(*config.Config) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *config.Config", "payload", msg.Data) - return - } - - fp.config = agentConfig -} - -func (fp *FilePlugin) createDataPlaneResponse( - correlationID string, - status mpi.CommandResponse_CommandStatus, - message, instanceID, err string, -) *mpi.DataPlaneResponse { - return &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: correlationID, - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: status, - Message: message, - Error: err, - }, - InstanceId: instanceID, - RequestType: mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, - } -} diff --git a/internal/file/file_plugin_test.go b/internal/file/file_plugin_test.go deleted file mode 100644 index 74aa6024a..000000000 --- a/internal/file/file_plugin_test.go +++ /dev/null @@ -1,528 +0,0 @@ -// Copyright (c) F5, Inc. -// -// This source code is licensed under the Apache License, Version 2.0 license found in the -// LICENSE file in the root directory of this source tree. - -package file - -import ( - "context" - "errors" - "os" - "sync" - "testing" - "time" - - "github.com/nginx/agent/v3/internal/bus/busfakes" - "google.golang.org/protobuf/types/known/timestamppb" - - mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" - "github.com/nginx/agent/v3/api/grpc/mpi/v1/v1fakes" - "github.com/nginx/agent/v3/internal/bus" - "github.com/nginx/agent/v3/internal/file/filefakes" - "github.com/nginx/agent/v3/internal/grpc/grpcfakes" - "github.com/nginx/agent/v3/internal/model" - "github.com/nginx/agent/v3/pkg/files" - "github.com/nginx/agent/v3/pkg/id" - "github.com/nginx/agent/v3/test/helpers" - "github.com/nginx/agent/v3/test/protos" - "github.com/nginx/agent/v3/test/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestFilePlugin_Info(t *testing.T) { - filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, - model.Command, &sync.RWMutex{}) - assert.Equal(t, "file", filePlugin.Info().Name) -} - -func TestFilePlugin_Close(t *testing.T) { - ctx := context.Background() - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) - filePlugin.Close(ctx) - - assert.Equal(t, 1, fakeGrpcConnection.CloseCallCount()) -} - -func TestFilePlugin_Subscriptions(t *testing.T) { - filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, - model.Command, &sync.RWMutex{}) - assert.Equal( - t, - []string{ - bus.ConnectionResetTopic, - bus.ConnectionCreatedTopic, - bus.NginxConfigUpdateTopic, - bus.ConfigUploadRequestTopic, - bus.ConfigApplyRequestTopic, - bus.ConfigApplyFailedTopic, - bus.ReloadSuccessfulTopic, - bus.ConfigApplyCompleteTopic, - bus.AgentConfigUpdateTopic, - }, - filePlugin.Subscriptions(), - ) - - readOnlyFilePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, - model.Auxiliary, &sync.RWMutex{}) - assert.Equal(t, []string{ - bus.ConnectionResetTopic, - bus.ConnectionCreatedTopic, - bus.NginxConfigUpdateTopic, - bus.ConfigUploadRequestTopic, - }, readOnlyFilePlugin.Subscriptions()) -} - -func TestFilePlugin_Process_NginxConfigUpdateTopic(t *testing.T) { - ctx := context.Background() - - fileMeta := protos.FileMeta("/etc/nginx/nginx/conf", "") - - message := &model.NginxConfigContext{ - Files: []*mpi.File{ - { - FileMeta: fileMeta, - }, - }, - } - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeFileServiceClient.UpdateOverviewReturns(&mpi.UpdateOverviewResponse{ - Overview: nil, - }, nil) - - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := busfakes.NewFakeMessagePipe() - - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConnectionCreatedTopic}) - filePlugin.Process(ctx, &bus.Message{Topic: bus.NginxConfigUpdateTopic, Data: message}) - - assert.Eventually( - t, - func() bool { return fakeFileServiceClient.UpdateOverviewCallCount() == 1 }, - 2*time.Second, - 10*time.Millisecond, - ) -} - -func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) { - ctx := context.Background() - tempDir := t.TempDir() - - filePath := tempDir + "/nginx.conf" - fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}") - fileHash := files.GenerateHash(fileContent) - - message := &mpi.ManagementPlaneRequest{ - Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ - ConfigApplyRequest: protos.CreateConfigApplyRequest(protos.FileOverview(filePath, fileHash)), - }, - } - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - agentConfig := types.AgentConfig() - agentConfig.AllowedDirectories = []string{tempDir} - - tests := []struct { - message *mpi.ManagementPlaneRequest - configApplyReturnsErr error - name string - configApplyStatus model.WriteStatus - }{ - { - name: "Test 1 - Success", - configApplyReturnsErr: nil, - configApplyStatus: model.OK, - message: message, - }, - { - name: "Test 2 - Fail, Rollback", - configApplyReturnsErr: errors.New("something went wrong"), - configApplyStatus: model.RollbackRequired, - message: message, - }, - { - name: "Test 3 - Fail, No Rollback", - configApplyReturnsErr: errors.New("something went wrong"), - configApplyStatus: model.Error, - message: message, - }, - { - name: "Test 4 - Fail to cast payload", - configApplyReturnsErr: errors.New("something went wrong"), - configApplyStatus: model.Error, - message: nil, - }, - { - name: "Test 5 - No changes needed", - configApplyReturnsErr: nil, - configApplyStatus: model.NoChange, - message: message, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - fakeFileManagerService := &filefakes.FakeFileManagerServiceInterface{} - fakeFileManagerService.ConfigApplyReturns(test.configApplyStatus, test.configApplyReturnsErr) - messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) - err := filePlugin.Init(ctx, messagePipe) - filePlugin.fileManagerService = fakeFileManagerService - require.NoError(t, err) - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplyRequestTopic, Data: test.message}) - - messages := messagePipe.Messages() - - switch { - case test.configApplyStatus == model.OK: - assert.Equal(t, bus.WriteConfigSuccessfulTopic, messages[0].Topic) - assert.Len(t, messages, 1) - - _, ok := messages[0].Data.(*model.ConfigApplyMessage) - assert.True(t, ok) - case test.configApplyStatus == model.RollbackRequired: - assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) - assert.Len(t, messages, 2) - dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_ERROR, - dataPlaneResponse.GetCommandResponse().GetStatus(), - ) - assert.Equal(t, "Config apply failed, rolling back config", - dataPlaneResponse.GetCommandResponse().GetMessage()) - assert.Equal(t, test.configApplyReturnsErr.Error(), dataPlaneResponse.GetCommandResponse().GetError()) - dataPlaneResponse, ok = messages[1].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, "Config apply failed, rollback successful", - dataPlaneResponse.GetCommandResponse().GetMessage()) - assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - dataPlaneResponse.GetCommandResponse().GetStatus()) - case test.configApplyStatus == model.NoChange: - assert.Len(t, messages, 1) - - response, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, bus.ConfigApplyCompleteTopic, messages[0].Topic) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_OK, - response.GetCommandResponse().GetStatus(), - ) - case test.message == nil: - assert.Empty(t, messages) - default: - assert.Len(t, messages, 1) - dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - dataPlaneResponse.GetCommandResponse().GetStatus(), - ) - assert.Equal(t, "Config apply failed", dataPlaneResponse.GetCommandResponse().GetMessage()) - assert.Equal(t, test.configApplyReturnsErr.Error(), dataPlaneResponse.GetCommandResponse().GetError()) - } - }) - } -} - -func TestFilePlugin_Process_ConfigUploadRequestTopic(t *testing.T) { - ctx := context.Background() - - tempDir := os.TempDir() - testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf") - defer helpers.RemoveFileWithErrorCheck(t, testFile.Name()) - fileMeta := protos.FileMeta(testFile.Name(), "") - - message := &mpi.ManagementPlaneRequest{ - Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{ - ConfigUploadRequest: &mpi.ConfigUploadRequest{ - Overview: &mpi.FileOverview{ - Files: []*mpi.File{ - { - FileMeta: fileMeta, - }, - { - FileMeta: fileMeta, - }, - }, - ConfigVersion: &mpi.ConfigVersion{ - InstanceId: "123", - Version: "f33ref3d32d3c32d3a", - }, - }, - }, - }, - } - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := busfakes.NewFakeMessagePipe() - - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConnectionCreatedTopic}) - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigUploadRequestTopic, Data: message}) - - assert.Eventually( - t, - func() bool { return fakeFileServiceClient.UpdateFileCallCount() == 2 }, - 2*time.Second, - 10*time.Millisecond, - ) - - messages := messagePipe.Messages() - assert.Len(t, messages, 1) - assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) - - dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_OK, - dataPlaneResponse.GetCommandResponse().GetStatus(), - ) -} - -func TestFilePlugin_Process_ConfigUploadRequestTopic_Failure(t *testing.T) { - ctx := context.Background() - - fileMeta := protos.FileMeta("/unknown/file.conf", "") - - message := &mpi.ManagementPlaneRequest{ - Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{ - ConfigUploadRequest: &mpi.ConfigUploadRequest{ - Overview: &mpi.FileOverview{ - Files: []*mpi.File{ - { - FileMeta: fileMeta, - }, - { - FileMeta: fileMeta, - }, - }, - ConfigVersion: protos.CreateConfigVersion(), - }, - }, - }, - } - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := busfakes.NewFakeMessagePipe() - - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConnectionCreatedTopic}) - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigUploadRequestTopic, Data: message}) - - assert.Eventually( - t, - func() bool { return len(messagePipe.Messages()) == 1 }, - 2*time.Second, - 10*time.Millisecond, - ) - - assert.Equal(t, 0, fakeFileServiceClient.UpdateFileCallCount()) - - messages := messagePipe.Messages() - assert.Len(t, messages, 1) - - assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) - - dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - dataPlaneResponse.GetCommandResponse().GetStatus(), - ) -} - -func TestFilePlugin_Process_ConfigApplyFailedTopic(t *testing.T) { - ctx := context.Background() - instanceID := protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId() - - tests := []struct { - name string - rollbackReturns error - instanceID string - }{ - { - name: "Test 1 - Rollback Success", - rollbackReturns: nil, - instanceID: instanceID, - }, - { - name: "Test 2 - Rollback Fail", - rollbackReturns: errors.New("something went wrong"), - instanceID: instanceID, - }, - - { - name: "Test 3 - Fail to cast payload", - rollbackReturns: errors.New("something went wrong"), - instanceID: "", - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - mockFileManager := &filefakes.FakeFileManagerServiceInterface{} - mockFileManager.RollbackReturns(test.rollbackReturns) - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - - messagePipe := busfakes.NewFakeMessagePipe() - agentConfig := types.AgentConfig() - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) - - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - filePlugin.fileManagerService = mockFileManager - - data := &model.ConfigApplyMessage{ - CorrelationID: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - InstanceID: test.instanceID, - Error: errors.New("something went wrong with config apply"), - } - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplyFailedTopic, Data: data}) - - messages := messagePipe.Messages() - - switch { - case test.rollbackReturns == nil: - assert.Equal(t, bus.RollbackWriteTopic, messages[0].Topic) - assert.Len(t, messages, 1) - - case test.instanceID == "": - assert.Empty(t, messages) - default: - rollbackMessage, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, "Rollback failed", rollbackMessage.GetCommandResponse().GetMessage()) - assert.Equal(t, test.rollbackReturns.Error(), rollbackMessage.GetCommandResponse().GetError()) - applyMessage, ok := messages[1].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, "Config apply failed, rollback failed", - applyMessage.GetCommandResponse().GetMessage()) - assert.Equal(t, data.Error.Error(), applyMessage.GetCommandResponse().GetError()) - assert.Len(t, messages, 2) - } - }) - } -} - -func TestFilePlugin_Process_ConfigApplyReloadSuccessTopic(t *testing.T) { - ctx := context.Background() - instance := protos.NginxOssInstance([]string{}) - mockFileManager := &filefakes.FakeFileManagerServiceInterface{} - - messagePipe := busfakes.NewFakeMessagePipe() - agentConfig := types.AgentConfig() - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) - - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - filePlugin.fileManagerService = mockFileManager - - expectedResponse := &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Config apply successful", - Error: "", - }, - InstanceId: instance.GetInstanceMeta().GetInstanceId(), - } - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: &model.ReloadSuccess{ - ConfigContext: &model.NginxConfigContext{}, - DataPlaneResponse: expectedResponse, - }}) - - messages := messagePipe.Messages() - - watchers, ok := messages[0].Data.(*model.EnableWatchers) - assert.True(t, ok) - assert.Equal(t, bus.EnableWatchersTopic, messages[0].Topic) - assert.Equal(t, &model.NginxConfigContext{}, watchers.ConfigContext) - assert.Equal(t, instance.GetInstanceMeta().GetInstanceId(), watchers.InstanceID) - - response, ok := messages[1].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - assert.Equal(t, bus.DataPlaneResponseTopic, messages[1].Topic) - - assert.Equal(t, expectedResponse.GetCommandResponse().GetStatus(), response.GetCommandResponse().GetStatus()) - assert.Equal(t, expectedResponse.GetCommandResponse().GetMessage(), response.GetCommandResponse().GetMessage()) - assert.Equal(t, expectedResponse.GetCommandResponse().GetError(), response.GetCommandResponse().GetError()) - assert.Equal(t, expectedResponse.GetMessageMeta().GetCorrelationId(), response.GetMessageMeta().GetCorrelationId()) - - assert.Equal(t, expectedResponse.GetInstanceId(), response.GetInstanceId()) -} - -func TestFilePlugin_Process_ConfigApplyCompleteTopic(t *testing.T) { - ctx := context.Background() - instance := protos.NginxOssInstance([]string{}) - mockFileManager := &filefakes.FakeFileManagerServiceInterface{} - - messagePipe := busfakes.NewFakeMessagePipe() - agentConfig := types.AgentConfig() - fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) - - err := filePlugin.Init(ctx, messagePipe) - require.NoError(t, err) - filePlugin.fileManagerService = mockFileManager - expectedResponse := &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Config apply successful", - Error: "", - }, - InstanceId: instance.GetInstanceMeta().GetInstanceId(), - } - - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: expectedResponse}) - - messages := messagePipe.Messages() - response, ok := messages[0].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) - - assert.Equal(t, expectedResponse.GetCommandResponse().GetStatus(), response.GetCommandResponse().GetStatus()) - assert.Equal(t, expectedResponse.GetCommandResponse().GetMessage(), response.GetCommandResponse().GetMessage()) - assert.Equal(t, expectedResponse.GetCommandResponse().GetError(), response.GetCommandResponse().GetError()) - assert.Equal(t, expectedResponse.GetMessageMeta().GetCorrelationId(), response.GetMessageMeta().GetCorrelationId()) - - assert.Equal(t, expectedResponse.GetInstanceId(), response.GetInstanceId()) -} diff --git a/internal/file/filefakes/fake_file_manager_service_interface.go b/internal/file/filefakes/fake_file_manager_service_interface.go index f2af670fe..10016297e 100644 --- a/internal/file/filefakes/fake_file_manager_service_interface.go +++ b/internal/file/filefakes/fake_file_manager_service_interface.go @@ -6,6 +6,7 @@ import ( "sync" v1 "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/file" "github.com/nginx/agent/v3/internal/model" ) @@ -645,3 +646,5 @@ func (fake *FakeFileManagerServiceInterface) recordInvocation(key string, args [ } fake.invocations[key] = append(fake.invocations[key], args) } + +var _ file.FileManagerServiceInterface = new(FakeFileManagerServiceInterface) diff --git a/internal/nginx/nginx_plugin.go b/internal/nginx/nginx_plugin.go index afe12f537..e4db22791 100644 --- a/internal/nginx/nginx_plugin.go +++ b/internal/nginx/nginx_plugin.go @@ -8,27 +8,37 @@ package nginx import ( "context" "errors" + "fmt" "log/slog" "sync" mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/bus" "github.com/nginx/agent/v3/internal/config" response "github.com/nginx/agent/v3/internal/datasource/proto" + "github.com/nginx/agent/v3/internal/file" + "github.com/nginx/agent/v3/internal/grpc" "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/internal/model" - - "github.com/nginx/agent/v3/internal/bus" + "github.com/nginx/agent/v3/pkg/files" + "github.com/nginx/agent/v3/pkg/id" + "google.golang.org/protobuf/types/known/timestamppb" ) // The Nginx plugin listens for a writeConfigSuccessfulTopic from the file plugin after the config apply -// files have been written. The Nginx plugin then, validates the config, reloads the instance and monitors the logs. +// files have been written. The Nginx plugin then, validates the config, +// reloads the instance and monitors the logs. // This is done in the Nginx plugin to make the file plugin usable for every type of instance. -type Nginx struct { - messagePipe bus.MessagePipeInterface - nginxService nginxServiceInterface - agentConfig *config.Config - agentConfigMutex *sync.Mutex +type NginxPlugin struct { + messagePipe bus.MessagePipeInterface + nginxService nginxServiceInterface + agentConfig *config.Config + agentConfigMutex *sync.Mutex + manifestLock *sync.RWMutex + conn grpc.GrpcConnectionInterface + fileManagerService file.FileManagerServiceInterface + serverType model.ServerType } type errResponse struct { @@ -43,38 +53,65 @@ type plusAPIErr struct { Href string `json:"href"` } -var _ bus.Plugin = (*Nginx)(nil) +var _ bus.Plugin = (*NginxPlugin)(nil) -func NewNginx(agentConfig *config.Config) *Nginx { - return &Nginx{ +func NewNginx(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface, + serverType model.ServerType, manifestLock *sync.RWMutex, +) *NginxPlugin { + return &NginxPlugin{ agentConfig: agentConfig, + conn: grpcConnection, + serverType: serverType, + manifestLock: manifestLock, agentConfigMutex: &sync.Mutex{}, } } -func (n *Nginx) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { +func (n *NginxPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { + ctx = context.WithValue( + ctx, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, n.serverType.String()), + ) slog.DebugContext(ctx, "Starting nginx plugin") n.messagePipe = messagePipe n.nginxService = NewNginxService(ctx, n.agentConfig) + n.fileManagerService = file.NewFileManagerService(n.conn.FileServiceClient(), n.agentConfig, n.manifestLock) return nil } -func (*Nginx) Close(ctx context.Context) error { +func (n *NginxPlugin) Close(ctx context.Context) error { + ctx = context.WithValue( + ctx, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, n.serverType.String()), + ) slog.InfoContext(ctx, "Closing nginx plugin") - return nil + + return n.conn.Close(ctx) } -func (*Nginx) Info() *bus.Info { +func (n *NginxPlugin) Info() *bus.Info { + name := "nginx" + if n.serverType.String() == model.Auxiliary.String() { + name = "auxiliary-nginx" + } + return &bus.Info{ - Name: "nginx", + Name: name, } } -// cyclomatic complexity 11 max is 10 +//nolint:revive,cyclop // cyclomatic complexity 16 max is 12 +func (n *NginxPlugin) Process(ctx context.Context, msg *bus.Message) { + ctxWithMetadata := n.agentConfig.NewContextWithLabels(ctx) + if logger.ServerType(ctx) == "" { + ctxWithMetadata = context.WithValue( + ctxWithMetadata, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, n.serverType.String()), + ) + } -func (n *Nginx) Process(ctx context.Context, msg *bus.Message) { switch msg.Topic { case bus.ResourceUpdateTopic: resourceUpdate, ok := msg.Data.(*mpi.Resource) @@ -89,30 +126,52 @@ func (n *Nginx) Process(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "Nginx plugin received update resource message") return - case bus.WriteConfigSuccessfulTopic: - n.handleWriteConfigSuccessful(ctx, msg) - case bus.RollbackWriteTopic: - n.handleRollbackWrite(ctx, msg) case bus.APIActionRequestTopic: n.handleAPIActionRequest(ctx, msg) - case bus.AgentConfigUpdateTopic: - n.handleAgentConfigUpdate(ctx, msg) + case bus.ConnectionResetTopic: + if logger.ServerType(ctxWithMetadata) == n.serverType.String() { + n.handleConnectionReset(ctxWithMetadata, msg) + } + case bus.ConnectionCreatedTopic: + if logger.ServerType(ctxWithMetadata) == n.serverType.String() { + slog.DebugContext(ctxWithMetadata, "Nginx plugin received connection created message") + n.fileManagerService.SetIsConnected(true) + } + case bus.NginxConfigUpdateTopic: + if logger.ServerType(ctxWithMetadata) == n.serverType.String() { + n.handleNginxConfigUpdate(ctxWithMetadata, msg) + } + case bus.ConfigUploadRequestTopic: + if logger.ServerType(ctxWithMetadata) == n.serverType.String() { + n.handleConfigUploadRequest(ctxWithMetadata, msg) + } + case bus.ConfigApplyRequestTopic: + if logger.ServerType(ctxWithMetadata) == n.serverType.String() { + n.handleConfigApplyRequest(ctxWithMetadata, msg) + } default: - slog.DebugContext(ctx, "Unknown topic", "topic", msg.Topic) + slog.DebugContext(ctx, "NGINX plugin received message with unknown topic", "topic", msg.Topic) } } -func (*Nginx) Subscriptions() []string { - return []string{ - bus.ResourceUpdateTopic, - bus.WriteConfigSuccessfulTopic, - bus.RollbackWriteTopic, +func (n *NginxPlugin) Subscriptions() []string { + subscriptions := []string{ bus.APIActionRequestTopic, - bus.AgentConfigUpdateTopic, + bus.ConnectionResetTopic, + bus.ConnectionCreatedTopic, + bus.NginxConfigUpdateTopic, + bus.ConfigUploadRequestTopic, + bus.ResourceUpdateTopic, + } + + if n.serverType == model.Command { + subscriptions = append(subscriptions, bus.ConfigApplyRequestTopic) } + + return subscriptions } -func (n *Nginx) Reconfigure(ctx context.Context, agentConfig *config.Config) error { +func (n *NginxPlugin) Reconfigure(ctx context.Context, agentConfig *config.Config) error { slog.DebugContext(ctx, "Nginx plugin is reconfiguring to update agent configuration") n.agentConfigMutex.Lock() @@ -123,7 +182,81 @@ func (n *Nginx) Reconfigure(ctx context.Context, agentConfig *config.Config) err return nil } -func (n *Nginx) handleAPIActionRequest(ctx context.Context, msg *bus.Message) { +func (n *NginxPlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Nginx plugin received config upload request message") + managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) + if !ok { + slog.ErrorContext( + ctx, + "Unable to cast message payload to *mpi.ManagementPlaneRequest", + "payload", msg.Data, + ) + + return + } + + configUploadRequest := managementPlaneRequest.GetConfigUploadRequest() + + correlationID := logger.CorrelationID(ctx) + + updatingFilesError := n.fileManagerService.ConfigUpload(ctx, configUploadRequest) + + dataplaneResponse := &mpi.DataPlaneResponse{ + MessageMeta: &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: correlationID, + Timestamp: timestamppb.Now(), + }, + CommandResponse: &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Successfully updated all files", + }, + InstanceId: configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), + RequestType: mpi.DataPlaneResponse_CONFIG_UPLOAD_REQUEST, + } + + if updatingFilesError != nil { + dataplaneResponse.CommandResponse.Status = mpi.CommandResponse_COMMAND_STATUS_FAILURE + dataplaneResponse.CommandResponse.Message = "Failed to update all files" + dataplaneResponse.CommandResponse.Error = updatingFilesError.Error() + } + + n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dataplaneResponse}) +} + +func (n *NginxPlugin) handleConnectionReset(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Nginx plugin received connection reset message") + + if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok { + err := n.conn.Close(ctx) + if err != nil { + slog.ErrorContext(ctx, "Nginx plugin: unable to close connection", "error", err) + } + + n.conn = newConnection + + reconnect := n.fileManagerService.IsConnected() + n.fileManagerService.ResetClient(ctx, n.conn.FileServiceClient()) + n.fileManagerService.SetIsConnected(reconnect) + + slog.DebugContext(ctx, "Nginx plugin connection reset successfully") + } +} + +func (n *NginxPlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Nginx plugin received config update message") + nginxConfigContext, ok := msg.Data.(*model.NginxConfigContext) + + if !ok { + slog.ErrorContext(ctx, "Unable to cast message payload to *model.NginxConfigContext", "payload", msg.Data) + + return + } + + n.fileManagerService.ConfigUpdate(ctx, nginxConfigContext) +} + +func (n *NginxPlugin) handleAPIActionRequest(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "Nginx plugin received api action request message") managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) @@ -150,7 +283,9 @@ func (n *Nginx) handleAPIActionRequest(ctx context.Context, msg *bus.Message) { } } -func (n *Nginx) handleNginxPlusActionRequest(ctx context.Context, action *mpi.NGINXPlusAction, instanceID string) { +func (n *NginxPlugin) handleNginxPlusActionRequest(ctx context.Context, + action *mpi.NGINXPlusAction, instanceID string, +) { correlationID := logger.CorrelationID(ctx) instance := n.nginxService.Instance(instanceID) apiAction := APIAction{ @@ -218,125 +353,286 @@ func (n *Nginx) handleNginxPlusActionRequest(ctx context.Context, action *mpi.NG } } -func (n *Nginx) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "Nginx plugin received write config successful message") - data, ok := msg.Data.(*model.ConfigApplyMessage) +func (n *NginxPlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Message) { + slog.DebugContext(ctx, "Nginx plugin received config apply request message") + + var dataplaneResponse *mpi.DataPlaneResponse + correlationID := logger.CorrelationID(ctx) + + managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) + if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", "payload", msg.Data) + slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", "payload", msg.Data) + return + } + + request, requestOk := managementPlaneRequest.GetRequest().(*mpi.ManagementPlaneRequest_ConfigApplyRequest) + if !requestOk { + slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest_ConfigApplyRequest", + "payload", msg.Data) return } - configContext, err := n.nginxService.ApplyConfig(ctx, data.InstanceID) - if err != nil { - data.Error = err + configApplyRequest := request.ConfigApplyRequest + instanceID := configApplyRequest.GetOverview().GetConfigVersion().GetInstanceId() + + writeStatus, err := n.fileManagerService.ConfigApply(ctx, configApplyRequest) + + switch writeStatus { + case model.NoChange: + slog.DebugContext(ctx, "No changes required for config apply request") + dataplaneResponse = response.CreateDataPlaneResponse(correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Config apply successful, no files to change", + Error: "", + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + instanceID, + ) + n.completeConfigApply(ctx, &model.NginxConfigContext{}, dataplaneResponse) + case model.Error: slog.ErrorContext( ctx, - "Errors found during config apply, sending error status and rolling back configuration updates", + "Failed to apply config changes", + "instance_id", instanceID, + "error", err, + ) + dataplaneResponse = response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "Config apply failed", + Error: err.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + instanceID, + ) + + n.completeConfigApply(ctx, &model.NginxConfigContext{}, dataplaneResponse) + case model.RollbackRequired: + slog.ErrorContext( + ctx, + "Failed to apply config changes, rolling back", + "instance_id", instanceID, "error", err, ) + dataplaneResponse = response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_ERROR, + Message: "Config apply failed, rolling back config", + Error: err.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + instanceID, + ) + + n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dataplaneResponse}) + + rollbackErr := n.fileManagerService.Rollback(ctx, instanceID) + + if rollbackErr != nil { + applyErr := fmt.Errorf("config apply error: %w", err) + rbErr := fmt.Errorf("rollback error: %w", rollbackErr) + combinedErr := errors.Join(applyErr, rbErr) + + rollbackResponse := response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "Config apply failed, rollback failed", + Error: combinedErr.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + instanceID, + ) + n.completeConfigApply(ctx, &model.NginxConfigContext{}, rollbackResponse) + + return + } + + dataplaneResponse = response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "Config apply failed, rollback successful", + Error: err.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + instanceID) + n.completeConfigApply(ctx, &model.NginxConfigContext{}, dataplaneResponse) + case model.OK: + slog.DebugContext(ctx, "Changes required for config apply request") + n.applyConfig(ctx, correlationID, instanceID) + } +} + +func (n *NginxPlugin) applyConfig(ctx context.Context, correlationID, instanceID string) { + configContext, err := n.nginxService.ApplyConfig(ctx, instanceID) + if err != nil { + slog.ErrorContext( + ctx, + "Errors found during config apply, sending error status and rolling back configuration updates", + "error", err, + ) dpResponse := response.CreateDataPlaneResponse( - data.CorrelationID, + correlationID, &mpi.CommandResponse{ Status: mpi.CommandResponse_COMMAND_STATUS_ERROR, Message: "Config apply failed, rolling back config", Error: err.Error(), }, mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, - data.InstanceID, + instanceID, ) n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse}) - n.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyFailedTopic, Data: data}) + + n.writeRollbackConfig(ctx, correlationID, instanceID, err) return } dpResponse := response.CreateDataPlaneResponse( - data.CorrelationID, + correlationID, &mpi.CommandResponse{ Status: mpi.CommandResponse_COMMAND_STATUS_OK, Message: "Config apply successful", Error: "", }, mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, - data.InstanceID, + instanceID, ) - successMessage := &model.ReloadSuccess{ - ConfigContext: configContext, - DataPlaneResponse: dpResponse, + if configContext.Files != nil { + slog.DebugContext(ctx, "Changes made during config apply, update files on disk") + updateError := n.fileManagerService.UpdateCurrentFilesOnDisk( + ctx, + files.ConvertToMapOfFiles(configContext.Files), + true, + ) + if updateError != nil { + slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError) + } } - n.messagePipe.Process(ctx, &bus.Message{Topic: bus.ReloadSuccessfulTopic, Data: successMessage}) + n.completeConfigApply(ctx, configContext, dpResponse) } -func (n *Nginx) handleRollbackWrite(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "Nginx plugin received rollback write message") - data, ok := msg.Data.(*model.ConfigApplyMessage) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplyMessage", "payload", msg.Data) +func (n *NginxPlugin) writeRollbackConfig(ctx context.Context, correlationID, instanceID string, applyErr error) { + slog.DebugContext(ctx, "Starting rollback of config", "instance_id", instanceID) + if instanceID == "" { + n.fileManagerService.ClearCache() + return + } + + err := n.fileManagerService.Rollback(ctx, instanceID) + if err != nil { + configErr := fmt.Errorf("config apply error: %w", applyErr) + rbErr := fmt.Errorf("rollback error: %w", err) + combinedErr := errors.Join(configErr, rbErr) + + rollbackResponse := response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_ERROR, + Message: "Rollback failed", + Error: err.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + instanceID, + ) + + applyResponse := response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "Config apply failed, rollback failed", + Error: combinedErr.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + instanceID, + ) + + n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse}) + + n.completeConfigApply(ctx, &model.NginxConfigContext{}, applyResponse) return } - _, err := n.nginxService.ApplyConfig(ctx, data.InstanceID) + + n.rollbackConfigApply(ctx, correlationID, instanceID, applyErr) +} + +func (n *NginxPlugin) rollbackConfigApply(ctx context.Context, correlationID, instanceID string, applyErr error) { + slog.DebugContext(ctx, "Rolling back config apply, after config written", "instance_id", instanceID) + _, err := n.nginxService.ApplyConfig(ctx, instanceID) if err != nil { slog.ErrorContext(ctx, "Errors found during rollback, sending failure status", "error", err) rollbackResponse := response.CreateDataPlaneResponse( - data.CorrelationID, + correlationID, &mpi.CommandResponse{ Status: mpi.CommandResponse_COMMAND_STATUS_ERROR, Message: "Rollback failed", Error: err.Error(), }, mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, - data.InstanceID, + instanceID, ) + configErr := fmt.Errorf("config apply error: %w", applyErr) + rbErr := fmt.Errorf("rollback error: %w", err) + combinedErr := errors.Join(configErr, rbErr) + applyResponse := response.CreateDataPlaneResponse( - data.CorrelationID, + correlationID, &mpi.CommandResponse{ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, Message: "Config apply failed, rollback failed", - Error: data.Error.Error(), + Error: combinedErr.Error(), }, mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, - data.InstanceID, + instanceID, ) n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse}) - n.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) + + n.completeConfigApply(ctx, &model.NginxConfigContext{}, applyResponse) return } applyResponse := response.CreateDataPlaneResponse( - data.CorrelationID, + correlationID, &mpi.CommandResponse{ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, Message: "Config apply failed, rollback successful", - Error: data.Error.Error(), + Error: applyErr.Error(), }, mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, - data.InstanceID, + instanceID, ) - n.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) + n.completeConfigApply(ctx, &model.NginxConfigContext{}, applyResponse) } -func (n *Nginx) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "Nginx plugin received agent config update message") - - n.agentConfigMutex.Lock() - defer n.agentConfigMutex.Unlock() +func (n *NginxPlugin) completeConfigApply(ctx context.Context, configContext *model.NginxConfigContext, + dpResponse *mpi.DataPlaneResponse, +) { + n.fileManagerService.ClearCache() + n.enableWatchers(ctx, configContext, dpResponse.GetInstanceId()) + n.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse}) +} - agentConfig, ok := msg.Data.(*config.Config) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *config.Config", "payload", msg.Data) - return +func (n *NginxPlugin) enableWatchers(ctx context.Context, configContext *model.NginxConfigContext, instanceID string) { + enableWatcher := &model.EnableWatchers{ + InstanceID: instanceID, + ConfigContext: configContext, } - n.agentConfig = agentConfig + n.messagePipe.Process(ctx, &bus.Message{Topic: bus.EnableWatchersTopic, Data: enableWatcher}) } diff --git a/internal/nginx/nginx_plugin_test.go b/internal/nginx/nginx_plugin_test.go index 2d276bd22..633370c1b 100644 --- a/internal/nginx/nginx_plugin_test.go +++ b/internal/nginx/nginx_plugin_test.go @@ -10,9 +10,15 @@ import ( "context" "encoding/json" "errors" - "sort" + "os" + "sync" "testing" + "time" + "github.com/nginx/agent/v3/api/grpc/mpi/v1/v1fakes" + "github.com/nginx/agent/v3/internal/file/filefakes" + "github.com/nginx/agent/v3/internal/grpc/grpcfakes" + "github.com/nginx/agent/v3/pkg/files" "github.com/nginx/agent/v3/test/stub" "google.golang.org/protobuf/types/known/structpb" @@ -33,75 +39,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestResource_Process_Apply(t *testing.T) { - ctx := context.Background() - - tests := []struct { - name string - message *bus.Message - applyErr error - topic []string - }{ - { - name: "Test 1: Write Config Successful Topic - Success Status", - message: &bus.Message{ - Topic: bus.WriteConfigSuccessfulTopic, - Data: &model.ConfigApplyMessage{ - CorrelationID: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - InstanceID: protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(), - Error: nil, - }, - }, - applyErr: nil, - topic: []string{bus.ReloadSuccessfulTopic}, - }, - { - name: "Test 2: Write Config Successful Topic - Fail Status", - message: &bus.Message{ - Topic: bus.WriteConfigSuccessfulTopic, - Data: &model.ConfigApplyMessage{ - CorrelationID: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - InstanceID: protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(), - Error: nil, - }, - }, - applyErr: errors.New("error reloading"), - topic: []string{bus.DataPlaneResponseTopic, bus.ConfigApplyFailedTopic}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(tt *testing.T) { - fakeNginxService := &nginxfakes.FakeNginxServiceInterface{} - fakeNginxService.ApplyConfigReturns(&model.NginxConfigContext{}, test.applyErr) - messagePipe := busfakes.NewFakeMessagePipe() - - resourcePlugin := NewNginx(types.AgentConfig()) - resourcePlugin.nginxService = fakeNginxService - - err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) - require.NoError(t, err) - - resourcePlugin.messagePipe = messagePipe - - resourcePlugin.Process(ctx, test.message) - - assert.Equal(t, test.topic[0], messagePipe.Messages()[0].Topic) - - if len(test.topic) > 1 { - assert.Equal(t, test.topic[1], messagePipe.Messages()[1].Topic) - } - - if test.applyErr != nil { - response, ok := messagePipe.Messages()[0].Data.(*mpi.DataPlaneResponse) - assert.True(tt, ok) - assert.Equal(tt, test.applyErr.Error(), response.GetCommandResponse().GetError()) - } - }) - } -} - -func TestResource_createPlusAPIError(t *testing.T) { +func TestNginx_createPlusAPIError(t *testing.T) { s := "failed to get the HTTP servers of upstream nginx1: expected 200 response, got 404. error.status=404;" + " error.text=upstream not found; error.code=UpstreamNotFound; request_id=b534bdab5cb5e321e8b41b431828b270; " + "href=https://nginx.org/en/docs/http/ngx_http_api_module.html" @@ -123,7 +61,7 @@ func TestResource_createPlusAPIError(t *testing.T) { assert.Equal(t, errors.New(string(expectedJSON)), result) } -func TestResource_Process_APIAction_GetHTTPServers(t *testing.T) { +func TestNginx_Process_APIAction_GetHTTPServers(t *testing.T) { ctx := context.Background() inValidInstance := protos.NginxPlusInstance([]string{}) @@ -198,14 +136,14 @@ func TestResource_Process_APIAction_GetHTTPServers(t *testing.T) { } for _, test := range tests { - runResourceTestHelper(t, ctx, test.name, func(fakeService *nginxfakes.FakeNginxServiceInterface) { + runNginxTestHelper(t, ctx, test.name, func(fakeService *nginxfakes.FakeNginxServiceInterface) { fakeService.GetHTTPUpstreamServersReturns(test.upstreams, test.err) }, test.instance, test.message, test.topic, test.err) } } //nolint:dupl // need to refactor so that redundant code can be removed -func TestResource_Process_APIAction_UpdateHTTPUpstreams(t *testing.T) { +func TestNginx_Process_APIAction_UpdateHTTPUpstreams(t *testing.T) { ctx := context.Background() tests := []struct { instance *mpi.Instance @@ -280,15 +218,16 @@ func TestResource_Process_APIAction_UpdateHTTPUpstreams(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewNginx(types.AgentConfig()) - resourcePlugin.nginxService = fakeNginxService + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + nginxPlugin.nginxService = fakeNginxService - err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) + err := messagePipe.Register(2, []bus.Plugin{nginxPlugin}) require.NoError(tt, err) - resourcePlugin.messagePipe = messagePipe + nginxPlugin.messagePipe = messagePipe - resourcePlugin.Process(ctx, test.message) + nginxPlugin.Process(ctx, test.message) assert.Equal(tt, test.topic[0], messagePipe.Messages()[0].Topic) @@ -308,7 +247,7 @@ func TestResource_Process_APIAction_UpdateHTTPUpstreams(t *testing.T) { } //nolint:dupl // need to refactor so that redundant code can be removed -func TestResource_Process_APIAction_UpdateStreamServers(t *testing.T) { +func TestNginx_Process_APIAction_UpdateStreamServers(t *testing.T) { ctx := context.Background() tests := []struct { instance *mpi.Instance @@ -383,15 +322,16 @@ func TestResource_Process_APIAction_UpdateStreamServers(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewNginx(types.AgentConfig()) - resourcePlugin.nginxService = fakeNginxService + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + nginxPlugin.nginxService = fakeNginxService - err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) + err := messagePipe.Register(2, []bus.Plugin{nginxPlugin}) require.NoError(tt, err) - resourcePlugin.messagePipe = messagePipe + nginxPlugin.messagePipe = messagePipe - resourcePlugin.Process(ctx, test.message) + nginxPlugin.Process(ctx, test.message) assert.Equal(tt, test.topic[0], messagePipe.Messages()[0].Topic) @@ -410,7 +350,7 @@ func TestResource_Process_APIAction_UpdateStreamServers(t *testing.T) { } } -func TestResource_Process_APIAction_GetStreamUpstreams(t *testing.T) { +func TestNginx_Process_APIAction_GetStreamUpstreams(t *testing.T) { ctx := context.Background() inValidInstance := protos.NginxPlusInstance([]string{}) @@ -525,15 +465,16 @@ func TestResource_Process_APIAction_GetStreamUpstreams(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewNginx(types.AgentConfig()) - resourcePlugin.nginxService = fakeNginxService + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + nginxPlugin.nginxService = fakeNginxService - err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) + err := messagePipe.Register(2, []bus.Plugin{nginxPlugin}) require.NoError(t, err) - resourcePlugin.messagePipe = messagePipe + nginxPlugin.messagePipe = messagePipe - resourcePlugin.Process(ctx, test.message) + nginxPlugin.Process(ctx, test.message) assert.Equal(t, test.topic[0], messagePipe.Messages()[0].Topic) @@ -551,7 +492,7 @@ func TestResource_Process_APIAction_GetStreamUpstreams(t *testing.T) { } } -func TestResource_Process_APIAction_GetUpstreams(t *testing.T) { +func TestNginx_Process_APIAction_GetUpstreams(t *testing.T) { ctx := context.Background() inValidInstance := protos.NginxPlusInstance([]string{}) @@ -665,124 +606,483 @@ func TestResource_Process_APIAction_GetUpstreams(t *testing.T) { } for _, test := range tests { - runResourceTestHelper(t, ctx, test.name, func(fakeService *nginxfakes.FakeNginxServiceInterface) { + runNginxTestHelper(t, ctx, test.name, func(fakeService *nginxfakes.FakeNginxServiceInterface) { fakeService.GetUpstreamsReturns(test.upstreams, test.err) }, test.instance, test.message, test.topic, test.err) } } -func TestResource_Process_Rollback(t *testing.T) { +func TestNginx_Subscriptions(t *testing.T) { + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + assert.Equal(t, + []string{ + bus.APIActionRequestTopic, + bus.ConnectionResetTopic, + bus.ConnectionCreatedTopic, + bus.NginxConfigUpdateTopic, + bus.ConfigUploadRequestTopic, + bus.ResourceUpdateTopic, + bus.ConfigApplyRequestTopic, + }, + nginxPlugin.Subscriptions()) + + readNginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Auxiliary, &sync.RWMutex{}) + assert.Equal(t, + []string{ + bus.APIActionRequestTopic, + bus.ConnectionResetTopic, + bus.ConnectionCreatedTopic, + bus.NginxConfigUpdateTopic, + bus.ConfigUploadRequestTopic, + bus.ResourceUpdateTopic, + }, + readNginxPlugin.Subscriptions()) +} + +func TestNginx_Info(t *testing.T) { + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + assert.Equal(t, &bus.Info{Name: "nginx"}, nginxPlugin.Info()) + + readNginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Auxiliary, &sync.RWMutex{}) + assert.Equal(t, &bus.Info{Name: "auxiliary-nginx"}, readNginxPlugin.Info()) +} + +func TestNginx_Init(t *testing.T) { ctx := context.Background() + fakeNginxService := nginxfakes.FakeNginxServiceInterface{} - tests := []struct { - name string - message *bus.Message - rollbackErr error - topic []string - }{ - { - name: "Test 1: Rollback Write Topic - Success Status", - message: &bus.Message{ - Topic: bus.RollbackWriteTopic, - Data: &model.ConfigApplyMessage{ - CorrelationID: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - InstanceID: protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(), - Error: errors.New("something went wrong with config apply"), + messagePipe := busfakes.NewFakeMessagePipe() + messagePipe.RunWithoutInit(ctx) + + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + nginxPlugin.nginxService = &fakeNginxService + err := nginxPlugin.Init(ctx, messagePipe) + require.NoError(t, err) + + messages := messagePipe.Messages() + + assert.Empty(t, messages) +} + +func TestNginx_Process_handleConfigUploadRequest(t *testing.T) { + ctx := context.Background() + + tempDir := os.TempDir() + testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf") + defer helpers.RemoveFileWithErrorCheck(t, testFile.Name()) + fileMeta := protos.FileMeta(testFile.Name(), "") + + message := &mpi.ManagementPlaneRequest{ + Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{ + ConfigUploadRequest: &mpi.ConfigUploadRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{ + { + FileMeta: fileMeta, + }, + { + FileMeta: fileMeta, + }, + }, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "123", + Version: "f33ref3d32d3c32d3a", + }, }, }, - rollbackErr: nil, - topic: []string{bus.ConfigApplyCompleteTopic}, }, - { - name: "Test 2: Rollback Write Topic - Fail Status", - message: &bus.Message{ - Topic: bus.RollbackWriteTopic, - Data: &model.ConfigApplyMessage{ - CorrelationID: "", - InstanceID: protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(), - Error: errors.New("something went wrong with config apply"), + } + + fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) + messagePipe := busfakes.NewFakeMessagePipe() + + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + err := nginxPlugin.Init(ctx, messagePipe) + require.NoError(t, err) + + nginxPlugin.Process(ctx, &bus.Message{Topic: bus.ConnectionCreatedTopic}) + nginxPlugin.Process(ctx, &bus.Message{Topic: bus.ConfigUploadRequestTopic, Data: message}) + + assert.Eventually( + t, + func() bool { return fakeFileServiceClient.UpdateFileCallCount() == 2 }, + 2*time.Second, + 10*time.Millisecond, + ) + + messages := messagePipe.Messages() + assert.Len(t, messages, 1) + assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) + + dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_OK, + dataPlaneResponse.GetCommandResponse().GetStatus(), + ) +} + +func TestNginx_Process_handleConfigUploadRequest_Failure(t *testing.T) { + ctx := context.Background() + + fileMeta := protos.FileMeta("/unknown/file.conf", "") + + message := &mpi.ManagementPlaneRequest{ + Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{ + ConfigUploadRequest: &mpi.ConfigUploadRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{ + { + FileMeta: fileMeta, + }, + { + FileMeta: fileMeta, + }, + }, + ConfigVersion: protos.CreateConfigVersion(), }, }, - rollbackErr: errors.New("error reloading"), - topic: []string{bus.ConfigApplyCompleteTopic, bus.DataPlaneResponseTopic}, + }, + } + + fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) + messagePipe := busfakes.NewFakeMessagePipe() + + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + err := nginxPlugin.Init(ctx, messagePipe) + require.NoError(t, err) + + nginxPlugin.Process(ctx, &bus.Message{Topic: bus.ConnectionCreatedTopic}) + nginxPlugin.Process(ctx, &bus.Message{Topic: bus.ConfigUploadRequestTopic, Data: message}) + + assert.Eventually( + t, + func() bool { return len(messagePipe.Messages()) == 1 }, + 2*time.Second, + 10*time.Millisecond, + ) + + assert.Equal(t, 0, fakeFileServiceClient.UpdateFileCallCount()) + + messages := messagePipe.Messages() + assert.Len(t, messages, 1) + + assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) + + dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + dataPlaneResponse.GetCommandResponse().GetStatus(), + ) +} + +func TestNginx_Process_handleConfigApplyRequest(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + + filePath := tempDir + "/nginx.conf" + fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}") + fileHash := files.GenerateHash(fileContent) + + message := &mpi.ManagementPlaneRequest{ + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: protos.CreateConfigApplyRequest(protos.FileOverview(filePath, fileHash)), + }, + } + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + agentConfig := types.AgentConfig() + agentConfig.AllowedDirectories = []string{tempDir} + + tests := []struct { + message *mpi.ManagementPlaneRequest + configApplyReturnsErr error + name string + configApplyStatus model.WriteStatus + }{ + { + name: "Test 1 - Success", + configApplyReturnsErr: nil, + configApplyStatus: model.OK, + message: message, + }, + { + name: "Test 2 - Fail, Rollback", + configApplyReturnsErr: errors.New("something went wrong"), + configApplyStatus: model.RollbackRequired, + message: message, + }, + { + name: "Test 3 - Fail, No Rollback", + configApplyReturnsErr: errors.New("something went wrong"), + configApplyStatus: model.Error, + message: message, + }, + { + name: "Test 4 - Fail to cast payload", + configApplyReturnsErr: errors.New("something went wrong"), + configApplyStatus: model.Error, + message: nil, + }, + { + name: "Test 5 - No changes needed", + configApplyReturnsErr: nil, + configApplyStatus: model.NoChange, + message: message, }, } for _, test := range tests { - t.Run(test.name, func(tt *testing.T) { + t.Run(test.name, func(t *testing.T) { fakeNginxService := &nginxfakes.FakeNginxServiceInterface{} - fakeNginxService.ApplyConfigReturns(&model.NginxConfigContext{}, test.rollbackErr) + fakeNginxService.ApplyConfigReturns(&model.NginxConfigContext{}, test.configApplyReturnsErr) + + fakeFileManagerService := &filefakes.FakeFileManagerServiceInterface{} + fakeFileManagerService.ConfigApplyReturns(test.configApplyStatus, test.configApplyReturnsErr) messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewNginx(types.AgentConfig()) - resourcePlugin.nginxService = fakeNginxService + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) - err := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) + err := nginxPlugin.Init(ctx, messagePipe) + nginxPlugin.fileManagerService = fakeFileManagerService + nginxPlugin.nginxService = fakeNginxService require.NoError(t, err) - resourcePlugin.messagePipe = messagePipe + nginxPlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplyRequestTopic, Data: test.message}) + + messages := messagePipe.Messages() + + switch { + case test.configApplyStatus == model.OK: + assert.Len(t, messages, 2) + assert.Equal(t, bus.EnableWatchersTopic, messages[0].Topic) + assert.Equal(t, bus.DataPlaneResponseTopic, messages[1].Topic) + + msg, ok := messages[1].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, msg.GetCommandResponse().GetStatus()) + assert.Equal(t, "Config apply successful", msg.GetCommandResponse().GetMessage()) + case test.configApplyStatus == model.RollbackRequired: + assert.Len(t, messages, 3) + + assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) + dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_ERROR, + dataPlaneResponse.GetCommandResponse().GetStatus(), + ) + assert.Equal(t, "Config apply failed, rolling back config", + dataPlaneResponse.GetCommandResponse().GetMessage()) + assert.Equal(t, test.configApplyReturnsErr.Error(), dataPlaneResponse.GetCommandResponse().GetError()) + + assert.Equal(t, bus.EnableWatchersTopic, messages[1].Topic) + + dataPlaneResponse, ok = messages[2].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal(t, "Config apply failed, rollback successful", + dataPlaneResponse.GetCommandResponse().GetMessage()) + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_FAILURE, + dataPlaneResponse.GetCommandResponse().GetStatus()) + + case test.configApplyStatus == model.NoChange: + assert.Len(t, messages, 2) + assert.Equal(t, bus.EnableWatchersTopic, messages[0].Topic) + + response, ok := messages[1].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal(t, 1, fakeFileManagerService.ClearCacheCallCount()) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_OK, + response.GetCommandResponse().GetStatus(), + ) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_OK, + response.GetCommandResponse().GetStatus(), + ) + + case test.message == nil: + assert.Empty(t, messages) + default: + assert.Len(t, messages, 2) + assert.Equal(t, bus.EnableWatchersTopic, messages[0].Topic) + + dataPlaneResponse, ok := messages[1].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + dataPlaneResponse.GetCommandResponse().GetStatus(), + ) + assert.Equal(t, "Config apply failed", dataPlaneResponse.GetCommandResponse().GetMessage()) + assert.Equal(t, test.configApplyReturnsErr.Error(), dataPlaneResponse.GetCommandResponse().GetError()) + } + }) + } +} + +func TestNginxPlugin_Failed_ConfigApply(t *testing.T) { + ctx := context.Background() - resourcePlugin.Process(ctx, test.message) + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - sort.Slice(messagePipe.Messages(), func(i, j int) bool { - return messagePipe.Messages()[i].Topic < messagePipe.Messages()[j].Topic - }) + tests := []struct { + rollbackError error + rollbackWriteError error + message string + name string + }{ + { + name: "Test 1 - Rollback Success", + message: "", + rollbackError: nil, + rollbackWriteError: nil, + }, + { + name: "Test 2 - Rollback Failed", + message: "config apply error: something went wrong\nrollback error: rollback failed", + rollbackError: errors.New("rollback failed"), + rollbackWriteError: nil, + }, + { + name: "Test 3 - Rollback Write Failed", + message: "config apply error: something went wrong\nrollback error: rollback write failed", + rollbackError: nil, + rollbackWriteError: errors.New("rollback write failed"), + }, + } - assert.Len(tt, messagePipe.Messages(), len(test.topic)) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeNginxService := &nginxfakes.FakeNginxServiceInterface{} + fakeNginxService.ApplyConfigReturnsOnCall(0, &model.NginxConfigContext{}, + errors.New("something went wrong")) + fakeNginxService.ApplyConfigReturnsOnCall(1, &model.NginxConfigContext{}, tt.rollbackWriteError) - assert.Equal(t, test.topic[0], messagePipe.Messages()[0].Topic) + fakeFileManagerService := &filefakes.FakeFileManagerServiceInterface{} + fakeFileManagerService.RollbackReturns(tt.rollbackError) - if len(test.topic) > 1 { - assert.Equal(t, test.topic[1], messagePipe.Messages()[1].Topic) - } + messagePipe := busfakes.NewFakeMessagePipe() - if test.rollbackErr != nil { - rollbackResponse, ok := messagePipe.Messages()[1].Data.(*mpi.DataPlaneResponse) - assert.True(tt, ok) - assert.Equal(t, test.topic[1], messagePipe.Messages()[1].Topic) - assert.Equal(tt, test.rollbackErr.Error(), rollbackResponse.GetCommandResponse().GetError()) + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + + err := nginxPlugin.Init(ctx, messagePipe) + nginxPlugin.fileManagerService = fakeFileManagerService + nginxPlugin.nginxService = fakeNginxService + require.NoError(t, err) + + nginxPlugin.applyConfig(ctx, "dfsbhj6-bc92-30c1-a9c9-85591422068e", protos. + NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId()) + + messages := messagePipe.Messages() + + dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_ERROR, + dataPlaneResponse.GetCommandResponse().GetStatus(), + ) + assert.Equal(t, "Config apply failed, rolling back config", + dataPlaneResponse.GetCommandResponse().GetMessage()) + + if tt.rollbackError == nil && tt.rollbackWriteError == nil { + assert.Len(t, messages, 3) + assert.Equal(t, bus.EnableWatchersTopic, messages[1].Topic) + + dataPlaneResponse, ok = messages[2].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + dataPlaneResponse.GetCommandResponse().GetStatus(), + ) + + assert.Equal(t, "Config apply failed, rollback successful", + dataPlaneResponse.GetCommandResponse().GetMessage()) + } else { + assert.Len(t, messages, 4) + dataPlaneResponse, ok = messages[1].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_ERROR, + dataPlaneResponse.GetCommandResponse().GetStatus(), + ) + + assert.Equal(t, "Rollback failed", dataPlaneResponse.GetCommandResponse().GetMessage()) + assert.Equal(t, bus.EnableWatchersTopic, messages[2].Topic) + + dataPlaneResponse, ok = messages[3].Data.(*mpi.DataPlaneResponse) + assert.True(t, ok) + assert.Equal( + t, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + dataPlaneResponse.GetCommandResponse().GetStatus(), + ) + + assert.Equal(t, "Config apply failed, rollback failed", + dataPlaneResponse.GetCommandResponse().GetMessage()) + assert.Equal(t, tt.message, dataPlaneResponse.GetCommandResponse().GetError()) } }) } } -func TestResource_Subscriptions(t *testing.T) { - resourcePlugin := NewNginx(types.AgentConfig()) - assert.Equal(t, - []string{ - bus.ResourceUpdateTopic, - bus.WriteConfigSuccessfulTopic, - bus.RollbackWriteTopic, - bus.APIActionRequestTopic, - bus.AgentConfigUpdateTopic, - }, - resourcePlugin.Subscriptions()) -} +func TestNginxPlugin_Process_NginxConfigUpdateTopic(t *testing.T) { + ctx := context.Background() -func TestResource_Info(t *testing.T) { - resourcePlugin := NewNginx(types.AgentConfig()) - assert.Equal(t, &bus.Info{Name: "nginx"}, resourcePlugin.Info()) -} + fileMeta := protos.FileMeta("/etc/nginx/nginx/conf", "") -func TestResource_Init(t *testing.T) { - ctx := context.Background() - fakeNginxService := nginxfakes.FakeNginxServiceInterface{} + message := &model.NginxConfigContext{ + Files: []*mpi.File{ + { + FileMeta: fileMeta, + }, + }, + } + + fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} + fakeFileServiceClient.UpdateOverviewReturns(&mpi.UpdateOverviewResponse{ + Overview: nil, + }, nil) + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) messagePipe := busfakes.NewFakeMessagePipe() - messagePipe.RunWithoutInit(ctx) - resourcePlugin := NewNginx(types.AgentConfig()) - resourcePlugin.nginxService = &fakeNginxService - err := resourcePlugin.Init(ctx, messagePipe) + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + err := nginxPlugin.Init(ctx, messagePipe) require.NoError(t, err) - messages := messagePipe.Messages() + nginxPlugin.Process(ctx, &bus.Message{Topic: bus.ConnectionCreatedTopic}) + nginxPlugin.Process(ctx, &bus.Message{Topic: bus.NginxConfigUpdateTopic, Data: message}) - assert.Empty(t, messages) + assert.Eventually( + t, + func() bool { return fakeFileServiceClient.UpdateOverviewCallCount() == 1 }, + 2*time.Second, + 10*time.Millisecond, + ) } //nolint:revive,lll // maximum number of arguments exceed -func runResourceTestHelper(t *testing.T, ctx context.Context, testName string, getUpstreamsFunc func(serviceInterface *nginxfakes.FakeNginxServiceInterface), instance *mpi.Instance, message *bus.Message, topic []string, err error) { +func runNginxTestHelper(t *testing.T, ctx context.Context, testName string, + getUpstreamsFunc func(serviceInterface *nginxfakes.FakeNginxServiceInterface), instance *mpi.Instance, + message *bus.Message, topic []string, err error, +) { t.Helper() t.Run(testName, func(tt *testing.T) { @@ -794,14 +1094,15 @@ func runResourceTestHelper(t *testing.T, ctx context.Context, testName string, g } messagePipe := busfakes.NewFakeMessagePipe() - resourcePlugin := NewNginx(types.AgentConfig()) - resourcePlugin.nginxService = fakeNginxService + fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} + nginxPlugin := NewNginx(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) + nginxPlugin.nginxService = fakeNginxService - registerErr := messagePipe.Register(2, []bus.Plugin{resourcePlugin}) + registerErr := messagePipe.Register(2, []bus.Plugin{nginxPlugin}) require.NoError(t, registerErr) - resourcePlugin.messagePipe = messagePipe - resourcePlugin.Process(ctx, message) + nginxPlugin.messagePipe = messagePipe + nginxPlugin.Process(ctx, message) assert.Equal(tt, topic[0], messagePipe.Messages()[0].Topic) diff --git a/internal/plugin/plugin_manager.go b/internal/plugin/plugin_manager.go index dae406deb..c4f8a9ea0 100644 --- a/internal/plugin/plugin_manager.go +++ b/internal/plugin/plugin_manager.go @@ -17,7 +17,6 @@ import ( "github.com/nginx/agent/v3/internal/collector" "github.com/nginx/agent/v3/internal/command" - "github.com/nginx/agent/v3/internal/file" "github.com/nginx/agent/v3/internal/grpc" "github.com/nginx/agent/v3/internal/nginx" @@ -31,23 +30,15 @@ func LoadPlugins(ctx context.Context, agentConfig *config.Config) []bus.Plugin { manifestLock := &sync.RWMutex{} - plugins = addResourcePlugin(plugins, agentConfig) - plugins = addCommandAndFilePlugins(ctx, plugins, agentConfig, manifestLock) - plugins = addAuxiliaryCommandAndFilePlugins(ctx, plugins, agentConfig, manifestLock) + plugins = addCommandAndNginxPlugins(ctx, plugins, agentConfig, manifestLock) + plugins = addAuxiliaryCommandAndNginxPlugins(ctx, plugins, agentConfig, manifestLock) plugins = addCollectorPlugin(ctx, agentConfig, plugins) plugins = addWatcherPlugin(plugins, agentConfig) return plugins } -func addResourcePlugin(plugins []bus.Plugin, agentConfig *config.Config) []bus.Plugin { - resourcePlugin := nginx.NewNginx(agentConfig) - plugins = append(plugins, resourcePlugin) - - return plugins -} - -func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentConfig *config.Config, +func addCommandAndNginxPlugins(ctx context.Context, plugins []bus.Plugin, agentConfig *config.Config, manifestLock *sync.RWMutex, ) []bus.Plugin { if agentConfig.IsCommandGrpcClientConfigured() { @@ -62,8 +53,8 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo } else { commandPlugin := command.NewCommandPlugin(agentConfig, grpcConnection, model.Command) plugins = append(plugins, commandPlugin) - filePlugin := file.NewFilePlugin(agentConfig, grpcConnection, model.Command, manifestLock) - plugins = append(plugins, filePlugin) + nginxPlugin := nginx.NewNginx(agentConfig, grpcConnection, model.Command, manifestLock) + plugins = append(plugins, nginxPlugin) } } else { slog.InfoContext(ctx, "Agent is not connected to a management plane. "+ @@ -73,7 +64,7 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo return plugins } -func addAuxiliaryCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, +func addAuxiliaryCommandAndNginxPlugins(ctx context.Context, plugins []bus.Plugin, agentConfig *config.Config, manifestLock *sync.RWMutex, ) []bus.Plugin { if agentConfig.IsAuxiliaryCommandGrpcClientConfigured() { @@ -88,8 +79,8 @@ func addAuxiliaryCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin } else { auxCommandPlugin := command.NewCommandPlugin(agentConfig, auxGRPCConnection, model.Auxiliary) plugins = append(plugins, auxCommandPlugin) - readFilePlugin := file.NewFilePlugin(agentConfig, auxGRPCConnection, model.Auxiliary, manifestLock) - plugins = append(plugins, readFilePlugin) + readNginxPlugin := nginx.NewNginx(agentConfig, auxGRPCConnection, model.Auxiliary, manifestLock) + plugins = append(plugins, readNginxPlugin) } } else { slog.DebugContext(ctx, "Agent is not connected to an auxiliary management plane. "+ diff --git a/internal/plugin/plugin_manager_test.go b/internal/plugin/plugin_manager_test.go index a96eb8e5f..5c1989985 100644 --- a/internal/plugin/plugin_manager_test.go +++ b/internal/plugin/plugin_manager_test.go @@ -13,7 +13,6 @@ import ( "github.com/nginx/agent/v3/internal/collector" "github.com/nginx/agent/v3/internal/command" - "github.com/nginx/agent/v3/internal/file" "github.com/nginx/agent/v3/internal/nginx" "github.com/nginx/agent/v3/internal/bus" @@ -34,7 +33,6 @@ func TestLoadPlugins(t *testing.T) { name: "Test 1: Load plugins", input: &config.Config{}, expected: []bus.Plugin{ - &nginx.Nginx{}, &watcher.Watcher{}, }, }, @@ -58,11 +56,10 @@ func TestLoadPlugins(t *testing.T) { Features: config.DefaultFeatures(), }, expected: []bus.Plugin{ - &nginx.Nginx{}, &command.CommandPlugin{}, - &file.FilePlugin{}, + &nginx.NginxPlugin{}, &command.CommandPlugin{}, - &file.FilePlugin{}, + &nginx.NginxPlugin{}, &watcher.Watcher{}, }, }, @@ -77,7 +74,6 @@ func TestLoadPlugins(t *testing.T) { Features: config.DefaultFeatures(), }, expected: []bus.Plugin{ - &nginx.Nginx{}, &collector.Collector{}, &watcher.Watcher{}, }, @@ -103,9 +99,8 @@ func TestLoadPlugins(t *testing.T) { }, }, expected: []bus.Plugin{ - &nginx.Nginx{}, &command.CommandPlugin{}, - &file.FilePlugin{}, + &nginx.NginxPlugin{}, &watcher.Watcher{}, }, }, @@ -134,9 +129,8 @@ func TestLoadPlugins(t *testing.T) { }, }, expected: []bus.Plugin{ - &nginx.Nginx{}, &command.CommandPlugin{}, - &file.FilePlugin{}, + &nginx.NginxPlugin{}, &collector.Collector{}, &watcher.Watcher{}, },