diff --git a/Makefile b/Makefile index d27ae81da..aaf6b6168 100644 --- a/Makefile +++ b/Makefile @@ -161,13 +161,13 @@ integration-test: $(SELECTED_PACKAGE) build-mock-management-plane-grpc TEST_ENV="Container" CONTAINER_OS_TYPE=$(CONTAINER_OS_TYPE) BUILD_TARGET="install-agent-local" CONTAINER_NGINX_IMAGE_REGISTRY=${CONTAINER_NGINX_IMAGE_REGISTRY} \ PACKAGES_REPO=$(OSS_PACKAGES_REPO) PACKAGE_NAME=$(PACKAGE_NAME) BASE_IMAGE=$(BASE_IMAGE) DOCKERFILE_PATH=$(DOCKERFILE_PATH) IMAGE_PATH=$(IMAGE_PATH) TAG=${IMAGE_TAG} \ OS_VERSION=$(OS_VERSION) OS_RELEASE=$(OS_RELEASE) \ - go test -v ./test/integration/installuninstall ./test/integration/managementplane ./test/integration/nginxless + go test -v ./test/integration/installuninstall ./test/integration/managementplane ./test/integration/auxiliarycommandserver ./test/integration/nginxless official-image-integration-test: $(SELECTED_PACKAGE) build-mock-management-plane-grpc TEST_ENV="Container" CONTAINER_OS_TYPE=$(CONTAINER_OS_TYPE) CONTAINER_NGINX_IMAGE_REGISTRY=${CONTAINER_NGINX_IMAGE_REGISTRY} BUILD_TARGET="install" \ PACKAGES_REPO=$(OSS_PACKAGES_REPO) TAG=${TAG} PACKAGE_NAME=$(PACKAGE_NAME) BASE_IMAGE=$(BASE_IMAGE) DOCKERFILE_PATH=$(OFFICIAL_IMAGE_DOCKERFILE_PATH) \ OS_VERSION=$(OS_VERSION) OS_RELEASE=$(OS_RELEASE) IMAGE_PATH=$(IMAGE_PATH) \ - go test -v ./test/integration/managementplane + go test -v ./test/integration/managementplane ./test/integration/auxiliarycommandserver performance-test: @mkdir -p $(TEST_BUILD_DIR) diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index b8076fdfb..74ab60039 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -88,6 +88,7 @@ type ( ) type FileManagerService struct { + manifestLock *sync.RWMutex agentConfig *config.Config fileOperator fileOperator fileServiceOperator fileServiceOperatorInterface @@ -102,16 +103,19 @@ type FileManagerService struct { filesMutex sync.RWMutex } -func NewFileManagerService(fileServiceClient mpi.FileServiceClient, agentConfig *config.Config) *FileManagerService { +func NewFileManagerService(fileServiceClient mpi.FileServiceClient, agentConfig *config.Config, + manifestLock *sync.RWMutex, +) *FileManagerService { return &FileManagerService{ agentConfig: agentConfig, - fileOperator: NewFileOperator(), - fileServiceOperator: NewFileServiceOperator(agentConfig, fileServiceClient), + fileOperator: NewFileOperator(manifestLock), + fileServiceOperator: NewFileServiceOperator(agentConfig, fileServiceClient, manifestLock), fileActions: make(map[string]*model.FileCache), rollbackFileContents: make(map[string][]byte), currentFilesOnDisk: make(map[string]*mpi.File), previousManifestFiles: make(map[string]*model.ManifestFile), manifestFilePath: agentConfig.ManifestDir + "/manifest.json", + manifestLock: manifestLock, } } @@ -423,9 +427,13 @@ func (fms *FileManagerService) UpdateCurrentFilesOnDisk( // seems to be a control flag, avoid control coupling // nolint: revive func (fms *FileManagerService) UpdateManifestFile(currentFiles map[string]*mpi.File, referenced bool) (err error) { + slog.Debug("Updating manifest file", "current_files", currentFiles, "referenced", referenced) currentManifestFiles, _, readError := fms.manifestFile() fms.previousManifestFiles = currentManifestFiles if readError != nil && !errors.Is(readError, os.ErrNotExist) { + slog.Debug("Error reading manifest file", "current_manifest_files", + currentManifestFiles, "updated_files", currentFiles, "referenced", referenced) + return fmt.Errorf("unable to read manifest file: %w", readError) } @@ -457,6 +465,8 @@ func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, m return nil, nil, err } + fms.manifestLock.Lock() + defer fms.manifestLock.Unlock() file, err := os.ReadFile(fms.manifestFilePath) if err != nil { return nil, nil, fmt.Errorf("failed to read manifest file: %w", err) @@ -466,6 +476,10 @@ func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, m err = json.Unmarshal(file, &manifestFiles) if err != nil { + if len(file) == 0 { + return nil, nil, fmt.Errorf("manifest file is empty: %w", err) + } + return nil, nil, fmt.Errorf("failed to parse manifest file: %w", err) } diff --git a/internal/file/file_manager_service_test.go b/internal/file/file_manager_service_test.go index 4cff68071..6adbf62e9 100644 --- a/internal/file/file_manager_service_test.go +++ b/internal/file/file_manager_service_test.go @@ -11,6 +11,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "testing" "github.com/nginx/agent/v3/internal/model" @@ -54,7 +55,7 @@ func TestFileManagerService_ConfigApply_Add(t *testing.T) { agentConfig := types.AgentConfig() agentConfig.AllowedDirectories = []string{tempDir} - fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig) + fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig, &sync.RWMutex{}) fileManagerService.agentConfig.ManifestDir = manifestDirPath fileManagerService.manifestFilePath = manifestFilePath @@ -101,7 +102,7 @@ func TestFileManagerService_ConfigApply_Add_LargeFile(t *testing.T) { fakeFileServiceClient.GetFileStreamReturns(fakeServerStreamingClient, nil) agentConfig := types.AgentConfig() agentConfig.AllowedDirectories = []string{tempDir} - fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig) + fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig, &sync.RWMutex{}) fileManagerService.agentConfig.ManifestDir = manifestDirPath fileManagerService.manifestFilePath = manifestFilePath @@ -160,7 +161,7 @@ func TestFileManagerService_ConfigApply_Update(t *testing.T) { agentConfig := types.AgentConfig() agentConfig.AllowedDirectories = []string{tempDir} - fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig) + fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig, &sync.RWMutex{}) fileManagerService.agentConfig.ManifestDir = manifestDirPath fileManagerService.manifestFilePath = manifestFilePath err := fileManagerService.UpdateCurrentFilesOnDisk(ctx, filesOnDisk, false) @@ -209,7 +210,7 @@ func TestFileManagerService_ConfigApply_Delete(t *testing.T) { agentConfig := types.AgentConfig() agentConfig.AllowedDirectories = []string{tempDir} - fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig) + fileManagerService := NewFileManagerService(fakeFileServiceClient, agentConfig, &sync.RWMutex{}) fileManagerService.agentConfig.ManifestDir = manifestDirPath fileManagerService.manifestFilePath = manifestFilePath err := fileManagerService.UpdateCurrentFilesOnDisk(ctx, filesOnDisk, false) @@ -247,7 +248,7 @@ func TestFileManagerService_ConfigApply_Delete(t *testing.T) { func TestFileManagerService_checkAllowedDirectory(t *testing.T) { fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) + fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig(), &sync.RWMutex{}) allowedFiles := []*mpi.File{ { @@ -281,7 +282,7 @@ func TestFileManagerService_checkAllowedDirectory(t *testing.T) { func TestFileManagerService_ClearCache(t *testing.T) { fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) + fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig(), &sync.RWMutex{}) filesCache := map[string]*model.FileCache{ "file/path/test.conf": { @@ -394,7 +395,7 @@ func TestFileManagerService_Rollback(t *testing.T) { instanceID := protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId() fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) + fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig(), &sync.RWMutex{}) fileManagerService.rollbackFileContents = fileContentCache fileManagerService.fileActions = filesCache fileManagerService.agentConfig.ManifestDir = manifestDirPath @@ -576,7 +577,7 @@ func TestFileManagerService_DetermineFileActions(t *testing.T) { manifestFilePath := manifestFile.Name() fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) + fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig(), &sync.RWMutex{}) fileManagerService.agentConfig.ManifestDir = manifestDirPath fileManagerService.manifestFilePath = manifestFilePath @@ -597,7 +598,7 @@ func TestFileManagerService_DetermineFileActions(t *testing.T) { func CreateTestManifestFile(t testing.TB, tempDir string, currentFiles map[string]*mpi.File) *os.File { t.Helper() fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) + fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig(), &sync.RWMutex{}) manifestFiles := fileManagerService.convertToManifestFileMap(currentFiles, true) manifestJSON, err := json.MarshalIndent(manifestFiles, "", " ") require.NoError(t, err) @@ -685,7 +686,7 @@ func TestFileManagerService_fileActions(t *testing.T) { Contents: newFileContent, }, }, nil) - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) + fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig(), &sync.RWMutex{}) fileManagerService.fileActions = filesCache diff --git a/internal/file/file_operator.go b/internal/file/file_operator.go index 88539ce3a..fe68c4c81 100644 --- a/internal/file/file_operator.go +++ b/internal/file/file_operator.go @@ -14,6 +14,7 @@ import ( "log/slog" "os" "path" + "sync" "github.com/nginx/agent/v3/internal/model" @@ -24,14 +25,18 @@ import ( mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" ) -type FileOperator struct{} +type FileOperator struct { + manifestLock *sync.RWMutex +} var _ fileOperator = (*FileOperator)(nil) // FileOperator only purpose is to write files, -func NewFileOperator() *FileOperator { - return &FileOperator{} +func NewFileOperator(manifestLock *sync.RWMutex) *FileOperator { + return &FileOperator{ + manifestLock: manifestLock, + } } func (fo *FileOperator) Write(ctx context.Context, fileContent []byte, file *mpi.FileMeta) error { @@ -152,6 +157,8 @@ func (fo *FileOperator) WriteManifestFile(updatedFiles map[string]*model.Manifes return fmt.Errorf("unable to marshal manifest file json: %w", err) } + fo.manifestLock.Lock() + defer fo.manifestLock.Unlock() // 0755 allows read/execute for all, write for owner if err = os.MkdirAll(manifestDir, dirPerm); err != nil { return fmt.Errorf("unable to create directory %s: %w", manifestDir, err) diff --git a/internal/file/file_operator_test.go b/internal/file/file_operator_test.go index 182c87025..78ea9a193 100644 --- a/internal/file/file_operator_test.go +++ b/internal/file/file_operator_test.go @@ -9,6 +9,7 @@ import ( "context" "os" "path/filepath" + "sync" "testing" "github.com/nginx/agent/v3/pkg/files" @@ -28,7 +29,7 @@ func TestFileOperator_Write(t *testing.T) { fileContent, err := os.ReadFile("../../test/config/nginx/nginx.conf") require.NoError(t, err) defer helpers.RemoveFileWithErrorCheck(t, filePath) - fileOp := NewFileOperator() + fileOp := NewFileOperator(&sync.RWMutex{}) fileMeta := protos.FileMeta(filePath, files.GenerateHash(fileContent)) diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index 330c02335..6818418f3 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -8,6 +8,7 @@ package file import ( "context" "log/slog" + "sync" "github.com/nginx/agent/v3/pkg/files" "github.com/nginx/agent/v3/pkg/id" @@ -27,6 +28,7 @@ var _ bus.Plugin = (*FilePlugin)(nil) // the file plugin does not care about the instance type type FilePlugin struct { + manifestLock *sync.RWMutex messagePipe bus.MessagePipeInterface config *config.Config conn grpc.GrpcConnectionInterface @@ -35,12 +37,13 @@ type FilePlugin struct { } func NewFilePlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface, - serverType model.ServerType, + serverType model.ServerType, manifestLock *sync.RWMutex, ) *FilePlugin { return &FilePlugin{ - config: agentConfig, - conn: grpcConnection, - serverType: serverType, + config: agentConfig, + conn: grpcConnection, + serverType: serverType, + manifestLock: manifestLock, } } @@ -52,7 +55,7 @@ func (fp *FilePlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInter slog.DebugContext(ctx, "Starting file plugin") fp.messagePipe = messagePipe - fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config) + fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config, fp.manifestLock) return nil } @@ -145,7 +148,7 @@ func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Messag fp.conn = newConnection reconnect = fp.fileManagerService.IsConnected() - fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config) + fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config, fp.manifestLock) fp.fileManagerService.SetIsConnected(reconnect) slog.DebugContext(ctx, "File manager service client reset successfully") diff --git a/internal/file/file_plugin_test.go b/internal/file/file_plugin_test.go index f1cb08403..b14ab7cac 100644 --- a/internal/file/file_plugin_test.go +++ b/internal/file/file_plugin_test.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "os" + "sync" "testing" "time" @@ -31,7 +32,8 @@ import ( ) func TestFilePlugin_Info(t *testing.T) { - filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command) + filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) assert.Equal(t, "file", filePlugin.Info().Name) } @@ -39,14 +41,15 @@ func TestFilePlugin_Close(t *testing.T) { ctx := context.Background() fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command) + filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) filePlugin.Close(ctx) assert.Equal(t, 1, fakeGrpcConnection.CloseCallCount()) } func TestFilePlugin_Subscriptions(t *testing.T) { - filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command) + filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Command, &sync.RWMutex{}) assert.Equal( t, []string{ @@ -62,7 +65,8 @@ func TestFilePlugin_Subscriptions(t *testing.T) { filePlugin.Subscriptions(), ) - readOnlyFilePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Auxiliary) + readOnlyFilePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, + model.Auxiliary, &sync.RWMutex{}) assert.Equal(t, []string{ bus.ConnectionResetTopic, bus.ConnectionCreatedTopic, @@ -93,7 +97,7 @@ func TestFilePlugin_Process_NginxConfigUpdateTopic(t *testing.T) { fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command) + filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -168,7 +172,7 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) { fakeFileManagerService := &filefakes.FakeFileManagerServiceInterface{} fakeFileManagerService.ConfigApplyReturns(test.configApplyStatus, test.configApplyReturnsErr) messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command) + filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) err := filePlugin.Init(ctx, messagePipe) filePlugin.fileManagerService = fakeFileManagerService require.NoError(t, err) @@ -266,7 +270,7 @@ func TestFilePlugin_Process_ConfigUploadRequestTopic(t *testing.T) { fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command) + filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -321,7 +325,7 @@ func TestFilePlugin_Process_ConfigUploadRequestTopic_Failure(t *testing.T) { fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command) + filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command, &sync.RWMutex{}) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -389,7 +393,7 @@ func TestFilePlugin_Process_ConfigApplyFailedTopic(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() agentConfig := types.AgentConfig() - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command) + filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -436,7 +440,7 @@ func TestFilePlugin_Process_ConfigApplyRollbackCompleteTopic(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() agentConfig := types.AgentConfig() fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command) + filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -481,7 +485,7 @@ func TestFilePlugin_Process_ConfigApplyCompleteTopic(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() agentConfig := types.AgentConfig() fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command) + filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command, &sync.RWMutex{}) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go index 6bf835fa0..dc33d3d3a 100644 --- a/internal/file/file_service_operator.go +++ b/internal/file/file_service_operator.go @@ -15,6 +15,7 @@ import ( "math" "os" "slices" + "sync" "sync/atomic" "github.com/cenkalti/backoff/v4" @@ -41,14 +42,16 @@ type FileServiceOperator struct { var _ fileServiceOperatorInterface = (*FileServiceOperator)(nil) -func NewFileServiceOperator(agentConfig *config.Config, fileServiceClient mpi.FileServiceClient) *FileServiceOperator { +func NewFileServiceOperator(agentConfig *config.Config, fileServiceClient mpi.FileServiceClient, + manifestLock *sync.RWMutex, +) *FileServiceOperator { isConnected := &atomic.Bool{} isConnected.Store(false) return &FileServiceOperator{ fileServiceClient: fileServiceClient, agentConfig: agentConfig, - fileOperator: NewFileOperator(), + fileOperator: NewFileOperator(manifestLock), isConnected: isConnected, } } diff --git a/internal/file/file_service_operator_test.go b/internal/file/file_service_operator_test.go index 632749fbb..389bb6c6b 100644 --- a/internal/file/file_service_operator_test.go +++ b/internal/file/file_service_operator_test.go @@ -9,6 +9,7 @@ import ( "context" "os" "path/filepath" + "sync" "sync/atomic" "testing" @@ -45,7 +46,7 @@ func TestFileServiceOperator_UpdateOverview(t *testing.T) { fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil) - fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient) + fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient, &sync.RWMutex{}) fileServiceOperator.SetIsConnected(true) err := fileServiceOperator.UpdateOverview(ctx, "123", []*mpi.File{ @@ -83,7 +84,7 @@ func TestFileServiceOperator_UpdateOverview_MaxIterations(t *testing.T) { fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil) - fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient) + fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient, &sync.RWMutex{}) fileServiceOperator.SetIsConnected(true) err := fileServiceOperator.UpdateOverview(ctx, "123", []*mpi.File{ @@ -126,7 +127,7 @@ func TestFileManagerService_UpdateFile(t *testing.T) { } fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient) + fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient, &sync.RWMutex{}) fileServiceOperator.SetIsConnected(true) err := fileServiceOperator.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta}) @@ -150,7 +151,7 @@ func TestFileManagerService_UpdateFile_LargeFile(t *testing.T) { fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} fakeClientStreamingClient := &FakeClientStreamingClient{sendCount: atomic.Int32{}} fakeFileServiceClient.UpdateFileStreamReturns(fakeClientStreamingClient, nil) - fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient) + fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient, &sync.RWMutex{}) fileServiceOperator.SetIsConnected(true) err := fileServiceOperator.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta}) diff --git a/internal/plugin/plugin_manager.go b/internal/plugin/plugin_manager.go index 30bcfc637..3f2c3869a 100644 --- a/internal/plugin/plugin_manager.go +++ b/internal/plugin/plugin_manager.go @@ -8,6 +8,7 @@ package plugin import ( "context" "log/slog" + "sync" "github.com/nginx/agent/v3/internal/model" @@ -27,9 +28,11 @@ import ( func LoadPlugins(ctx context.Context, agentConfig *config.Config) []bus.Plugin { plugins := make([]bus.Plugin, 0) + manifestLock := &sync.RWMutex{} + plugins = addResourcePlugin(plugins, agentConfig) - plugins = addCommandAndFilePlugins(ctx, plugins, agentConfig) - plugins = addAuxiliaryCommandAndFilePlugins(ctx, plugins, agentConfig) + plugins = addCommandAndFilePlugins(ctx, plugins, agentConfig, manifestLock) + plugins = addAuxiliaryCommandAndFilePlugins(ctx, plugins, agentConfig, manifestLock) plugins = addCollectorPlugin(ctx, agentConfig, plugins) plugins = addWatcherPlugin(plugins, agentConfig) @@ -43,7 +46,9 @@ func addResourcePlugin(plugins []bus.Plugin, agentConfig *config.Config) []bus.P return plugins } -func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentConfig *config.Config) []bus.Plugin { +func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentConfig *config.Config, + manifestLock *sync.RWMutex, +) []bus.Plugin { if agentConfig.IsCommandGrpcClientConfigured() { grpcConnection, err := grpc.NewGrpcConnection(ctx, agentConfig, agentConfig.Command) if err != nil { @@ -51,7 +56,7 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo } else { commandPlugin := command.NewCommandPlugin(agentConfig, grpcConnection, model.Command) plugins = append(plugins, commandPlugin) - filePlugin := file.NewFilePlugin(agentConfig, grpcConnection, model.Command) + filePlugin := file.NewFilePlugin(agentConfig, grpcConnection, model.Command, manifestLock) plugins = append(plugins, filePlugin) } } else { @@ -63,7 +68,7 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo } func addAuxiliaryCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, - agentConfig *config.Config, + agentConfig *config.Config, manifestLock *sync.RWMutex, ) []bus.Plugin { if agentConfig.IsAuxiliaryCommandGrpcClientConfigured() { auxGRPCConnection, err := grpc.NewGrpcConnection(ctx, agentConfig, agentConfig.AuxiliaryCommand) @@ -72,7 +77,7 @@ func addAuxiliaryCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin } else { auxCommandPlugin := command.NewCommandPlugin(agentConfig, auxGRPCConnection, model.Auxiliary) plugins = append(plugins, auxCommandPlugin) - readFilePlugin := file.NewFilePlugin(agentConfig, auxGRPCConnection, model.Auxiliary) + readFilePlugin := file.NewFilePlugin(agentConfig, auxGRPCConnection, model.Auxiliary, manifestLock) plugins = append(plugins, readFilePlugin) } } else { diff --git a/internal/watcher/file/file_watcher_service.go b/internal/watcher/file/file_watcher_service.go index 9b95fe7c7..dff86b117 100644 --- a/internal/watcher/file/file_watcher_service.go +++ b/internal/watcher/file/file_watcher_service.go @@ -195,7 +195,7 @@ func (fws *FileWatcherService) checkForUpdates(ctx context.Context, ch chan<- Fi // Check if directories no longer need to be watched fws.removeWatchers(ctx) - if fws.filesChanged.Load() { + if fws.filesChanged.Load() && fws.enabled.Load() { newCtx := context.WithValue( ctx, logger.CorrelationIDContextKey, diff --git a/test/config/agent/nginx-agent-with-auxiliary-command.conf b/test/config/agent/nginx-agent-with-auxiliary-command.conf new file mode 100644 index 000000000..759e8b8fd --- /dev/null +++ b/test/config/agent/nginx-agent-with-auxiliary-command.conf @@ -0,0 +1,26 @@ +# +# /etc/nginx-agent/nginx-agent.conf +# +# Configuration file for NGINX Agent. +# + +log: + level: debug + +command: + server: + host: managementPlane + port: 9092 + type: grpc + +auxiliary_command: + server: + host: managementPlaneAuxiliary + port: 9095 + type: grpc + + +allowed_directories: + - /etc/nginx + - /usr/local/etc/nginx + - /usr/share/nginx/modules diff --git a/test/helpers/test_containers_utils.go b/test/helpers/test_containers_utils.go index 60f4c3000..baee47d40 100644 --- a/test/helpers/test_containers_utils.go +++ b/test/helpers/test_containers_utils.go @@ -263,6 +263,39 @@ func StartMockManagementPlaneGrpcContainer( return container } +func StartAuxiliaryMockManagementPlaneGrpcContainer(ctx context.Context, tb testing.TB, + containerNetwork *testcontainers.DockerNetwork, +) testcontainers.Container { + tb.Helper() + req := testcontainers.ContainerRequest{ + FromDockerfile: testcontainers.FromDockerfile{ + Context: "../../../", + Dockerfile: "./test/integration/auxiliarycommandserver/Dockerfile", + KeepImage: false, + PrintBuildLog: true, + }, + ExposedPorts: []string{"9095/tcp", "9096/tcp"}, + Networks: []string{ + containerNetwork.Name, + }, + NetworkAliases: map[string][]string{ + containerNetwork.Name: { + "managementPlaneAuxiliary", + }, + }, + WaitingFor: wait.ForLog("Starting mock management plane gRPC server"), + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + + require.NoError(tb, err) + + return container +} + func ToPtr[T any](value T) *T { return &value } @@ -274,6 +307,7 @@ func LogAndTerminateContainers( mockManagementPlaneContainer testcontainers.Container, agentContainer testcontainers.Container, expectNoErrorsInLogs bool, + auxiliaryMockManagementPlaneContainer testcontainers.Container, ) { tb.Helper() @@ -285,6 +319,8 @@ func LogAndTerminateContainers( require.NoError(tb, err) logs := string(buf) + assert.NotContains(tb, logs, "manifest file is empty", + "Error reading manifest file found in agent log") tb.Log(logs) if expectNoErrorsInLogs { assert.NotContains(tb, logs, "level=ERROR", "agent log file contains logs at error level") @@ -307,4 +343,19 @@ func LogAndTerminateContainers( err = mockManagementPlaneContainer.Terminate(ctx) require.NoError(tb, err) } + + if auxiliaryMockManagementPlaneContainer != nil { + tb.Log("======================== Logging Auxiliary Mock Management Container Logs ========================") + logReader, err = auxiliaryMockManagementPlaneContainer.Logs(ctx) + require.NoError(tb, err) + + buf, err = io.ReadAll(logReader) + require.NoError(tb, err) + logs = string(buf) + + tb.Log(logs) + + err = auxiliaryMockManagementPlaneContainer.Terminate(ctx) + require.NoError(tb, err) + } } diff --git a/test/integration/auxiliarycommandserver/Dockerfile b/test/integration/auxiliarycommandserver/Dockerfile new file mode 100644 index 000000000..a4904e7ba --- /dev/null +++ b/test/integration/auxiliarycommandserver/Dockerfile @@ -0,0 +1,8 @@ +FROM debian:buster-slim + +WORKDIR /mock-management-plane-grpc +COPY ./build/mock-management-plane-grpc ./ + +RUN mkdir config/ + +CMD ["/mock-management-plane-grpc/server", "--grpcAddress", "0.0.0.0:9095", "--apiAddress", "0.0.0.0:9096", "--configDirectory", "/mock-management-plane-grpc/config", "--logLevel", "DEBUG"] diff --git a/test/integration/auxiliarycommandserver/connection_test.go b/test/integration/auxiliarycommandserver/connection_test.go new file mode 100644 index 000000000..4b2204daa --- /dev/null +++ b/test/integration/auxiliarycommandserver/connection_test.go @@ -0,0 +1,203 @@ +// Copyright (c) F5, Inc. +// +// This source code is licensed under the Apache License, Version 2.0 license found in the +// LICENSE file in the root directory of this source tree. + +package auxiliarycommandserver + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "sort" + "testing" + "time" + + "github.com/go-resty/resty/v2" + + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/test/integration/utils" + "github.com/stretchr/testify/suite" +) + +type AuxiliaryTestSuite struct { + suite.Suite + teardownTest func(tb testing.TB) + instanceID string +} + +func (s *AuxiliaryTestSuite) SetupSuite() { + t := s.T() + // Expect errors in logs should be false for recconnection tests + // For now for these test we will skip checking the logs for errors + s.teardownTest = utils.SetupConnectionTest(t, false, false, true, + "../../config/agent/nginx-agent-with-auxiliary-command.conf") +} + +func (s *AuxiliaryTestSuite) TearDownSuite() { + s.teardownTest(s.T()) +} + +func TestSuite(t *testing.T) { + suite.Run(t, new(AuxiliaryTestSuite)) +} + +func (s *AuxiliaryTestSuite) TestAuxiliary_Test1_Connection() { + s.instanceID = utils.VerifyConnection(s.T(), 2, utils.MockManagementPlaneAPIAddress) + s.False(s.T().Failed()) + utils.VerifyUpdateDataPlaneHealth(s.T(), utils.MockManagementPlaneAPIAddress) + + utils.VerifyConnection(s.T(), 2, utils.AuxiliaryMockManagementPlaneAPIAddress) + s.False(s.T().Failed()) + utils.VerifyUpdateDataPlaneHealth(s.T(), utils.AuxiliaryMockManagementPlaneAPIAddress) + + commandResponses := utils.ManagementPlaneResponses(s.T(), 1, utils.MockManagementPlaneAPIAddress) + s.Equal(mpi.CommandResponse_COMMAND_STATUS_OK, commandResponses[0].GetCommandResponse().GetStatus()) + s.Equal("Successfully updated all files", commandResponses[0].GetCommandResponse().GetMessage()) +} + +func (s *AuxiliaryTestSuite) TestAuxiliary_Test2_Reconnection() { + ctx := context.Background() + timeout := 15 * time.Second + + originalID := utils.VerifyConnection(s.T(), 2, utils.AuxiliaryMockManagementPlaneAPIAddress) + stopErr := utils.AuxiliaryMockManagementPlaneGrpcContainer.Stop(context.Background(), &timeout) + + s.Require().NoError(stopErr) + + utils.VerifyConnection(s.T(), 2, utils.MockManagementPlaneAPIAddress) + s.False(s.T().Failed()) + + startErr := utils.AuxiliaryMockManagementPlaneGrpcContainer.Start(ctx) + s.Require().NoError(startErr) + + ipAddress, err := utils.AuxiliaryMockManagementPlaneGrpcContainer.Host(ctx) + s.Require().NoError(err) + ports, err := utils.AuxiliaryMockManagementPlaneGrpcContainer.Ports(ctx) + s.Require().NoError(err) + utils.AuxiliaryMockManagementPlaneAPIAddress = net.JoinHostPort(ipAddress, ports["9096/tcp"][0].HostPort) + + currentID := utils.VerifyConnection(s.T(), 2, utils.AuxiliaryMockManagementPlaneAPIAddress) + s.Equal(originalID, currentID) +} + +func (s *AuxiliaryTestSuite) TestAuxiliary_Test3_DataplaneHealthRequest() { + utils.ClearManagementPlaneResponses(s.T(), utils.MockManagementPlaneAPIAddress) + utils.ClearManagementPlaneResponses(s.T(), utils.AuxiliaryMockManagementPlaneAPIAddress) + + request := `{ + "message_meta": { + "message_id": "5d0fa83e-351c-4009-90cd-1f2acce2d184", + "correlation_id": "79794c1c-8e91-47c1-a92c-b9a0c3f1a263", + "timestamp": "2023-01-15T01:30:15.01Z" + }, + "health_request": {} + }` + + client := resty.New() + client.SetRetryCount(utils.RetryCount).SetRetryWaitTime(utils.RetryWaitTime).SetRetryMaxWaitTime( + utils.RetryMaxWaitTime) + + url := fmt.Sprintf("http://%s/api/v1/requests", utils.MockManagementPlaneAPIAddress) + resp, err := client.R().EnableTrace().SetBody(request).Post(url) + + s.Require().NoError(err) + s.Equal(http.StatusOK, resp.StatusCode()) + + // Check command server has 2 ManagementPlaneResponses as it has sent the request + commandResponses := utils.ManagementPlaneResponses(s.T(), 1, utils.MockManagementPlaneAPIAddress) + s.Equal(mpi.CommandResponse_COMMAND_STATUS_OK, commandResponses[0].GetCommandResponse().GetStatus()) + s.Equal("Successfully sent health status update", commandResponses[0].GetCommandResponse().GetMessage()) + s.False(s.T().Failed()) + + // Check auxiliary server still only has 1 ManagementPlaneResponses as it didn't send the request + utils.ManagementPlaneResponses(s.T(), 0, utils.AuxiliaryMockManagementPlaneAPIAddress) + s.False(s.T().Failed()) +} + +func (s *AuxiliaryTestSuite) TestAuxiliary_Test4_FileWatcher() { + // Clear any previous responses from previous tests + utils.ClearManagementPlaneResponses(s.T(), utils.MockManagementPlaneAPIAddress) + utils.ClearManagementPlaneResponses(s.T(), utils.AuxiliaryMockManagementPlaneAPIAddress) + ctx := context.Background() + + err := utils.Container.CopyFileToContainer( + ctx, + "../../config/nginx/nginx-with-server-block-access-log.conf", + "/etc/nginx/nginx.conf", + 0o666, + ) + s.Require().NoError(err) + + // Check command server has 2 ManagementPlaneResponses from updating a file on disk + commandResponses := utils.ManagementPlaneResponses(s.T(), 1, utils.MockManagementPlaneAPIAddress) + s.Equal(mpi.CommandResponse_COMMAND_STATUS_OK, commandResponses[0].GetCommandResponse().GetStatus()) + s.Equal("Successfully updated all files", commandResponses[0].GetCommandResponse().GetMessage()) + + // Check auxiliary server has 2 ManagementPlaneResponses from updating a file on disk + auxResponses := utils.ManagementPlaneResponses(s.T(), 1, utils.AuxiliaryMockManagementPlaneAPIAddress) + s.Equal(mpi.CommandResponse_COMMAND_STATUS_OK, auxResponses[0].GetCommandResponse().GetStatus()) + s.Equal("Successfully updated all files", auxResponses[0].GetCommandResponse().GetMessage()) +} + +func (s *AuxiliaryTestSuite) TestAuxiliary_Test5_ConfigApply() { + utils.ClearManagementPlaneResponses(s.T(), utils.MockManagementPlaneAPIAddress) + utils.ClearManagementPlaneResponses(s.T(), utils.AuxiliaryMockManagementPlaneAPIAddress) + + ctx := context.Background() + + newConfigFile := "../../config/nginx/nginx-with-test-location.conf" + + if os.Getenv("IMAGE_PATH") == "/nginx-plus/agent" { + newConfigFile = "../../config/nginx/nginx-plus-with-test-location.conf" + } + + err := utils.MockManagementPlaneGrpcContainer.CopyFileToContainer( + ctx, + newConfigFile, + fmt.Sprintf("/mock-management-plane-grpc/config/%s/etc/nginx/nginx.conf", s.instanceID), + 0o666, + ) + + s.Require().NoError(err) + + utils.PerformConfigApply(s.T(), s.instanceID, utils.MockManagementPlaneAPIAddress) + + commandResponses := utils.ManagementPlaneResponses(s.T(), 2, utils.MockManagementPlaneAPIAddress) + + sort.Slice(commandResponses, func(i, j int) bool { + return commandResponses[i].GetCommandResponse().GetMessage() < + commandResponses[j].GetCommandResponse().GetMessage() + }) + + s.Equal(mpi.CommandResponse_COMMAND_STATUS_OK, commandResponses[0].GetCommandResponse().GetStatus()) + s.Equal("Config apply successful", commandResponses[0].GetCommandResponse().GetMessage()) + s.Equal(mpi.CommandResponse_COMMAND_STATUS_OK, commandResponses[1].GetCommandResponse().GetStatus()) + s.Equal("Successfully updated all files", commandResponses[1].GetCommandResponse().GetMessage()) + + auxResponses := utils.ManagementPlaneResponses(s.T(), 1, utils.AuxiliaryMockManagementPlaneAPIAddress) + s.Equal(mpi.CommandResponse_COMMAND_STATUS_OK, auxResponses[0].GetCommandResponse().GetStatus()) + s.Equal("Successfully updated all files", auxResponses[0].GetCommandResponse().GetMessage()) + + // Check the config version is the same in both command and auxiliary servers + commandOverview := utils.CurrentFileOverview(s.T(), s.instanceID, utils.MockManagementPlaneAPIAddress) + auxOverview := utils.CurrentFileOverview(s.T(), s.instanceID, utils.AuxiliaryMockManagementPlaneAPIAddress) + s.Equal(commandOverview.GetConfigVersion(), auxOverview.GetConfigVersion()) +} + +func (s *AuxiliaryTestSuite) TestAuxiliary_Test6_ConfigApplyInvalid() { + utils.ClearManagementPlaneResponses(s.T(), utils.MockManagementPlaneAPIAddress) + utils.ClearManagementPlaneResponses(s.T(), utils.AuxiliaryMockManagementPlaneAPIAddress) + + utils.PerformConfigApply(s.T(), s.instanceID, utils.AuxiliaryMockManagementPlaneAPIAddress) + + commandResponses := utils.ManagementPlaneResponses(s.T(), 1, + utils.AuxiliaryMockManagementPlaneAPIAddress) + s.Equal(mpi.CommandResponse_COMMAND_STATUS_FAILURE, + commandResponses[0].GetCommandResponse().GetStatus()) + s.Equal("Config apply failed", commandResponses[0].GetCommandResponse().GetMessage()) + s.Equal("Unable to process request. Management plane is configured as read only.", + commandResponses[0].GetCommandResponse().GetError()) +} diff --git a/test/integration/installuninstall/install_uninstall_test.go b/test/integration/installuninstall/install_uninstall_test.go index 4432999e0..a0137e9eb 100644 --- a/test/integration/installuninstall/install_uninstall_test.go +++ b/test/integration/installuninstall/install_uninstall_test.go @@ -66,6 +66,7 @@ func installUninstallSetup(tb testing.TB, expectNoErrorsInLogs bool) (testcontai nil, testContainer, expectNoErrorsInLogs, + nil, ) } } diff --git a/test/integration/managementplane/config_apply_test.go b/test/integration/managementplane/config_apply_test.go index a221ceb51..6fcb48096 100644 --- a/test/integration/managementplane/config_apply_test.go +++ b/test/integration/managementplane/config_apply_test.go @@ -26,20 +26,20 @@ const ( func TestGrpc_ConfigApply(t *testing.T) { ctx := context.Background() - teardownTest := utils.SetupConnectionTest(t, false, false, + teardownTest := utils.SetupConnectionTest(t, false, false, false, "../../config/agent/nginx-config-with-grpc-client.conf") defer teardownTest(t) - nginxInstanceID := utils.VerifyConnection(t, 2) + nginxInstanceID := utils.VerifyConnection(t, 2, utils.MockManagementPlaneAPIAddress) - responses := utils.ManagementPlaneResponses(t, 1) + responses := utils.ManagementPlaneResponses(t, 1, utils.MockManagementPlaneAPIAddress) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully updated all files", responses[0].GetCommandResponse().GetMessage()) t.Run("Test 1: No config changes", func(t *testing.T) { - utils.ClearManagementPlaneResponses(t) - utils.PerformConfigApply(t, nginxInstanceID) - responses = utils.ManagementPlaneResponses(t, 1) + utils.ClearManagementPlaneResponses(t, utils.MockManagementPlaneAPIAddress) + utils.PerformConfigApply(t, nginxInstanceID, utils.MockManagementPlaneAPIAddress) + responses = utils.ManagementPlaneResponses(t, 1, utils.MockManagementPlaneAPIAddress) t.Logf("Config apply responses: %v", responses) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) @@ -47,7 +47,7 @@ func TestGrpc_ConfigApply(t *testing.T) { }) t.Run("Test 2: Valid config", func(t *testing.T) { - utils.ClearManagementPlaneResponses(t) + utils.ClearManagementPlaneResponses(t, utils.MockManagementPlaneAPIAddress) newConfigFile := "../../config/nginx/nginx-with-test-location.conf" if os.Getenv("IMAGE_PATH") == "/nginx-plus/agent" { @@ -62,9 +62,9 @@ func TestGrpc_ConfigApply(t *testing.T) { ) require.NoError(t, err) - utils.PerformConfigApply(t, nginxInstanceID) + utils.PerformConfigApply(t, nginxInstanceID, utils.MockManagementPlaneAPIAddress) - responses = utils.ManagementPlaneResponses(t, 2) + responses = utils.ManagementPlaneResponses(t, 2, utils.MockManagementPlaneAPIAddress) t.Logf("Config apply responses: %v", responses) sort.Slice(responses, func(i, j int) bool { @@ -78,7 +78,7 @@ func TestGrpc_ConfigApply(t *testing.T) { }) t.Run("Test 3: Invalid config", func(t *testing.T) { - utils.ClearManagementPlaneResponses(t) + utils.ClearManagementPlaneResponses(t, utils.MockManagementPlaneAPIAddress) err := utils.MockManagementPlaneGrpcContainer.CopyFileToContainer( ctx, "../../config/nginx/invalid-nginx.conf", @@ -87,9 +87,9 @@ func TestGrpc_ConfigApply(t *testing.T) { ) require.NoError(t, err) - utils.PerformConfigApply(t, nginxInstanceID) + utils.PerformConfigApply(t, nginxInstanceID, utils.MockManagementPlaneAPIAddress) - responses = utils.ManagementPlaneResponses(t, 2) + responses = utils.ManagementPlaneResponses(t, 2, utils.MockManagementPlaneAPIAddress) t.Logf("Config apply responses: %v", responses) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_ERROR, responses[0].GetCommandResponse().GetStatus()) @@ -101,10 +101,10 @@ func TestGrpc_ConfigApply(t *testing.T) { }) t.Run("Test 4: File not in allowed directory", func(t *testing.T) { - utils.ClearManagementPlaneResponses(t) + utils.ClearManagementPlaneResponses(t, utils.MockManagementPlaneAPIAddress) utils.PerformInvalidConfigApply(t, nginxInstanceID) - responses = utils.ManagementPlaneResponses(t, 1) + responses = utils.ManagementPlaneResponses(t, 1, utils.MockManagementPlaneAPIAddress) t.Logf("Config apply responses: %v", responses) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_FAILURE, responses[0].GetCommandResponse().GetStatus()) @@ -119,17 +119,17 @@ func TestGrpc_ConfigApply(t *testing.T) { func TestGrpc_ConfigApply_Chunking(t *testing.T) { ctx := context.Background() - teardownTest := utils.SetupConnectionTest(t, false, false, + teardownTest := utils.SetupConnectionTest(t, false, false, false, "../../config/agent/nginx-config-with-max-file-size.conf") defer teardownTest(t) - nginxInstanceID := utils.VerifyConnection(t, 2) + nginxInstanceID := utils.VerifyConnection(t, 2, utils.MockManagementPlaneAPIAddress) - responses := utils.ManagementPlaneResponses(t, 1) + responses := utils.ManagementPlaneResponses(t, 1, utils.MockManagementPlaneAPIAddress) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully updated all files", responses[0].GetCommandResponse().GetMessage()) - utils.ClearManagementPlaneResponses(t) + utils.ClearManagementPlaneResponses(t, utils.MockManagementPlaneAPIAddress) newConfigFile := "../../config/nginx/nginx-1mb-file.conf" @@ -141,9 +141,9 @@ func TestGrpc_ConfigApply_Chunking(t *testing.T) { ) require.NoError(t, err) - utils.PerformConfigApply(t, nginxInstanceID) + utils.PerformConfigApply(t, nginxInstanceID, utils.MockManagementPlaneAPIAddress) - responses = utils.ManagementPlaneResponses(t, 2) + responses = utils.ManagementPlaneResponses(t, 2, utils.MockManagementPlaneAPIAddress) t.Logf("Config apply responses: %v", responses) sort.Slice(responses, func(i, j int) bool { diff --git a/test/integration/managementplane/config_upload_test.go b/test/integration/managementplane/config_upload_test.go index 3841f7d1a..35d57a628 100644 --- a/test/integration/managementplane/config_upload_test.go +++ b/test/integration/managementplane/config_upload_test.go @@ -19,14 +19,14 @@ import ( ) func TestGrpc_ConfigUpload(t *testing.T) { - teardownTest := utils.SetupConnectionTest(t, true, false, + teardownTest := utils.SetupConnectionTest(t, true, false, false, "../../config/agent/nginx-config-with-grpc-client.conf") defer teardownTest(t) - nginxInstanceID := utils.VerifyConnection(t, 2) + nginxInstanceID := utils.VerifyConnection(t, 2, utils.MockManagementPlaneAPIAddress) assert.False(t, t.Failed()) - responses := utils.ManagementPlaneResponses(t, 1) + responses := utils.ManagementPlaneResponses(t, 1, utils.MockManagementPlaneAPIAddress) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully updated all files", responses[0].GetCommandResponse().GetMessage()) @@ -58,7 +58,7 @@ func TestGrpc_ConfigUpload(t *testing.T) { require.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode()) - responses = utils.ManagementPlaneResponses(t, 2) + responses = utils.ManagementPlaneResponses(t, 2, utils.MockManagementPlaneAPIAddress) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully updated all files", responses[0].GetCommandResponse().GetMessage()) diff --git a/test/integration/managementplane/file_watcher_test.go b/test/integration/managementplane/file_watcher_test.go index e7d13004e..5d6bb58f8 100644 --- a/test/integration/managementplane/file_watcher_test.go +++ b/test/integration/managementplane/file_watcher_test.go @@ -18,11 +18,11 @@ import ( func TestGrpc_FileWatcher(t *testing.T) { ctx := context.Background() - teardownTest := utils.SetupConnectionTest(t, true, false, + teardownTest := utils.SetupConnectionTest(t, true, false, false, "../../config/agent/nginx-config-with-grpc-client.conf") defer teardownTest(t) - utils.VerifyConnection(t, 2) + utils.VerifyConnection(t, 2, utils.MockManagementPlaneAPIAddress) assert.False(t, t.Failed()) t.Run("Test 1: update nginx config file on data plane", func(t *testing.T) { @@ -34,13 +34,13 @@ func TestGrpc_FileWatcher(t *testing.T) { ) require.NoError(t, err) - responses := utils.ManagementPlaneResponses(t, 2) + responses := utils.ManagementPlaneResponses(t, 2, utils.MockManagementPlaneAPIAddress) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully updated all files", responses[0].GetCommandResponse().GetMessage()) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[1].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully updated all files", responses[1].GetCommandResponse().GetMessage()) - utils.VerifyUpdateDataPlaneStatus(t) + utils.VerifyUpdateDataPlaneStatus(t, utils.MockManagementPlaneAPIAddress) }) t.Run("Test 2: create new nginx config file", func(t *testing.T) { @@ -52,11 +52,11 @@ func TestGrpc_FileWatcher(t *testing.T) { ) require.NoError(t, err) - responses := utils.ManagementPlaneResponses(t, 3) + responses := utils.ManagementPlaneResponses(t, 3, utils.MockManagementPlaneAPIAddress) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[2].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully updated all files", responses[2].GetCommandResponse().GetMessage()) - utils.VerifyUpdateDataPlaneStatus(t) + utils.VerifyUpdateDataPlaneStatus(t, utils.MockManagementPlaneAPIAddress) }) t.Run("Test 3: delete nginx config file", func(t *testing.T) { @@ -66,10 +66,10 @@ func TestGrpc_FileWatcher(t *testing.T) { ) require.NoError(t, err) - responses := utils.ManagementPlaneResponses(t, 4) + responses := utils.ManagementPlaneResponses(t, 4, utils.MockManagementPlaneAPIAddress) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[3].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully updated all files", responses[3].GetCommandResponse().GetMessage()) - utils.VerifyUpdateDataPlaneStatus(t) + utils.VerifyUpdateDataPlaneStatus(t, utils.MockManagementPlaneAPIAddress) }) } diff --git a/test/integration/managementplane/grpc_management_plane_api_test.go b/test/integration/managementplane/grpc_management_plane_api_test.go index 85bc9b7a7..7bee15f28 100644 --- a/test/integration/managementplane/grpc_management_plane_api_test.go +++ b/test/integration/managementplane/grpc_management_plane_api_test.go @@ -23,13 +23,13 @@ import ( func TestGrpc_Reconnection(t *testing.T) { ctx := context.Background() - teardownTest := utils.SetupConnectionTest(t, false, false, + teardownTest := utils.SetupConnectionTest(t, false, false, false, "../../config/agent/nginx-config-with-grpc-client.conf") defer teardownTest(t) timeout := 15 * time.Second - originalID := utils.VerifyConnection(t, 2) + originalID := utils.VerifyConnection(t, 2, utils.MockManagementPlaneAPIAddress) stopErr := utils.MockManagementPlaneGrpcContainer.Stop(ctx, &timeout) @@ -44,29 +44,29 @@ func TestGrpc_Reconnection(t *testing.T) { require.NoError(t, err) utils.MockManagementPlaneAPIAddress = net.JoinHostPort(ipAddress, ports["9093/tcp"][0].HostPort) - currentID := utils.VerifyConnection(t, 2) + currentID := utils.VerifyConnection(t, 2, utils.MockManagementPlaneAPIAddress) assert.Equal(t, originalID, currentID) } // Verify that the agent sends a connection request and an update data plane status request func TestGrpc_StartUp(t *testing.T) { - teardownTest := utils.SetupConnectionTest(t, true, false, + teardownTest := utils.SetupConnectionTest(t, true, false, false, "../../config/agent/nginx-config-with-grpc-client.conf") defer teardownTest(t) - utils.VerifyConnection(t, 2) + utils.VerifyConnection(t, 2, utils.MockManagementPlaneAPIAddress) assert.False(t, t.Failed()) - utils.VerifyUpdateDataPlaneHealth(t) + utils.VerifyUpdateDataPlaneHealth(t, utils.MockManagementPlaneAPIAddress) } func TestGrpc_DataplaneHealthRequest(t *testing.T) { - teardownTest := utils.SetupConnectionTest(t, true, false, + teardownTest := utils.SetupConnectionTest(t, true, false, false, "../../config/agent/nginx-config-with-grpc-client.conf") defer teardownTest(t) - utils.VerifyConnection(t, 2) + utils.VerifyConnection(t, 2, utils.MockManagementPlaneAPIAddress) - responses := utils.ManagementPlaneResponses(t, 1) + responses := utils.ManagementPlaneResponses(t, 1, utils.MockManagementPlaneAPIAddress) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully updated all files", responses[0].GetCommandResponse().GetMessage()) @@ -91,7 +91,7 @@ func TestGrpc_DataplaneHealthRequest(t *testing.T) { require.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode()) - responses = utils.ManagementPlaneResponses(t, 2) + responses = utils.ManagementPlaneResponses(t, 2, utils.MockManagementPlaneAPIAddress) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[1].GetCommandResponse().GetStatus()) assert.Equal(t, "Successfully sent health status update", responses[1].GetCommandResponse().GetMessage()) diff --git a/test/integration/nginxless/nginx_less_mpi_connection_test.go b/test/integration/nginxless/nginx_less_mpi_connection_test.go index 7e626521c..c979fadca 100644 --- a/test/integration/nginxless/nginx_less_mpi_connection_test.go +++ b/test/integration/nginxless/nginx_less_mpi_connection_test.go @@ -15,10 +15,10 @@ import ( // Verify that the agent sends a connection request to Management Plane even when Nginx is not present func TestNginxLessGrpc_Connection(t *testing.T) { - teardownTest := utils.SetupConnectionTest(t, true, true, + teardownTest := utils.SetupConnectionTest(t, true, true, false, "../../config/agent/nginx-config-with-grpc-client.conf") defer teardownTest(t) - utils.VerifyConnection(t, 1) + utils.VerifyConnection(t, 1, utils.MockManagementPlaneAPIAddress) assert.False(t, t.Failed()) } diff --git a/test/integration/utils/config_apply_utils.go b/test/integration/utils/config_apply_utils.go index ac86e3227..bc0c5acb5 100644 --- a/test/integration/utils/config_apply_utils.go +++ b/test/integration/utils/config_apply_utils.go @@ -11,6 +11,9 @@ import ( "testing" "time" + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "google.golang.org/protobuf/encoding/protojson" + "github.com/go-resty/resty/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -22,19 +25,46 @@ const ( RetryMaxWaitTime = 6 * time.Second ) -var MockManagementPlaneAPIAddress string +var ( + MockManagementPlaneAPIAddress string + AuxiliaryMockManagementPlaneAPIAddress string +) -func PerformConfigApply(t *testing.T, nginxInstanceID string) { +func PerformConfigApply(t *testing.T, nginxInstanceID, mockManagementPlaneAPIAddress string) { t.Helper() client := resty.New() client.SetRetryCount(RetryCount).SetRetryWaitTime(RetryWaitTime).SetRetryMaxWaitTime(RetryMaxWaitTime) - url := fmt.Sprintf("http://%s/api/v1/instance/%s/config/apply", MockManagementPlaneAPIAddress, nginxInstanceID) + url := fmt.Sprintf("http://%s/api/v1/instance/%s/config/apply", mockManagementPlaneAPIAddress, nginxInstanceID) resp, err := client.R().EnableTrace().Post(url) + t.Logf("Config ApplyResponse: %s", resp.String()) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode()) +} + +func CurrentFileOverview(t *testing.T, nginxInstanceID, mockManagementPlaneAPIAddress string) *mpi.FileOverview { + t.Helper() + + client := resty.New() + client.SetRetryCount(RetryCount).SetRetryWaitTime(RetryWaitTime).SetRetryMaxWaitTime(RetryMaxWaitTime) + + url := fmt.Sprintf("http://%s/api/v1/instance/%s/config", mockManagementPlaneAPIAddress, nginxInstanceID) + resp, err := client.R().EnableTrace().Get(url) + require.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode()) + + responseData := resp.Body() + + overview := mpi.GetOverviewResponse{} + + pb := protojson.UnmarshalOptions{DiscardUnknown: true} + unmarshalErr := pb.Unmarshal(responseData, &overview) + require.NoError(t, unmarshalErr) + + return overview.GetOverview() } func PerformInvalidConfigApply(t *testing.T, nginxInstanceID string) { diff --git a/test/integration/utils/grpc_management_plane_utils.go b/test/integration/utils/grpc_management_plane_utils.go index cd2e9d894..b79ef5de2 100644 --- a/test/integration/utils/grpc_management_plane_utils.go +++ b/test/integration/utils/grpc_management_plane_utils.go @@ -31,9 +31,11 @@ import ( ) var ( - Container testcontainers.Container - MockManagementPlaneGrpcContainer testcontainers.Container - MockManagementPlaneGrpcAddress string + Container testcontainers.Container + MockManagementPlaneGrpcContainer testcontainers.Container + AuxiliaryMockManagementPlaneGrpcContainer testcontainers.Container + MockManagementPlaneGrpcAddress string + AuxiliaryMockManagementPlaneGrpcAddress string ) const ( @@ -60,12 +62,14 @@ type ( } ) -func SetupConnectionTest(tb testing.TB, expectNoErrorsInLogs, nginxless bool, agentConfig string) func(tb testing.TB) { +func SetupConnectionTest(tb testing.TB, expectNoErrorsInLogs, nginxless, auxiliaryServer bool, + agentConfig string, +) func(tb testing.TB) { tb.Helper() ctx := context.Background() if os.Getenv("TEST_ENV") == "Container" { - setupContainerEnvironment(ctx, tb, nginxless, agentConfig) + setupContainerEnvironment(ctx, tb, nginxless, auxiliaryServer, agentConfig) } else { setupLocalEnvironment(tb) } @@ -80,18 +84,25 @@ func SetupConnectionTest(tb testing.TB, expectNoErrorsInLogs, nginxless bool, ag MockManagementPlaneGrpcContainer, Container, expectNoErrorsInLogs, + AuxiliaryMockManagementPlaneGrpcContainer, ) } } } // setupContainerEnvironment sets up the container environment for testing. -func setupContainerEnvironment(ctx context.Context, tb testing.TB, nginxless bool, agentConfig string) { +// nolint: revive +func setupContainerEnvironment(ctx context.Context, tb testing.TB, nginxless, auxiliaryServer bool, + agentConfig string, +) { tb.Helper() tb.Log("Running tests in a container environment") containerNetwork := createContainerNetwork(ctx, tb) setupMockManagementPlaneGrpc(ctx, tb, containerNetwork) + if auxiliaryServer { + setupAuxiliaryMockManagementPlaneGrpc(ctx, tb, containerNetwork) + } params := &helpers.Parameters{ NginxAgentConfigPath: agentConfig, @@ -134,6 +145,24 @@ func setupMockManagementPlaneGrpc(ctx context.Context, tb testing.TB, containerN tb.Logf("Mock management API server running on %s", MockManagementPlaneAPIAddress) } +func setupAuxiliaryMockManagementPlaneGrpc(ctx context.Context, tb testing.TB, + containerNetwork *testcontainers.DockerNetwork, +) { + tb.Helper() + AuxiliaryMockManagementPlaneGrpcContainer = helpers.StartAuxiliaryMockManagementPlaneGrpcContainer(ctx, + tb, containerNetwork) + AuxiliaryMockManagementPlaneGrpcAddress = "managementPlaneAuxiliary:9095" + tb.Logf("Auxiliary mock management gRPC server running on %s", AuxiliaryMockManagementPlaneGrpcAddress) + + ipAddress, err := AuxiliaryMockManagementPlaneGrpcContainer.Host(ctx) + require.NoError(tb, err) + ports, err := AuxiliaryMockManagementPlaneGrpcContainer.Ports(ctx) + require.NoError(tb, err) + + AuxiliaryMockManagementPlaneAPIAddress = net.JoinHostPort(ipAddress, ports["9096/tcp"][0].HostPort) + tb.Logf("Auxiliary mock management API server running on %s", AuxiliaryMockManagementPlaneAPIAddress) +} + // setupNginxContainer configures and starts the NGINX container. func setupNginxContainer( ctx context.Context, @@ -186,7 +215,9 @@ func setupLocalEnvironment(tb testing.TB) { }(tb) } -func ManagementPlaneResponses(t *testing.T, numberOfExpectedResponses int) []*mpi.DataPlaneResponse { +func ManagementPlaneResponses(t *testing.T, numberOfExpectedResponses int, + mockManagementPlaneAPIAddress string, +) []*mpi.DataPlaneResponse { t.Helper() client := resty.New() @@ -204,7 +235,7 @@ func ManagementPlaneResponses(t *testing.T, numberOfExpectedResponses int) []*mp }, ) - url := fmt.Sprintf("http://%s/api/v1/responses", MockManagementPlaneAPIAddress) + url := fmt.Sprintf("http://%s/api/v1/responses", mockManagementPlaneAPIAddress) resp, err := client.R().EnableTrace().Get(url) require.NoError(t, err) @@ -227,19 +258,19 @@ func ManagementPlaneResponses(t *testing.T, numberOfExpectedResponses int) []*mp return response } -func ClearManagementPlaneResponses(t *testing.T) { +func ClearManagementPlaneResponses(t *testing.T, mockManagementPlaneAPIAddress string) { t.Helper() client := resty.New() - url := fmt.Sprintf("http://%s/api/v1/responses", MockManagementPlaneAPIAddress) + url := fmt.Sprintf("http://%s/api/v1/responses", mockManagementPlaneAPIAddress) resp, err := client.R().EnableTrace().Delete(url) require.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode()) } -func VerifyConnection(t *testing.T, instancesLength int) string { +func VerifyConnection(t *testing.T, instancesLength int, mockManagementPlaneAPIAddress string) string { t.Helper() client := resty.New() @@ -255,7 +286,7 @@ func VerifyConnection(t *testing.T, instancesLength int) string { return r.StatusCode() == http.StatusNotFound || unmarshalErr != nil }, ) - url := fmt.Sprintf("http://%s/api/v1/connection", MockManagementPlaneAPIAddress) + url := fmt.Sprintf("http://%s/api/v1/connection", mockManagementPlaneAPIAddress) t.Logf("Connecting to %s", url) resp, err := client.R().EnableTrace().Get(url) @@ -332,7 +363,7 @@ func VerifyConnection(t *testing.T, instancesLength int) string { return nginxInstanceID } -func VerifyUpdateDataPlaneHealth(t *testing.T) { +func VerifyUpdateDataPlaneHealth(t *testing.T, mockManagementPlaneAPIAddress string) { t.Helper() client := resty.New() @@ -346,7 +377,7 @@ func VerifyUpdateDataPlaneHealth(t *testing.T) { }, ) - url := fmt.Sprintf("http://%s/api/v1/health", MockManagementPlaneAPIAddress) + url := fmt.Sprintf("http://%s/api/v1/health", mockManagementPlaneAPIAddress) resp, err := client.R().EnableTrace().Get(url) @@ -393,14 +424,14 @@ func VerifyUpdateDataPlaneHealth(t *testing.T) { assert.Equal(t, mpi.InstanceHealth_INSTANCE_HEALTH_STATUS_HEALTHY, healths[0].GetInstanceHealthStatus()) } -func VerifyUpdateDataPlaneStatus(t *testing.T) { +func VerifyUpdateDataPlaneStatus(t *testing.T, mockManagementPlaneAPIAddress string) { t.Helper() client := resty.New() client.SetRetryCount(statusRetryCount).SetRetryWaitTime(retryWait).SetRetryMaxWaitTime(retryMaxWait) - url := fmt.Sprintf("http://%s/api/v1/status", MockManagementPlaneAPIAddress) + url := fmt.Sprintf("http://%s/api/v1/status", mockManagementPlaneAPIAddress) resp, err := client.R().EnableTrace().Get(url) diff --git a/test/mock/grpc/mock_management_command_service.go b/test/mock/grpc/mock_management_command_service.go index 3cae35f43..4c953c04a 100644 --- a/test/mock/grpc/mock_management_command_service.go +++ b/test/mock/grpc/mock_management_command_service.go @@ -181,9 +181,7 @@ func (cs *CommandService) handleConfigUploadRequest( instanceID := upload.ConfigUploadRequest.GetOverview().GetConfigVersion().GetInstanceId() overviewFiles := upload.ConfigUploadRequest.GetOverview().GetFiles() - if cs.instanceFiles[instanceID] == nil { - cs.instanceFiles[instanceID] = overviewFiles - } else { + if cs.instanceFiles[instanceID] != nil { filesToDelete := cs.checkForDeletedFiles(instanceID, overviewFiles) for _, fileToDelete := range filesToDelete { err := os.Remove(fileToDelete) @@ -192,6 +190,7 @@ func (cs *CommandService) handleConfigUploadRequest( } } } + cs.instanceFiles[instanceID] = overviewFiles } func (cs *CommandService) checkForDeletedFiles(instanceID string, overviewFiles []*mpi.File) []string { @@ -246,6 +245,7 @@ func (cs *CommandService) createServer(logger *slog.Logger) { cs.addHealthEndpoint() cs.addResponseAndRequestEndpoints() cs.addConfigApplyEndpoint() + cs.addConfigEndpoint() } func (cs *CommandService) addConnectionEndpoint() { @@ -386,6 +386,30 @@ func (cs *CommandService) addConfigApplyEndpoint() { }) } +func (cs *CommandService) addConfigEndpoint() { + cs.server.GET("/api/v1/instance/:instanceID/config", func(c *gin.Context) { + instanceID := c.Param("instanceID") + var data map[string]interface{} + + response := &mpi.GetOverviewResponse{ + Overview: &mpi.FileOverview{ + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: instanceID, + Version: files.GenerateConfigVersion(cs.instanceFiles[instanceID]), + }, + Files: cs.instanceFiles[instanceID], + }, + } + + if err := json.Unmarshal([]byte(protojson.Format(response)), &data); err != nil { + slog.Error("Failed to return connection", "error", err) + c.JSON(http.StatusInternalServerError, nil) + } + + c.JSON(http.StatusOK, data) + }) +} + func (cs *CommandService) findInstanceConfigFiles(instanceID string) (configFiles []*mpi.File, err error) { instanceDirectory := filepath.Join(cs.configDirectory, instanceID)