diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index 995da5ac6..0a36efc0a 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -94,8 +94,13 @@ func (cp *CommandPlugin) Close(ctx context.Context) error { } func (cp *CommandPlugin) Info() *bus.Info { + name := "command" + if cp.commandServerType.String() == model.Auxiliary.String() { + name = "auxiliary-command" + } + return &bus.Info{ - Name: cp.commandServerType.String(), + Name: name, } } diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index d8fa0c1ee..b8076fdfb 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -12,28 +12,16 @@ import ( "errors" "fmt" "log/slog" - "maps" - "math" "os" - "slices" "sync" - "sync/atomic" "google.golang.org/grpc" "github.com/nginx/agent/v3/internal/model" - "github.com/cenkalti/backoff/v4" - mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" "github.com/nginx/agent/v3/internal/config" - internalgrpc "github.com/nginx/agent/v3/internal/grpc" - "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/pkg/files" - "github.com/nginx/agent/v3/pkg/id" - "google.golang.org/protobuf/types/known/timestamppb" - - backoffHelpers "github.com/nginx/agent/v3/internal/backoff" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate @@ -64,17 +52,30 @@ type ( reader *bufio.Reader, chunkID uint32, ) (mpi.FileDataChunk_Content, error) + WriteManifestFile(updatedFiles map[string]*model.ManifestFile, + manifestDir, manifestPath string) (writeError error) } - fileManagerServiceInterface interface { + fileServiceOperatorInterface interface { + File(ctx context.Context, file *mpi.File, fileActions map[string]*model.FileCache) error UpdateOverview(ctx context.Context, instanceID string, filesToUpdate []*mpi.File, iteration int) error - ConfigApply( + ChunkedFile(ctx context.Context, file *mpi.File) error + IsConnected() bool + UpdateFile( ctx context.Context, - configApplyRequest *mpi.ConfigApplyRequest, - ) (writeStatus model.WriteStatus, err error) + instanceID string, + fileToUpdate *mpi.File, + ) error + SetIsConnected(isConnected bool) + } + + fileManagerServiceInterface interface { + ConfigApply(ctx context.Context, configApplyRequest *mpi.ConfigApplyRequest) (writeStatus model.WriteStatus, + err error) Rollback(ctx context.Context, instanceID string) error - UpdateFile(ctx context.Context, instanceID string, fileToUpdate *mpi.File) error ClearCache() + ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error + ConfigUpdate(ctx context.Context, nginxConfigContext *model.NginxConfigContext) UpdateCurrentFilesOnDisk(ctx context.Context, updateFiles map[string]*mpi.File, referenced bool) error DetermineFileActions( ctx context.Context, @@ -87,10 +88,9 @@ type ( ) type FileManagerService struct { - fileServiceClient mpi.FileServiceClient - agentConfig *config.Config - isConnected *atomic.Bool - fileOperator fileOperator + agentConfig *config.Config + fileOperator fileOperator + fileServiceOperator fileServiceOperatorInterface // map of files and the actions performed on them during config apply fileActions map[string]*model.FileCache // key is file path // map of the contents of files which have been updated or deleted during config apply, used during rollback @@ -103,397 +103,24 @@ type FileManagerService struct { } func NewFileManagerService(fileServiceClient mpi.FileServiceClient, agentConfig *config.Config) *FileManagerService { - isConnected := &atomic.Bool{} - isConnected.Store(false) - return &FileManagerService{ - fileServiceClient: fileServiceClient, agentConfig: agentConfig, fileOperator: NewFileOperator(), + fileServiceOperator: NewFileServiceOperator(agentConfig, fileServiceClient), fileActions: make(map[string]*model.FileCache), rollbackFileContents: make(map[string][]byte), currentFilesOnDisk: make(map[string]*mpi.File), previousManifestFiles: make(map[string]*model.ManifestFile), manifestFilePath: agentConfig.ManifestDir + "/manifest.json", - isConnected: isConnected, - } -} - -func (fms *FileManagerService) UpdateOverview( - ctx context.Context, - instanceID string, - filesToUpdate []*mpi.File, - iteration int, -) error { - correlationID := logger.CorrelationID(ctx) - - // error case for the UpdateOverview attempts - if iteration > maxAttempts { - return errors.New("too many UpdateOverview attempts") - } - - newCtx, correlationID := fms.setupIdentifiers(ctx, iteration) - - request := &mpi.UpdateOverviewRequest{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: correlationID, - Timestamp: timestamppb.Now(), - }, - Overview: &mpi.FileOverview{ - Files: filesToUpdate, - ConfigVersion: &mpi.ConfigVersion{ - InstanceId: instanceID, - Version: files.GenerateConfigVersion(filesToUpdate), - }, - }, - } - - backOffCtx, backoffCancel := context.WithTimeout(newCtx, fms.agentConfig.Client.Backoff.MaxElapsedTime) - defer backoffCancel() - - sendUpdateOverview := func() (*mpi.UpdateOverviewResponse, error) { - if fms.fileServiceClient == nil { - return nil, errors.New("file service client is not initialized") - } - - if !fms.isConnected.Load() { - return nil, errors.New("CreateConnection rpc has not being called yet") - } - - slog.DebugContext(newCtx, "Sending update overview request", - "instance_id", request.GetOverview().GetConfigVersion().GetInstanceId(), - "request", request, "parent_correlation_id", correlationID, - ) - - response, updateError := fms.fileServiceClient.UpdateOverview(newCtx, request) - - validatedError := internalgrpc.ValidateGrpcError(updateError) - - if validatedError != nil { - slog.ErrorContext(newCtx, "Failed to send update overview", "error", validatedError) - - return nil, validatedError - } - - return response, nil - } - - backoffSettings := fms.agentConfig.Client.Backoff - response, err := backoff.RetryWithData( - sendUpdateOverview, - backoffHelpers.Context(backOffCtx, backoffSettings), - ) - if err != nil { - return err - } - - slog.DebugContext(newCtx, "UpdateOverview response", "response", response) - - if response.GetOverview() == nil { - slog.Debug("UpdateOverview response is empty") - return nil - } - delta := files.ConvertToMapOfFiles(response.GetOverview().GetFiles()) - - if len(delta) != 0 { - return fms.updateFiles(ctx, delta, instanceID, iteration) - } - - return err -} - -func (fms *FileManagerService) setupIdentifiers(ctx context.Context, iteration int) (context.Context, string) { - correlationID := logger.CorrelationID(ctx) - var requestCorrelationID slog.Attr - - if iteration == 0 { - requestCorrelationID = logger.GenerateCorrelationID() - } else { - requestCorrelationID = logger.CorrelationIDAttr(ctx) - } - - newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, requestCorrelationID) - - return newCtx, correlationID -} - -func (fms *FileManagerService) updateFiles( - ctx context.Context, - delta map[string]*mpi.File, - instanceID string, - iteration int, -) error { - diffFiles := slices.Collect(maps.Values(delta)) - - for _, file := range diffFiles { - updateErr := fms.UpdateFile(ctx, instanceID, file) - if updateErr != nil { - return updateErr - } - } - - iteration++ - slog.Info("Updating file overview after file updates", "attempt_number", iteration) - - return fms.UpdateOverview(ctx, instanceID, diffFiles, iteration) -} - -func (fms *FileManagerService) UpdateFile( - ctx context.Context, - instanceID string, - fileToUpdate *mpi.File, -) error { - slog.InfoContext(ctx, "Updating file", "file_name", fileToUpdate.GetFileMeta().GetName(), "instance_id", instanceID) - - slog.DebugContext(ctx, "Checking file size", - "file_size", fileToUpdate.GetFileMeta().GetSize(), - "max_file_size", int64(fms.agentConfig.Client.Grpc.MaxFileSize), - ) - - if fileToUpdate.GetFileMeta().GetSize() <= int64(fms.agentConfig.Client.Grpc.MaxFileSize) { - return fms.sendUpdateFileRequest(ctx, fileToUpdate) - } - - return fms.sendUpdateFileStream(ctx, fileToUpdate, fms.agentConfig.Client.Grpc.FileChunkSize) -} - -func (fms *FileManagerService) sendUpdateFileRequest( - ctx context.Context, - fileToUpdate *mpi.File, -) error { - messageMeta := &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: logger.CorrelationID(ctx), - Timestamp: timestamppb.Now(), - } - - contents, err := os.ReadFile(fileToUpdate.GetFileMeta().GetName()) - if err != nil { - return err - } - - request := &mpi.UpdateFileRequest{ - File: fileToUpdate, - Contents: &mpi.FileContents{ - Contents: contents, - }, - MessageMeta: messageMeta, - } - - backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime) - defer backoffCancel() - - sendUpdateFile := func() (*mpi.UpdateFileResponse, error) { - slog.DebugContext(ctx, "Sending update file request", "request_file", request.GetFile(), - "request_message_meta", request.GetMessageMeta()) - if fms.fileServiceClient == nil { - return nil, errors.New("file service client is not initialized") - } - - if !fms.isConnected.Load() { - return nil, errors.New("CreateConnection rpc has not being called yet") - } - - response, updateError := fms.fileServiceClient.UpdateFile(ctx, request) - - validatedError := internalgrpc.ValidateGrpcError(updateError) - - if validatedError != nil { - slog.ErrorContext(ctx, "Failed to send update file", "error", validatedError) - - return nil, validatedError - } - - return response, nil - } - - response, err := backoff.RetryWithData( - sendUpdateFile, - backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff), - ) - if err != nil { - return err - } - - slog.DebugContext(ctx, "UpdateFile response", "response", response) - - return nil -} - -func (fms *FileManagerService) sendUpdateFileStream( - ctx context.Context, - fileToUpdate *mpi.File, - chunkSize uint32, -) error { - if chunkSize == 0 { - return fmt.Errorf("file chunk size must be greater than zero") - } - - updateFileStreamClient, err := fms.fileServiceClient.UpdateFileStream(ctx) - if err != nil { - return err - } - - err = fms.sendUpdateFileStreamHeader(ctx, fileToUpdate, chunkSize, updateFileStreamClient) - if err != nil { - return err - } - - return fms.sendFileUpdateStreamChunks(ctx, fileToUpdate, chunkSize, updateFileStreamClient) -} - -func (fms *FileManagerService) sendUpdateFileStreamHeader( - ctx context.Context, - fileToUpdate *mpi.File, - chunkSize uint32, - updateFileStreamClient grpc.ClientStreamingClient[mpi.FileDataChunk, mpi.UpdateFileResponse], -) error { - messageMeta := &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: logger.CorrelationID(ctx), - Timestamp: timestamppb.Now(), - } - - numberOfChunks := uint32(math.Ceil(float64(fileToUpdate.GetFileMeta().GetSize()) / float64(chunkSize))) - - header := mpi.FileDataChunk_Header{ - Header: &mpi.FileDataChunkHeader{ - FileMeta: fileToUpdate.GetFileMeta(), - Chunks: numberOfChunks, - ChunkSize: chunkSize, - }, - } - - backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime) - defer backoffCancel() - - sendUpdateFileHeader := func() error { - slog.DebugContext(ctx, "Sending update file stream header", "header", header) - if fms.fileServiceClient == nil { - return errors.New("file service client is not initialized") - } - - if !fms.isConnected.Load() { - return errors.New("CreateConnection rpc has not being called yet") - } - - err := updateFileStreamClient.Send( - &mpi.FileDataChunk{ - Meta: messageMeta, - Chunk: &header, - }, - ) - - validatedError := internalgrpc.ValidateGrpcError(err) - - if validatedError != nil { - slog.ErrorContext(ctx, "Failed to send update file stream header", "error", validatedError) - - return validatedError - } - - return nil - } - - return backoff.Retry(sendUpdateFileHeader, backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff)) -} - -func (fms *FileManagerService) sendFileUpdateStreamChunks( - ctx context.Context, - fileToUpdate *mpi.File, - chunkSize uint32, - updateFileStreamClient grpc.ClientStreamingClient[mpi.FileDataChunk, mpi.UpdateFileResponse], -) error { - f, err := os.Open(fileToUpdate.GetFileMeta().GetName()) - defer func() { - closeError := f.Close() - if closeError != nil { - slog.WarnContext( - ctx, "Failed to close file", - "file", fileToUpdate.GetFileMeta().GetName(), - "error", closeError, - ) - } - }() - if err != nil { - return err - } - - var chunkID uint32 - - reader := bufio.NewReader(f) - for { - chunk, readChunkError := fms.fileOperator.ReadChunk(ctx, chunkSize, reader, chunkID) - if readChunkError != nil { - return readChunkError - } - if chunk.Content == nil { - break - } - - sendError := fms.sendFileUpdateStreamChunk(ctx, chunk, updateFileStreamClient) - if sendError != nil { - return sendError - } - - chunkID++ } - - return nil -} - -func (fms *FileManagerService) sendFileUpdateStreamChunk( - ctx context.Context, - chunk mpi.FileDataChunk_Content, - updateFileStreamClient grpc.ClientStreamingClient[mpi.FileDataChunk, mpi.UpdateFileResponse], -) error { - messageMeta := &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: logger.CorrelationID(ctx), - Timestamp: timestamppb.Now(), - } - - backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime) - defer backoffCancel() - - sendUpdateFileChunk := func() error { - slog.DebugContext(ctx, "Sending update file stream chunk", "chunk_id", chunk.Content.GetChunkId()) - if fms.fileServiceClient == nil { - return errors.New("file service client is not initialized") - } - - if !fms.isConnected.Load() { - return errors.New("CreateConnection rpc has not being called yet") - } - - err := updateFileStreamClient.Send( - &mpi.FileDataChunk{ - Meta: messageMeta, - Chunk: &chunk, - }, - ) - - validatedError := internalgrpc.ValidateGrpcError(err) - - if validatedError != nil { - slog.ErrorContext(ctx, "Failed to send update file stream chunk", "error", validatedError) - - return validatedError - } - - return nil - } - - return backoff.Retry(sendUpdateFileChunk, backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff)) } func (fms *FileManagerService) IsConnected() bool { - return fms.isConnected.Load() + return fms.fileServiceOperator.IsConnected() } func (fms *FileManagerService) SetIsConnected(isConnected bool) { - fms.isConnected.Store(isConnected) + fms.fileServiceOperator.SetIsConnected(isConnected) } func (fms *FileManagerService) ConfigApply(ctx context.Context, @@ -581,7 +208,8 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) } } - manifestFileErr := fms.writeManifestFile(fms.previousManifestFiles) + manifestFileErr := fms.fileOperator.WriteManifestFile(fms.previousManifestFiles, + fms.agentConfig.ManifestDir, fms.manifestFilePath) if manifestFileErr != nil { return manifestFileErr } @@ -616,102 +244,70 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) error { func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) error { if file.GetFileMeta().GetSize() <= int64(fms.agentConfig.Client.Grpc.MaxFileSize) { - return fms.file(ctx, file) + return fms.fileServiceOperator.File(ctx, file, fms.fileActions) } - return fms.chunkedFile(ctx, file) + return fms.fileServiceOperator.ChunkedFile(ctx, file) } -func (fms *FileManagerService) file(ctx context.Context, file *mpi.File) error { - slog.DebugContext(ctx, "Getting file", "file", file.GetFileMeta().GetName()) - - backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime) - defer backoffCancel() - - getFile := func() (*mpi.GetFileResponse, error) { - return fms.fileServiceClient.GetFile(ctx, &mpi.GetFileRequest{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: logger.CorrelationID(ctx), - Timestamp: timestamppb.Now(), - }, - FileMeta: file.GetFileMeta(), - }) - } - - getFileResp, getFileErr := backoff.RetryWithData( - getFile, - backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff), - ) - - if getFileErr != nil { - return fmt.Errorf("error getting file data for %s: %w", file.GetFileMeta(), getFileErr) - } - - if writeErr := fms.fileOperator.Write(ctx, getFileResp.GetContents().GetContents(), - file.GetFileMeta()); writeErr != nil { - return writeErr +func (fms *FileManagerService) checkAllowedDirectory(checkFiles []*mpi.File) error { + for _, file := range checkFiles { + allowed := fms.agentConfig.IsDirectoryAllowed(file.GetFileMeta().GetName()) + if !allowed { + return fmt.Errorf("file not in allowed directories %s", file.GetFileMeta().GetName()) + } } - return fms.validateFileHash(file.GetFileMeta().GetName()) + return nil } -func (fms *FileManagerService) chunkedFile(ctx context.Context, file *mpi.File) error { - slog.DebugContext(ctx, "Getting chunked file", "file", file.GetFileMeta().GetName()) - - stream, err := fms.fileServiceClient.GetFileStream(ctx, &mpi.GetFileRequest{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: logger.CorrelationID(ctx), - Timestamp: timestamppb.Now(), - }, - FileMeta: file.GetFileMeta(), - }) - if err != nil { - return fmt.Errorf("error getting file stream for %s: %w", file.GetFileMeta().GetName(), err) - } - - // Get header chunk first - headerChunk, recvHeaderChunkError := stream.Recv() - if recvHeaderChunkError != nil { - return recvHeaderChunkError - } - - slog.DebugContext(ctx, "File header chunk received", "header_chunk", headerChunk) - - header := headerChunk.GetHeader() - - writeChunkedFileError := fms.fileOperator.WriteChunkedFile(ctx, file, header, stream) - if writeChunkedFileError != nil { - return writeChunkedFileError +func (fms *FileManagerService) ConfigUpdate(ctx context.Context, + nginxConfigContext *model.NginxConfigContext, +) { + updateError := fms.UpdateCurrentFilesOnDisk( + ctx, + files.ConvertToMapOfFiles(nginxConfigContext.Files), + true, + ) + if updateError != nil { + slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError) } - return nil -} - -func (fms *FileManagerService) validateFileHash(filePath string) error { - content, err := os.ReadFile(filePath) + slog.InfoContext(ctx, "Updating overview after nginx config update") + err := fms.fileServiceOperator.UpdateOverview(ctx, nginxConfigContext.InstanceID, nginxConfigContext.Files, 0) if err != nil { - return err + slog.ErrorContext( + ctx, + "Failed to update file overview", + "instance_id", nginxConfigContext.InstanceID, + "error", err, + ) } - fileHash := files.GenerateHash(content) +} - if fileHash != fms.fileActions[filePath].File.GetFileMeta().GetHash() { - return fmt.Errorf("error writing file, file hash does not match for file %s", filePath) - } +func (fms *FileManagerService) ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error { + var updatingFilesError error - return nil -} + for _, file := range configUploadRequest.GetOverview().GetFiles() { + err := fms.fileServiceOperator.UpdateFile( + ctx, + configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), + file, + ) + if err != nil { + slog.ErrorContext( + ctx, + "Failed to update file", + "instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), + "file_name", file.GetFileMeta().GetName(), + "error", err, + ) -func (fms *FileManagerService) checkAllowedDirectory(checkFiles []*mpi.File) error { - for _, file := range checkFiles { - allowed := fms.agentConfig.IsDirectoryAllowed(file.GetFileMeta().GetName()) - if !allowed { - return fmt.Errorf("file not in allowed directories %s", file.GetFileMeta().GetName()) + updatingFilesError = errors.Join(updatingFilesError, err) } } - return nil + return updatingFilesError } // DetermineFileActions compares two sets of files to determine the file action for each file. Returns a map of files @@ -853,38 +449,7 @@ func (fms *FileManagerService) UpdateManifestFile(currentFiles map[string]*mpi.F updatedFiles = manifestFiles } - return fms.writeManifestFile(updatedFiles) -} - -func (fms *FileManagerService) writeManifestFile(updatedFiles map[string]*model.ManifestFile) (writeError error) { - manifestJSON, err := json.MarshalIndent(updatedFiles, "", " ") - if err != nil { - return fmt.Errorf("unable to marshal manifest file json: %w", err) - } - - // 0755 allows read/execute for all, write for owner - if err = os.MkdirAll(fms.agentConfig.ManifestDir, dirPerm); err != nil { - return fmt.Errorf("unable to create directory %s: %w", fms.agentConfig.ManifestDir, err) - } - - // 0600 ensures only root can read/write - newFile, err := os.OpenFile(fms.manifestFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, filePerm) - defer func() { - if closeErr := newFile.Close(); closeErr != nil { - writeError = closeErr - } - }() - - if err != nil { - return fmt.Errorf("failed to read manifest file: %w", err) - } - - _, err = newFile.Write(manifestJSON) - if err != nil { - return fmt.Errorf("failed to write manifest file: %w", err) - } - - return writeError + return fms.fileOperator.WriteManifestFile(updatedFiles, fms.agentConfig.ManifestDir, fms.manifestFilePath) } func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, map[string]*mpi.File, error) { diff --git a/internal/file/file_manager_service_test.go b/internal/file/file_manager_service_test.go index 5f9237942..4cff68071 100644 --- a/internal/file/file_manager_service_test.go +++ b/internal/file/file_manager_service_test.go @@ -11,7 +11,6 @@ import ( "fmt" "os" "path/filepath" - "sync/atomic" "testing" "github.com/nginx/agent/v3/internal/model" @@ -28,146 +27,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestFileManagerService_UpdateOverview(t *testing.T) { - ctx := context.Background() - - filePath := filepath.Join(t.TempDir(), "nginx.conf") - fileMeta := protos.FileMeta(filePath, "") - - fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}") - fileHash := files.GenerateHash(fileContent) - - fileWriteErr := os.WriteFile(filePath, fileContent, 0o600) - require.NoError(t, fileWriteErr) - - overview := protos.FileOverview(filePath, fileHash) - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeFileServiceClient.UpdateOverviewReturnsOnCall(0, &mpi.UpdateOverviewResponse{ - Overview: overview, - }, nil) - - fakeFileServiceClient.UpdateOverviewReturnsOnCall(1, &mpi.UpdateOverviewResponse{}, nil) - - fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil) - - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) - fileManagerService.SetIsConnected(true) - - err := fileManagerService.UpdateOverview(ctx, "123", []*mpi.File{ - { - FileMeta: fileMeta, - }, - }, 0) - - require.NoError(t, err) - assert.Equal(t, 2, fakeFileServiceClient.UpdateOverviewCallCount()) -} - -func TestFileManagerService_UpdateOverview_MaxIterations(t *testing.T) { - ctx := context.Background() - - filePath := filepath.Join(t.TempDir(), "nginx.conf") - fileMeta := protos.FileMeta(filePath, "") - - fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}") - fileHash := files.GenerateHash(fileContent) - - fileWriteErr := os.WriteFile(filePath, fileContent, 0o600) - require.NoError(t, fileWriteErr) - - overview := protos.FileOverview(filePath, fileHash) - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - - // do 5 iterations - for i := 0; i <= 5; i++ { - fakeFileServiceClient.UpdateOverviewReturnsOnCall(i, &mpi.UpdateOverviewResponse{ - Overview: overview, - }, nil) - } - - fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil) - - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) - fileManagerService.SetIsConnected(true) - - err := fileManagerService.UpdateOverview(ctx, "123", []*mpi.File{ - { - FileMeta: fileMeta, - }, - }, 0) - - require.Error(t, err) - assert.Equal(t, "too many UpdateOverview attempts", err.Error()) -} - -func TestFileManagerService_UpdateFile(t *testing.T) { - tests := []struct { - name string - isCert bool - }{ - { - name: "non-cert", - isCert: false, - }, - { - name: "cert", - isCert: true, - }, - } - - tempDir := os.TempDir() - - for _, test := range tests { - ctx := context.Background() - - testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf") - - var fileMeta *mpi.FileMeta - if test.isCert { - fileMeta = protos.CertMeta(testFile.Name(), "") - } else { - fileMeta = protos.FileMeta(testFile.Name(), "") - } - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) - fileManagerService.SetIsConnected(true) - - err := fileManagerService.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta}) - - require.NoError(t, err) - assert.Equal(t, 1, fakeFileServiceClient.UpdateFileCallCount()) - - helpers.RemoveFileWithErrorCheck(t, testFile.Name()) - } -} - -func TestFileManagerService_UpdateFile_LargeFile(t *testing.T) { - ctx := context.Background() - tempDir := os.TempDir() - - testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf") - writeFileError := os.WriteFile(testFile.Name(), []byte("#test content"), 0o600) - require.NoError(t, writeFileError) - fileMeta := protos.FileMetaLargeFile(testFile.Name(), "") - - fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} - fakeClientStreamingClient := &FakeClientStreamingClient{sendCount: atomic.Int32{}} - fakeFileServiceClient.UpdateFileStreamReturns(fakeClientStreamingClient, nil) - fileManagerService := NewFileManagerService(fakeFileServiceClient, types.AgentConfig()) - fileManagerService.SetIsConnected(true) - - err := fileManagerService.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta}) - - require.NoError(t, err) - assert.Equal(t, 0, fakeFileServiceClient.UpdateFileCallCount()) - assert.Equal(t, 14, int(fakeClientStreamingClient.sendCount.Load())) - - helpers.RemoveFileWithErrorCheck(t, testFile.Name()) -} - func TestFileManagerService_ConfigApply_Add(t *testing.T) { ctx := context.Background() tempDir := t.TempDir() diff --git a/internal/file/file_operator.go b/internal/file/file_operator.go index 456f1127b..88539ce3a 100644 --- a/internal/file/file_operator.go +++ b/internal/file/file_operator.go @@ -8,12 +8,15 @@ package file import ( "bufio" "context" + "encoding/json" "fmt" "io" "log/slog" "os" "path" + "github.com/nginx/agent/v3/internal/model" + "google.golang.org/grpc" "github.com/nginx/agent/v3/pkg/files" @@ -140,3 +143,35 @@ func (fo *FileOperator) ReadChunk( return chunk, err } + +func (fo *FileOperator) WriteManifestFile(updatedFiles map[string]*model.ManifestFile, manifestDir, + manifestPath string, +) (writeError error) { + manifestJSON, err := json.MarshalIndent(updatedFiles, "", " ") + if err != nil { + return fmt.Errorf("unable to marshal manifest file json: %w", err) + } + + // 0755 allows read/execute for all, write for owner + if err = os.MkdirAll(manifestDir, dirPerm); err != nil { + return fmt.Errorf("unable to create directory %s: %w", manifestDir, err) + } + + // 0600 ensures only root can read/write + newFile, err := os.OpenFile(manifestPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, filePerm) + if err != nil { + return fmt.Errorf("failed to read manifest file: %w", err) + } + defer func() { + if closeErr := newFile.Close(); closeErr != nil { + writeError = closeErr + } + }() + + _, err = newFile.Write(manifestJSON) + if err != nil { + return fmt.Errorf("failed to write manifest file: %w", err) + } + + return writeError +} diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index e402fae27..6567830ea 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -7,7 +7,6 @@ package file import ( "context" - "fmt" "log/slog" "github.com/nginx/agent/v3/pkg/files" @@ -32,16 +31,24 @@ type FilePlugin struct { config *config.Config conn grpc.GrpcConnectionInterface fileManagerService fileManagerServiceInterface + serverType model.ServerType } -func NewFilePlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface) *FilePlugin { +func NewFilePlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface, + serverType model.ServerType, +) *FilePlugin { return &FilePlugin{ - config: agentConfig, - conn: grpcConnection, + config: agentConfig, + conn: grpcConnection, + serverType: serverType, } } func (fp *FilePlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error { + ctx = context.WithValue( + ctx, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()), + ) slog.DebugContext(ctx, "Starting file plugin") fp.messagePipe = messagePipe @@ -51,41 +58,70 @@ func (fp *FilePlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInter } func (fp *FilePlugin) Close(ctx context.Context) error { + ctx = context.WithValue( + ctx, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()), + ) slog.InfoContext(ctx, "Closing file plugin") + return fp.conn.Close(ctx) } func (fp *FilePlugin) Info() *bus.Info { + name := "file" + if fp.serverType.String() == model.Auxiliary.String() { + name = "auxiliary-file" + } + return &bus.Info{ - Name: "file", + Name: name, } } +// nolint: cyclop, revive func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) { - switch msg.Topic { - case bus.ConnectionResetTopic: - fp.handleConnectionReset(ctx, msg) - case bus.ConnectionCreatedTopic: - slog.DebugContext(ctx, "File plugin received connection created message") - fp.fileManagerService.SetIsConnected(true) - case bus.NginxConfigUpdateTopic: - fp.handleNginxConfigUpdate(ctx, msg) - case bus.ConfigUploadRequestTopic: - fp.handleConfigUploadRequest(ctx, msg) - case bus.ConfigApplyRequestTopic: - fp.handleConfigApplyRequest(ctx, msg) - case bus.ConfigApplyCompleteTopic: - fp.handleConfigApplyComplete(ctx, msg) - case bus.ConfigApplySuccessfulTopic: - fp.handleConfigApplySuccess(ctx, msg) - case bus.ConfigApplyFailedTopic: - fp.handleConfigApplyFailedRequest(ctx, msg) - default: - slog.DebugContext(ctx, "File plugin received unknown topic", "topic", msg.Topic) + if logger.ServerType(ctx) == "" { + ctx = context.WithValue( + ctx, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, fp.serverType.String()), + ) + } + + if logger.ServerType(ctx) == fp.serverType.String() || logger.ServerType(ctx) == "" { + switch msg.Topic { + case bus.ConnectionResetTopic: + fp.handleConnectionReset(ctx, msg) + case bus.ConnectionCreatedTopic: + slog.DebugContext(ctx, "File plugin received connection created message") + fp.fileManagerService.SetIsConnected(true) + case bus.NginxConfigUpdateTopic: + fp.handleNginxConfigUpdate(ctx, msg) + case bus.ConfigUploadRequestTopic: + fp.handleConfigUploadRequest(ctx, msg) + case bus.ConfigApplyRequestTopic: + fp.handleConfigApplyRequest(ctx, msg) + case bus.ConfigApplyCompleteTopic: + fp.handleConfigApplyComplete(ctx, msg) + case bus.ConfigApplySuccessfulTopic: + fp.handleConfigApplySuccess(ctx, msg) + case bus.ConfigApplyFailedTopic: + fp.handleConfigApplyFailedRequest(ctx, msg) + default: + slog.DebugContext(ctx, "File plugin received unknown topic", "topic", msg.Topic) + } } } func (fp *FilePlugin) Subscriptions() []string { + if fp.serverType == model.Auxiliary { + return []string{ + bus.ConnectionResetTopic, + bus.ConnectionCreatedTopic, + bus.NginxConfigUpdateTopic, + bus.ConfigUploadRequestTopic, + } + } + return []string{ bus.ConnectionResetTopic, bus.ConnectionCreatedTopic, @@ -319,27 +355,10 @@ func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Mess return } - updateError := fp.fileManagerService.UpdateCurrentFilesOnDisk( - ctx, - files.ConvertToMapOfFiles(nginxConfigContext.Files), - true, - ) - if updateError != nil { - slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError) - } - - slog.InfoContext(ctx, "Updating overview after nginx config update") - err := fp.fileManagerService.UpdateOverview(ctx, nginxConfigContext.InstanceID, nginxConfigContext.Files, 0) - if err != nil { - slog.ErrorContext( - ctx, - "Failed to update file overview", - "instance_id", nginxConfigContext.InstanceID, - "error", err, - ) - } + fp.fileManagerService.ConfigUpdate(ctx, nginxConfigContext) } +// nolint: dupl func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "File plugin received config upload request message") managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) @@ -357,36 +376,7 @@ func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Me correlationID := logger.CorrelationID(ctx) - var updatingFilesError error - - for _, file := range configUploadRequest.GetOverview().GetFiles() { - err := fp.fileManagerService.UpdateFile( - ctx, - configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), - file, - ) - if err != nil { - slog.ErrorContext( - ctx, - "Failed to update file", - "instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), - "file_name", file.GetFileMeta().GetName(), - "error", err, - ) - - response := fp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_ERROR, - fmt.Sprintf("Failed to update file %s", file.GetFileMeta().GetName()), - configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), - err.Error(), - ) - - updatingFilesError = err - - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) - - break - } - } + updatingFilesError := fp.fileManagerService.ConfigUpload(ctx, configUploadRequest) response := &mpi.DataPlaneResponse{ MessageMeta: &mpi.MessageMeta{ diff --git a/internal/file/file_plugin_test.go b/internal/file/file_plugin_test.go index fb7e556f5..f1cb08403 100644 --- a/internal/file/file_plugin_test.go +++ b/internal/file/file_plugin_test.go @@ -31,7 +31,7 @@ import ( ) func TestFilePlugin_Info(t *testing.T) { - filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) + filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command) assert.Equal(t, "file", filePlugin.Info().Name) } @@ -39,14 +39,14 @@ func TestFilePlugin_Close(t *testing.T) { ctx := context.Background() fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection) + filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command) filePlugin.Close(ctx) assert.Equal(t, 1, fakeGrpcConnection.CloseCallCount()) } func TestFilePlugin_Subscriptions(t *testing.T) { - filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) + filePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command) assert.Equal( t, []string{ @@ -61,6 +61,14 @@ func TestFilePlugin_Subscriptions(t *testing.T) { }, filePlugin.Subscriptions(), ) + + readOnlyFilePlugin := NewFilePlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Auxiliary) + assert.Equal(t, []string{ + bus.ConnectionResetTopic, + bus.ConnectionCreatedTopic, + bus.NginxConfigUpdateTopic, + bus.ConfigUploadRequestTopic, + }, readOnlyFilePlugin.Subscriptions()) } func TestFilePlugin_Process_NginxConfigUpdateTopic(t *testing.T) { @@ -85,7 +93,7 @@ func TestFilePlugin_Process_NginxConfigUpdateTopic(t *testing.T) { fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection) + filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -160,7 +168,7 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) { fakeFileManagerService := &filefakes.FakeFileManagerServiceInterface{} fakeFileManagerService.ConfigApplyReturns(test.configApplyStatus, test.configApplyReturnsErr) messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection) + filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command) err := filePlugin.Init(ctx, messagePipe) filePlugin.fileManagerService = fakeFileManagerService require.NoError(t, err) @@ -258,7 +266,7 @@ func TestFilePlugin_Process_ConfigUploadRequestTopic(t *testing.T) { fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection) + filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -313,7 +321,7 @@ func TestFilePlugin_Process_ConfigUploadRequestTopic_Failure(t *testing.T) { fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) messagePipe := busfakes.NewFakeMessagePipe() - filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection) + filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection, model.Command) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -322,7 +330,7 @@ func TestFilePlugin_Process_ConfigUploadRequestTopic_Failure(t *testing.T) { assert.Eventually( t, - func() bool { return len(messagePipe.Messages()) == 2 }, + func() bool { return len(messagePipe.Messages()) == 1 }, 2*time.Second, 10*time.Millisecond, ) @@ -330,21 +338,12 @@ func TestFilePlugin_Process_ConfigUploadRequestTopic_Failure(t *testing.T) { assert.Equal(t, 0, fakeFileServiceClient.UpdateFileCallCount()) messages := messagePipe.Messages() - assert.Len(t, messages, 2) + assert.Len(t, messages, 1) + assert.Equal(t, bus.DataPlaneResponseTopic, messages[0].Topic) dataPlaneResponse, ok := messages[0].Data.(*mpi.DataPlaneResponse) assert.True(t, ok) - assert.Equal( - t, - mpi.CommandResponse_COMMAND_STATUS_ERROR, - dataPlaneResponse.GetCommandResponse().GetStatus(), - ) - - assert.Equal(t, bus.DataPlaneResponseTopic, messages[1].Topic) - - dataPlaneResponse, ok = messages[1].Data.(*mpi.DataPlaneResponse) - assert.True(t, ok) assert.Equal( t, mpi.CommandResponse_COMMAND_STATUS_FAILURE, @@ -390,7 +389,7 @@ func TestFilePlugin_Process_ConfigApplyFailedTopic(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() agentConfig := types.AgentConfig() - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection) + filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -437,7 +436,7 @@ func TestFilePlugin_Process_ConfigApplyRollbackCompleteTopic(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() agentConfig := types.AgentConfig() fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection) + filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) @@ -482,7 +481,7 @@ func TestFilePlugin_Process_ConfigApplyCompleteTopic(t *testing.T) { messagePipe := busfakes.NewFakeMessagePipe() agentConfig := types.AgentConfig() fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} - filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection) + filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection, model.Command) err := filePlugin.Init(ctx, messagePipe) require.NoError(t, err) diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go new file mode 100644 index 000000000..6bf835fa0 --- /dev/null +++ b/internal/file/file_service_operator.go @@ -0,0 +1,514 @@ +// Copyright (c) F5, Inc. +// +// This source code is licensed under the Apache License, Version 2.0 license found in the +// LICENSE file in the root directory of this source tree. + +package file + +import ( + "bufio" + "context" + "errors" + "fmt" + "log/slog" + "maps" + "math" + "os" + "slices" + "sync/atomic" + + "github.com/cenkalti/backoff/v4" + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" + backoffHelpers "github.com/nginx/agent/v3/internal/backoff" + "github.com/nginx/agent/v3/internal/config" + internalgrpc "github.com/nginx/agent/v3/internal/grpc" + "github.com/nginx/agent/v3/internal/logger" + "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/pkg/files" + "github.com/nginx/agent/v3/pkg/id" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// File service operator handles requests to the grpc file service + +type FileServiceOperator struct { + fileServiceClient mpi.FileServiceClient + agentConfig *config.Config + fileOperator fileOperator + isConnected *atomic.Bool +} + +var _ fileServiceOperatorInterface = (*FileServiceOperator)(nil) + +func NewFileServiceOperator(agentConfig *config.Config, fileServiceClient mpi.FileServiceClient) *FileServiceOperator { + isConnected := &atomic.Bool{} + isConnected.Store(false) + + return &FileServiceOperator{ + fileServiceClient: fileServiceClient, + agentConfig: agentConfig, + fileOperator: NewFileOperator(), + isConnected: isConnected, + } +} + +func (fso *FileServiceOperator) SetIsConnected(isConnected bool) { + fso.isConnected.Store(isConnected) +} + +func (fso *FileServiceOperator) IsConnected() bool { + return fso.isConnected.Load() +} + +func (fso *FileServiceOperator) File(ctx context.Context, file *mpi.File, + fileActions map[string]*model.FileCache, +) error { + slog.DebugContext(ctx, "Getting file", "file", file.GetFileMeta().GetName()) + + backOffCtx, backoffCancel := context.WithTimeout(ctx, fso.agentConfig.Client.Backoff.MaxElapsedTime) + defer backoffCancel() + + getFile := func() (*mpi.GetFileResponse, error) { + return fso.fileServiceClient.GetFile(ctx, &mpi.GetFileRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: logger.CorrelationID(ctx), + Timestamp: timestamppb.Now(), + }, + FileMeta: file.GetFileMeta(), + }) + } + + getFileResp, getFileErr := backoff.RetryWithData( + getFile, + backoffHelpers.Context(backOffCtx, fso.agentConfig.Client.Backoff), + ) + + if getFileErr != nil { + return fmt.Errorf("error getting file data for %s: %w", file.GetFileMeta(), getFileErr) + } + + if writeErr := fso.fileOperator.Write(ctx, getFileResp.GetContents().GetContents(), + file.GetFileMeta()); writeErr != nil { + return writeErr + } + + return fso.validateFileHash(file.GetFileMeta().GetName(), fileActions) +} + +func (fso *FileServiceOperator) validateFileHash(filePath string, fileActions map[string]*model.FileCache) error { + content, err := os.ReadFile(filePath) + if err != nil { + return err + } + fileHash := files.GenerateHash(content) + + if fileHash != fileActions[filePath].File.GetFileMeta().GetHash() { + return fmt.Errorf("error writing file, file hash does not match for file %s", filePath) + } + + return nil +} + +func (fso *FileServiceOperator) UpdateOverview( + ctx context.Context, + instanceID string, + filesToUpdate []*mpi.File, + iteration int, +) error { + correlationID := logger.CorrelationID(ctx) + + // error case for the UpdateOverview attempts + if iteration > maxAttempts { + return errors.New("too many UpdateOverview attempts") + } + + newCtx, correlationID := fso.setupIdentifiers(ctx, iteration) + + request := &mpi.UpdateOverviewRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: correlationID, + Timestamp: timestamppb.Now(), + }, + Overview: &mpi.FileOverview{ + Files: filesToUpdate, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: instanceID, + Version: files.GenerateConfigVersion(filesToUpdate), + }, + }, + } + + backOffCtx, backoffCancel := context.WithTimeout(newCtx, fso.agentConfig.Client.Backoff.MaxElapsedTime) + defer backoffCancel() + + sendUpdateOverview := func() (*mpi.UpdateOverviewResponse, error) { + if fso.fileServiceClient == nil { + return nil, errors.New("file service client is not initialized") + } + + if !fso.isConnected.Load() { + return nil, errors.New("CreateConnection rpc has not being called yet") + } + + slog.DebugContext(newCtx, "Sending update overview request", + "instance_id", request.GetOverview().GetConfigVersion().GetInstanceId(), + "request", request, "parent_correlation_id", correlationID, + ) + + response, updateError := fso.fileServiceClient.UpdateOverview(newCtx, request) + + validatedError := internalgrpc.ValidateGrpcError(updateError) + + if validatedError != nil { + slog.ErrorContext(newCtx, "Failed to send update overview", "error", validatedError) + + return nil, validatedError + } + + return response, nil + } + + backoffSettings := fso.agentConfig.Client.Backoff + response, err := backoff.RetryWithData( + sendUpdateOverview, + backoffHelpers.Context(backOffCtx, backoffSettings), + ) + if err != nil { + return err + } + + slog.DebugContext(newCtx, "UpdateOverview response", "response", response) + + if response.GetOverview() == nil { + slog.Debug("UpdateOverview response is empty") + return nil + } + delta := files.ConvertToMapOfFiles(response.GetOverview().GetFiles()) + + if len(delta) != 0 { + return fso.updateFiles(ctx, delta, instanceID, iteration) + } + + return err +} + +func (fso *FileServiceOperator) updateFiles( + ctx context.Context, + delta map[string]*mpi.File, + instanceID string, + iteration int, +) error { + diffFiles := slices.Collect(maps.Values(delta)) + + for _, file := range diffFiles { + updateErr := fso.UpdateFile(ctx, instanceID, file) + if updateErr != nil { + return updateErr + } + } + + iteration++ + slog.Info("Updating file overview after file updates", "attempt_number", iteration) + + return fso.UpdateOverview(ctx, instanceID, diffFiles, iteration) +} + +func (fso *FileServiceOperator) UpdateFile( + ctx context.Context, + instanceID string, + fileToUpdate *mpi.File, +) error { + slog.InfoContext(ctx, "Updating file", "file_name", fileToUpdate.GetFileMeta().GetName(), "instance_id", instanceID) + + slog.DebugContext(ctx, "Checking file size", + "file_size", fileToUpdate.GetFileMeta().GetSize(), + "max_file_size", int64(fso.agentConfig.Client.Grpc.MaxFileSize), + ) + + if fileToUpdate.GetFileMeta().GetSize() <= int64(fso.agentConfig.Client.Grpc.MaxFileSize) { + return fso.sendUpdateFileRequest(ctx, fileToUpdate) + } + + return fso.sendUpdateFileStream(ctx, fileToUpdate, fso.agentConfig.Client.Grpc.FileChunkSize) +} + +func (fso *FileServiceOperator) sendUpdateFileRequest( + ctx context.Context, + fileToUpdate *mpi.File, +) error { + messageMeta := &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: logger.CorrelationID(ctx), + Timestamp: timestamppb.Now(), + } + + contents, err := os.ReadFile(fileToUpdate.GetFileMeta().GetName()) + if err != nil { + return err + } + + request := &mpi.UpdateFileRequest{ + File: fileToUpdate, + Contents: &mpi.FileContents{ + Contents: contents, + }, + MessageMeta: messageMeta, + } + + backOffCtx, backoffCancel := context.WithTimeout(ctx, fso.agentConfig.Client.Backoff.MaxElapsedTime) + defer backoffCancel() + + sendUpdateFile := func() (*mpi.UpdateFileResponse, error) { + slog.DebugContext(ctx, "Sending update file request", "request_file", request.GetFile(), + "request_message_meta", request.GetMessageMeta()) + if fso.fileServiceClient == nil { + return nil, errors.New("file service client is not initialized") + } + + if !fso.isConnected.Load() { + return nil, errors.New("CreateConnection rpc has not being called yet") + } + + response, updateError := fso.fileServiceClient.UpdateFile(ctx, request) + + validatedError := internalgrpc.ValidateGrpcError(updateError) + + if validatedError != nil { + slog.ErrorContext(ctx, "Failed to send update file", "error", validatedError) + + return nil, validatedError + } + + return response, nil + } + + response, err := backoff.RetryWithData( + sendUpdateFile, + backoffHelpers.Context(backOffCtx, fso.agentConfig.Client.Backoff), + ) + if err != nil { + return err + } + + slog.DebugContext(ctx, "UpdateFile response", "response", response) + + return nil +} + +func (fso *FileServiceOperator) sendUpdateFileStream( + ctx context.Context, + fileToUpdate *mpi.File, + chunkSize uint32, +) error { + if chunkSize == 0 { + return fmt.Errorf("file chunk size must be greater than zero") + } + + updateFileStreamClient, err := fso.fileServiceClient.UpdateFileStream(ctx) + if err != nil { + return err + } + + err = fso.sendUpdateFileStreamHeader(ctx, fileToUpdate, chunkSize, updateFileStreamClient) + if err != nil { + return err + } + + return fso.sendFileUpdateStreamChunks(ctx, fileToUpdate, chunkSize, updateFileStreamClient) +} + +func (fso *FileServiceOperator) sendUpdateFileStreamHeader( + ctx context.Context, + fileToUpdate *mpi.File, + chunkSize uint32, + updateFileStreamClient grpc.ClientStreamingClient[mpi.FileDataChunk, mpi.UpdateFileResponse], +) error { + messageMeta := &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: logger.CorrelationID(ctx), + Timestamp: timestamppb.Now(), + } + + numberOfChunks := uint32(math.Ceil(float64(fileToUpdate.GetFileMeta().GetSize()) / float64(chunkSize))) + + header := mpi.FileDataChunk_Header{ + Header: &mpi.FileDataChunkHeader{ + FileMeta: fileToUpdate.GetFileMeta(), + Chunks: numberOfChunks, + ChunkSize: chunkSize, + }, + } + + backOffCtx, backoffCancel := context.WithTimeout(ctx, fso.agentConfig.Client.Backoff.MaxElapsedTime) + defer backoffCancel() + + sendUpdateFileHeader := func() error { + slog.DebugContext(ctx, "Sending update file stream header", "header", header) + if fso.fileServiceClient == nil { + return errors.New("file service client is not initialized") + } + + if !fso.isConnected.Load() { + return errors.New("CreateConnection rpc has not being called yet") + } + + err := updateFileStreamClient.Send( + &mpi.FileDataChunk{ + Meta: messageMeta, + Chunk: &header, + }, + ) + + validatedError := internalgrpc.ValidateGrpcError(err) + + if validatedError != nil { + slog.ErrorContext(ctx, "Failed to send update file stream header", "error", validatedError) + + return validatedError + } + + return nil + } + + return backoff.Retry(sendUpdateFileHeader, backoffHelpers.Context(backOffCtx, fso.agentConfig.Client.Backoff)) +} + +func (fso *FileServiceOperator) sendFileUpdateStreamChunks( + ctx context.Context, + fileToUpdate *mpi.File, + chunkSize uint32, + updateFileStreamClient grpc.ClientStreamingClient[mpi.FileDataChunk, mpi.UpdateFileResponse], +) error { + f, err := os.Open(fileToUpdate.GetFileMeta().GetName()) + defer func() { + closeError := f.Close() + if closeError != nil { + slog.WarnContext( + ctx, "Failed to close file", + "file", fileToUpdate.GetFileMeta().GetName(), + "error", closeError, + ) + } + }() + if err != nil { + return err + } + + var chunkID uint32 + + reader := bufio.NewReader(f) + for { + chunk, readChunkError := fso.fileOperator.ReadChunk(ctx, chunkSize, reader, chunkID) + if readChunkError != nil { + return readChunkError + } + if chunk.Content == nil { + break + } + + sendError := fso.sendFileUpdateStreamChunk(ctx, chunk, updateFileStreamClient) + if sendError != nil { + return sendError + } + + chunkID++ + } + + return nil +} + +func (fso *FileServiceOperator) sendFileUpdateStreamChunk( + ctx context.Context, + chunk mpi.FileDataChunk_Content, + updateFileStreamClient grpc.ClientStreamingClient[mpi.FileDataChunk, mpi.UpdateFileResponse], +) error { + messageMeta := &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: logger.CorrelationID(ctx), + Timestamp: timestamppb.Now(), + } + + backOffCtx, backoffCancel := context.WithTimeout(ctx, fso.agentConfig.Client.Backoff.MaxElapsedTime) + defer backoffCancel() + + sendUpdateFileChunk := func() error { + slog.DebugContext(ctx, "Sending update file stream chunk", "chunk_id", chunk.Content.GetChunkId()) + if fso.fileServiceClient == nil { + return errors.New("file service client is not initialized") + } + + if !fso.isConnected.Load() { + return errors.New("CreateConnection rpc has not being called yet") + } + + err := updateFileStreamClient.Send( + &mpi.FileDataChunk{ + Meta: messageMeta, + Chunk: &chunk, + }, + ) + + validatedError := internalgrpc.ValidateGrpcError(err) + + if validatedError != nil { + slog.ErrorContext(ctx, "Failed to send update file stream chunk", "error", validatedError) + + return validatedError + } + + return nil + } + + return backoff.Retry(sendUpdateFileChunk, backoffHelpers.Context(backOffCtx, fso.agentConfig.Client.Backoff)) +} + +func (fso *FileServiceOperator) ChunkedFile(ctx context.Context, file *mpi.File) error { + slog.DebugContext(ctx, "Getting chunked file", "file", file.GetFileMeta().GetName()) + + stream, err := fso.fileServiceClient.GetFileStream(ctx, &mpi.GetFileRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: logger.CorrelationID(ctx), + Timestamp: timestamppb.Now(), + }, + FileMeta: file.GetFileMeta(), + }) + if err != nil { + return fmt.Errorf("error getting file stream for %s: %w", file.GetFileMeta().GetName(), err) + } + + // Get header chunk first + headerChunk, recvHeaderChunkError := stream.Recv() + if recvHeaderChunkError != nil { + return recvHeaderChunkError + } + + slog.DebugContext(ctx, "File header chunk received", "header_chunk", headerChunk) + + header := headerChunk.GetHeader() + + writeChunkedFileError := fso.fileOperator.WriteChunkedFile(ctx, file, header, stream) + if writeChunkedFileError != nil { + return writeChunkedFileError + } + + return nil +} + +func (fso *FileServiceOperator) setupIdentifiers(ctx context.Context, iteration int) (context.Context, string) { + correlationID := logger.CorrelationID(ctx) + var requestCorrelationID slog.Attr + + if iteration == 0 { + requestCorrelationID = logger.GenerateCorrelationID() + } else { + requestCorrelationID = logger.CorrelationIDAttr(ctx) + } + + newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, requestCorrelationID) + + return newCtx, correlationID +} diff --git a/internal/file/file_service_operator_test.go b/internal/file/file_service_operator_test.go new file mode 100644 index 000000000..632749fbb --- /dev/null +++ b/internal/file/file_service_operator_test.go @@ -0,0 +1,163 @@ +// Copyright (c) F5, Inc. +// +// This source code is licensed under the Apache License, Version 2.0 license found in the +// LICENSE file in the root directory of this source tree. + +package file + +import ( + "context" + "os" + "path/filepath" + "sync/atomic" + "testing" + + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/api/grpc/mpi/v1/v1fakes" + "github.com/nginx/agent/v3/pkg/files" + "github.com/nginx/agent/v3/test/helpers" + "github.com/nginx/agent/v3/test/protos" + "github.com/nginx/agent/v3/test/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFileServiceOperator_UpdateOverview(t *testing.T) { + ctx := context.Background() + + filePath := filepath.Join(t.TempDir(), "nginx.conf") + fileMeta := protos.FileMeta(filePath, "") + + fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}") + fileHash := files.GenerateHash(fileContent) + + fileWriteErr := os.WriteFile(filePath, fileContent, 0o600) + require.NoError(t, fileWriteErr) + + overview := protos.FileOverview(filePath, fileHash) + + fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} + fakeFileServiceClient.UpdateOverviewReturnsOnCall(0, &mpi.UpdateOverviewResponse{ + Overview: overview, + }, nil) + + fakeFileServiceClient.UpdateOverviewReturnsOnCall(1, &mpi.UpdateOverviewResponse{}, nil) + + fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil) + + fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient) + fileServiceOperator.SetIsConnected(true) + + err := fileServiceOperator.UpdateOverview(ctx, "123", []*mpi.File{ + { + FileMeta: fileMeta, + }, + }, 0) + + require.NoError(t, err) + assert.Equal(t, 2, fakeFileServiceClient.UpdateOverviewCallCount()) +} + +func TestFileServiceOperator_UpdateOverview_MaxIterations(t *testing.T) { + ctx := context.Background() + + filePath := filepath.Join(t.TempDir(), "nginx.conf") + fileMeta := protos.FileMeta(filePath, "") + + fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}") + fileHash := files.GenerateHash(fileContent) + + fileWriteErr := os.WriteFile(filePath, fileContent, 0o600) + require.NoError(t, fileWriteErr) + + overview := protos.FileOverview(filePath, fileHash) + + fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} + + // do 5 iterations + for i := 0; i <= 5; i++ { + fakeFileServiceClient.UpdateOverviewReturnsOnCall(i, &mpi.UpdateOverviewResponse{ + Overview: overview, + }, nil) + } + + fakeFileServiceClient.UpdateFileReturns(&mpi.UpdateFileResponse{}, nil) + + fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient) + fileServiceOperator.SetIsConnected(true) + + err := fileServiceOperator.UpdateOverview(ctx, "123", []*mpi.File{ + { + FileMeta: fileMeta, + }, + }, 0) + + require.Error(t, err) + assert.Equal(t, "too many UpdateOverview attempts", err.Error()) +} + +func TestFileManagerService_UpdateFile(t *testing.T) { + tests := []struct { + name string + isCert bool + }{ + { + name: "non-cert", + isCert: false, + }, + { + name: "cert", + isCert: true, + }, + } + + tempDir := os.TempDir() + + for _, test := range tests { + ctx := context.Background() + + testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf") + + var fileMeta *mpi.FileMeta + if test.isCert { + fileMeta = protos.CertMeta(testFile.Name(), "") + } else { + fileMeta = protos.FileMeta(testFile.Name(), "") + } + + fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} + fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient) + fileServiceOperator.SetIsConnected(true) + + err := fileServiceOperator.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta}) + + require.NoError(t, err) + assert.Equal(t, 1, fakeFileServiceClient.UpdateFileCallCount()) + + helpers.RemoveFileWithErrorCheck(t, testFile.Name()) + } +} + +func TestFileManagerService_UpdateFile_LargeFile(t *testing.T) { + ctx := context.Background() + tempDir := os.TempDir() + + testFile := helpers.CreateFileWithErrorCheck(t, tempDir, "nginx.conf") + writeFileError := os.WriteFile(testFile.Name(), []byte("#test content"), 0o600) + require.NoError(t, writeFileError) + fileMeta := protos.FileMetaLargeFile(testFile.Name(), "") + + fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} + fakeClientStreamingClient := &FakeClientStreamingClient{sendCount: atomic.Int32{}} + fakeFileServiceClient.UpdateFileStreamReturns(fakeClientStreamingClient, nil) + fileServiceOperator := NewFileServiceOperator(types.AgentConfig(), fakeFileServiceClient) + + fileServiceOperator.SetIsConnected(true) + err := fileServiceOperator.UpdateFile(ctx, "123", &mpi.File{FileMeta: fileMeta}) + + require.NoError(t, err) + assert.Equal(t, 0, fakeFileServiceClient.UpdateFileCallCount()) + assert.Equal(t, 14, int(fakeClientStreamingClient.sendCount.Load())) + + helpers.RemoveFileWithErrorCheck(t, testFile.Name()) +} diff --git a/internal/file/filefakes/fake_file_manager_service_interface.go b/internal/file/filefakes/fake_file_manager_service_interface.go index eee6577c7..3583dd166 100644 --- a/internal/file/filefakes/fake_file_manager_service_interface.go +++ b/internal/file/filefakes/fake_file_manager_service_interface.go @@ -28,6 +28,24 @@ type FakeFileManagerServiceInterface struct { result1 model.WriteStatus result2 error } + ConfigUpdateStub func(context.Context, *model.NginxConfigContext) + configUpdateMutex sync.RWMutex + configUpdateArgsForCall []struct { + arg1 context.Context + arg2 *model.NginxConfigContext + } + ConfigUploadStub func(context.Context, *v1.ConfigUploadRequest) error + configUploadMutex sync.RWMutex + configUploadArgsForCall []struct { + arg1 context.Context + arg2 *v1.ConfigUploadRequest + } + configUploadReturns struct { + result1 error + } + configUploadReturnsOnCall map[int]struct { + result1 error + } DetermineFileActionsStub func(context.Context, map[string]*v1.File, map[string]*model.FileCache) (map[string]*model.FileCache, map[string][]byte, error) determineFileActionsMutex sync.RWMutex determineFileActionsArgsForCall []struct { @@ -85,33 +103,6 @@ type FakeFileManagerServiceInterface struct { updateCurrentFilesOnDiskReturnsOnCall map[int]struct { result1 error } - UpdateFileStub func(context.Context, string, *v1.File) error - updateFileMutex sync.RWMutex - updateFileArgsForCall []struct { - arg1 context.Context - arg2 string - arg3 *v1.File - } - updateFileReturns struct { - result1 error - } - updateFileReturnsOnCall map[int]struct { - result1 error - } - UpdateOverviewStub func(context.Context, string, []*v1.File, int) error - updateOverviewMutex sync.RWMutex - updateOverviewArgsForCall []struct { - arg1 context.Context - arg2 string - arg3 []*v1.File - arg4 int - } - updateOverviewReturns struct { - result1 error - } - updateOverviewReturnsOnCall map[int]struct { - result1 error - } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -205,6 +196,101 @@ func (fake *FakeFileManagerServiceInterface) ConfigApplyReturnsOnCall(i int, res }{result1, result2} } +func (fake *FakeFileManagerServiceInterface) ConfigUpdate(arg1 context.Context, arg2 *model.NginxConfigContext) { + fake.configUpdateMutex.Lock() + fake.configUpdateArgsForCall = append(fake.configUpdateArgsForCall, struct { + arg1 context.Context + arg2 *model.NginxConfigContext + }{arg1, arg2}) + stub := fake.ConfigUpdateStub + fake.recordInvocation("ConfigUpdate", []interface{}{arg1, arg2}) + fake.configUpdateMutex.Unlock() + if stub != nil { + fake.ConfigUpdateStub(arg1, arg2) + } +} + +func (fake *FakeFileManagerServiceInterface) ConfigUpdateCallCount() int { + fake.configUpdateMutex.RLock() + defer fake.configUpdateMutex.RUnlock() + return len(fake.configUpdateArgsForCall) +} + +func (fake *FakeFileManagerServiceInterface) ConfigUpdateCalls(stub func(context.Context, *model.NginxConfigContext)) { + fake.configUpdateMutex.Lock() + defer fake.configUpdateMutex.Unlock() + fake.ConfigUpdateStub = stub +} + +func (fake *FakeFileManagerServiceInterface) ConfigUpdateArgsForCall(i int) (context.Context, *model.NginxConfigContext) { + fake.configUpdateMutex.RLock() + defer fake.configUpdateMutex.RUnlock() + argsForCall := fake.configUpdateArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeFileManagerServiceInterface) ConfigUpload(arg1 context.Context, arg2 *v1.ConfigUploadRequest) error { + fake.configUploadMutex.Lock() + ret, specificReturn := fake.configUploadReturnsOnCall[len(fake.configUploadArgsForCall)] + fake.configUploadArgsForCall = append(fake.configUploadArgsForCall, struct { + arg1 context.Context + arg2 *v1.ConfigUploadRequest + }{arg1, arg2}) + stub := fake.ConfigUploadStub + fakeReturns := fake.configUploadReturns + fake.recordInvocation("ConfigUpload", []interface{}{arg1, arg2}) + fake.configUploadMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeFileManagerServiceInterface) ConfigUploadCallCount() int { + fake.configUploadMutex.RLock() + defer fake.configUploadMutex.RUnlock() + return len(fake.configUploadArgsForCall) +} + +func (fake *FakeFileManagerServiceInterface) ConfigUploadCalls(stub func(context.Context, *v1.ConfigUploadRequest) error) { + fake.configUploadMutex.Lock() + defer fake.configUploadMutex.Unlock() + fake.ConfigUploadStub = stub +} + +func (fake *FakeFileManagerServiceInterface) ConfigUploadArgsForCall(i int) (context.Context, *v1.ConfigUploadRequest) { + fake.configUploadMutex.RLock() + defer fake.configUploadMutex.RUnlock() + argsForCall := fake.configUploadArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeFileManagerServiceInterface) ConfigUploadReturns(result1 error) { + fake.configUploadMutex.Lock() + defer fake.configUploadMutex.Unlock() + fake.ConfigUploadStub = nil + fake.configUploadReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeFileManagerServiceInterface) ConfigUploadReturnsOnCall(i int, result1 error) { + fake.configUploadMutex.Lock() + defer fake.configUploadMutex.Unlock() + fake.ConfigUploadStub = nil + if fake.configUploadReturnsOnCall == nil { + fake.configUploadReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.configUploadReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeFileManagerServiceInterface) DetermineFileActions(arg1 context.Context, arg2 map[string]*v1.File, arg3 map[string]*model.FileCache) (map[string]*model.FileCache, map[string][]byte, error) { fake.determineFileActionsMutex.Lock() ret, specificReturn := fake.determineFileActionsReturnsOnCall[len(fake.determineFileActionsArgsForCall)] @@ -484,138 +570,6 @@ func (fake *FakeFileManagerServiceInterface) UpdateCurrentFilesOnDiskReturnsOnCa }{result1} } -func (fake *FakeFileManagerServiceInterface) UpdateFile(arg1 context.Context, arg2 string, arg3 *v1.File) error { - fake.updateFileMutex.Lock() - ret, specificReturn := fake.updateFileReturnsOnCall[len(fake.updateFileArgsForCall)] - fake.updateFileArgsForCall = append(fake.updateFileArgsForCall, struct { - arg1 context.Context - arg2 string - arg3 *v1.File - }{arg1, arg2, arg3}) - stub := fake.UpdateFileStub - fakeReturns := fake.updateFileReturns - fake.recordInvocation("UpdateFile", []interface{}{arg1, arg2, arg3}) - fake.updateFileMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeFileManagerServiceInterface) UpdateFileCallCount() int { - fake.updateFileMutex.RLock() - defer fake.updateFileMutex.RUnlock() - return len(fake.updateFileArgsForCall) -} - -func (fake *FakeFileManagerServiceInterface) UpdateFileCalls(stub func(context.Context, string, *v1.File) error) { - fake.updateFileMutex.Lock() - defer fake.updateFileMutex.Unlock() - fake.UpdateFileStub = stub -} - -func (fake *FakeFileManagerServiceInterface) UpdateFileArgsForCall(i int) (context.Context, string, *v1.File) { - fake.updateFileMutex.RLock() - defer fake.updateFileMutex.RUnlock() - argsForCall := fake.updateFileArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *FakeFileManagerServiceInterface) UpdateFileReturns(result1 error) { - fake.updateFileMutex.Lock() - defer fake.updateFileMutex.Unlock() - fake.UpdateFileStub = nil - fake.updateFileReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeFileManagerServiceInterface) UpdateFileReturnsOnCall(i int, result1 error) { - fake.updateFileMutex.Lock() - defer fake.updateFileMutex.Unlock() - fake.UpdateFileStub = nil - if fake.updateFileReturnsOnCall == nil { - fake.updateFileReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.updateFileReturnsOnCall[i] = struct { - result1 error - }{result1} -} - -func (fake *FakeFileManagerServiceInterface) UpdateOverview(arg1 context.Context, arg2 string, arg3 []*v1.File, arg4 int) error { - var arg3Copy []*v1.File - if arg3 != nil { - arg3Copy = make([]*v1.File, len(arg3)) - copy(arg3Copy, arg3) - } - fake.updateOverviewMutex.Lock() - ret, specificReturn := fake.updateOverviewReturnsOnCall[len(fake.updateOverviewArgsForCall)] - fake.updateOverviewArgsForCall = append(fake.updateOverviewArgsForCall, struct { - arg1 context.Context - arg2 string - arg3 []*v1.File - arg4 int - }{arg1, arg2, arg3Copy, arg4}) - stub := fake.UpdateOverviewStub - fakeReturns := fake.updateOverviewReturns - fake.recordInvocation("UpdateOverview", []interface{}{arg1, arg2, arg3Copy, arg4}) - fake.updateOverviewMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3, arg4) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeFileManagerServiceInterface) UpdateOverviewCallCount() int { - fake.updateOverviewMutex.RLock() - defer fake.updateOverviewMutex.RUnlock() - return len(fake.updateOverviewArgsForCall) -} - -func (fake *FakeFileManagerServiceInterface) UpdateOverviewCalls(stub func(context.Context, string, []*v1.File, int) error) { - fake.updateOverviewMutex.Lock() - defer fake.updateOverviewMutex.Unlock() - fake.UpdateOverviewStub = stub -} - -func (fake *FakeFileManagerServiceInterface) UpdateOverviewArgsForCall(i int) (context.Context, string, []*v1.File, int) { - fake.updateOverviewMutex.RLock() - defer fake.updateOverviewMutex.RUnlock() - argsForCall := fake.updateOverviewArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 -} - -func (fake *FakeFileManagerServiceInterface) UpdateOverviewReturns(result1 error) { - fake.updateOverviewMutex.Lock() - defer fake.updateOverviewMutex.Unlock() - fake.UpdateOverviewStub = nil - fake.updateOverviewReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeFileManagerServiceInterface) UpdateOverviewReturnsOnCall(i int, result1 error) { - fake.updateOverviewMutex.Lock() - defer fake.updateOverviewMutex.Unlock() - fake.UpdateOverviewStub = nil - if fake.updateOverviewReturnsOnCall == nil { - fake.updateOverviewReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.updateOverviewReturnsOnCall[i] = struct { - result1 error - }{result1} -} - func (fake *FakeFileManagerServiceInterface) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -623,6 +577,10 @@ func (fake *FakeFileManagerServiceInterface) Invocations() map[string][][]interf defer fake.clearCacheMutex.RUnlock() fake.configApplyMutex.RLock() defer fake.configApplyMutex.RUnlock() + fake.configUpdateMutex.RLock() + defer fake.configUpdateMutex.RUnlock() + fake.configUploadMutex.RLock() + defer fake.configUploadMutex.RUnlock() fake.determineFileActionsMutex.RLock() defer fake.determineFileActionsMutex.RUnlock() fake.isConnectedMutex.RLock() @@ -633,10 +591,6 @@ func (fake *FakeFileManagerServiceInterface) Invocations() map[string][][]interf defer fake.setIsConnectedMutex.RUnlock() fake.updateCurrentFilesOnDiskMutex.RLock() defer fake.updateCurrentFilesOnDiskMutex.RUnlock() - fake.updateFileMutex.RLock() - defer fake.updateFileMutex.RUnlock() - fake.updateOverviewMutex.RLock() - defer fake.updateOverviewMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/internal/file/filefakes/fake_file_operator.go b/internal/file/filefakes/fake_file_operator.go index 0f064b5a1..77445acdc 100644 --- a/internal/file/filefakes/fake_file_operator.go +++ b/internal/file/filefakes/fake_file_operator.go @@ -8,6 +8,7 @@ import ( "sync" v1 "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/model" "google.golang.org/grpc" ) @@ -68,6 +69,19 @@ type FakeFileOperator struct { writeChunkedFileReturnsOnCall map[int]struct { result1 error } + WriteManifestFileStub func(map[string]*model.ManifestFile, string, string) error + writeManifestFileMutex sync.RWMutex + writeManifestFileArgsForCall []struct { + arg1 map[string]*model.ManifestFile + arg2 string + arg3 string + } + writeManifestFileReturns struct { + result1 error + } + writeManifestFileReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -334,6 +348,69 @@ func (fake *FakeFileOperator) WriteChunkedFileReturnsOnCall(i int, result1 error }{result1} } +func (fake *FakeFileOperator) WriteManifestFile(arg1 map[string]*model.ManifestFile, arg2 string, arg3 string) error { + fake.writeManifestFileMutex.Lock() + ret, specificReturn := fake.writeManifestFileReturnsOnCall[len(fake.writeManifestFileArgsForCall)] + fake.writeManifestFileArgsForCall = append(fake.writeManifestFileArgsForCall, struct { + arg1 map[string]*model.ManifestFile + arg2 string + arg3 string + }{arg1, arg2, arg3}) + stub := fake.WriteManifestFileStub + fakeReturns := fake.writeManifestFileReturns + fake.recordInvocation("WriteManifestFile", []interface{}{arg1, arg2, arg3}) + fake.writeManifestFileMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeFileOperator) WriteManifestFileCallCount() int { + fake.writeManifestFileMutex.RLock() + defer fake.writeManifestFileMutex.RUnlock() + return len(fake.writeManifestFileArgsForCall) +} + +func (fake *FakeFileOperator) WriteManifestFileCalls(stub func(map[string]*model.ManifestFile, string, string) error) { + fake.writeManifestFileMutex.Lock() + defer fake.writeManifestFileMutex.Unlock() + fake.WriteManifestFileStub = stub +} + +func (fake *FakeFileOperator) WriteManifestFileArgsForCall(i int) (map[string]*model.ManifestFile, string, string) { + fake.writeManifestFileMutex.RLock() + defer fake.writeManifestFileMutex.RUnlock() + argsForCall := fake.writeManifestFileArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeFileOperator) WriteManifestFileReturns(result1 error) { + fake.writeManifestFileMutex.Lock() + defer fake.writeManifestFileMutex.Unlock() + fake.WriteManifestFileStub = nil + fake.writeManifestFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeFileOperator) WriteManifestFileReturnsOnCall(i int, result1 error) { + fake.writeManifestFileMutex.Lock() + defer fake.writeManifestFileMutex.Unlock() + fake.WriteManifestFileStub = nil + if fake.writeManifestFileReturnsOnCall == nil { + fake.writeManifestFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeManifestFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeFileOperator) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -345,6 +422,8 @@ func (fake *FakeFileOperator) Invocations() map[string][][]interface{} { defer fake.writeMutex.RUnlock() fake.writeChunkedFileMutex.RLock() defer fake.writeChunkedFileMutex.RUnlock() + fake.writeManifestFileMutex.RLock() + defer fake.writeManifestFileMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/internal/plugin/plugin_manager.go b/internal/plugin/plugin_manager.go index 96b0b1a86..30bcfc637 100644 --- a/internal/plugin/plugin_manager.go +++ b/internal/plugin/plugin_manager.go @@ -51,7 +51,7 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo } else { commandPlugin := command.NewCommandPlugin(agentConfig, grpcConnection, model.Command) plugins = append(plugins, commandPlugin) - filePlugin := file.NewFilePlugin(agentConfig, grpcConnection) + filePlugin := file.NewFilePlugin(agentConfig, grpcConnection, model.Command) plugins = append(plugins, filePlugin) } } else { @@ -72,7 +72,8 @@ func addAuxiliaryCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin } else { auxCommandPlugin := command.NewCommandPlugin(agentConfig, auxGRPCConnection, model.Auxiliary) plugins = append(plugins, auxCommandPlugin) - // Followup PR to add read plugin eventually + readFilePlugin := file.NewFilePlugin(agentConfig, auxGRPCConnection, model.Auxiliary) + plugins = append(plugins, readFilePlugin) } } else { slog.DebugContext(ctx, "Agent is not connected to an auxiliary management plane. "+ diff --git a/internal/plugin/plugin_manager_test.go b/internal/plugin/plugin_manager_test.go index ff0dca6d0..53fe3663b 100644 --- a/internal/plugin/plugin_manager_test.go +++ b/internal/plugin/plugin_manager_test.go @@ -48,12 +48,21 @@ func TestLoadPlugins(t *testing.T) { Type: config.Grpc, }, }, + AuxiliaryCommand: &config.Command{ + Server: &config.ServerConfig{ + Host: "test.connect", + Port: 443, + Type: config.Grpc, + }, + }, Features: config.DefaultFeatures(), }, expected: []bus.Plugin{ &resource.Resource{}, &command.CommandPlugin{}, &file.FilePlugin{}, + &command.CommandPlugin{}, + &file.FilePlugin{}, &watcher.Watcher{}, }, },