diff --git a/.golangci.yml b/.golangci.yml index 5d0832068..c364617dd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -6,9 +6,11 @@ linters: - copyloopvar - cyclop + - decorder - dupl - errorlint - exhaustive - forcetypeassert + - funcorder - gocheckcompilerdirectives - gocognit - goconst @@ -21,8 +23,10 @@ linters: - grouper - importas - inamedparam + - intrange - ireturn - lll + - loggercheck - maintidx - makezero - mirror @@ -35,7 +39,9 @@ linters: - nilnil - nlreturn - noctx +# - nolintlint - nosprintfhostport + - perfsprint - prealloc - predeclared - promlinter @@ -44,6 +50,7 @@ linters: - revive - rowserrcheck - sloglint + - spancheck - sqlclosecheck - staticcheck - tagalign @@ -52,6 +59,7 @@ linters: - thelper - unconvert - unparam - usestdlibvars + - usetesting - wastedassign - whitespace @@ -209,6 +217,19 @@ linters: - chan nlreturn: block-size: 2 +# nolintlint: +# # Disable to ensure that all nolint directives actually have an effect. +# # Default: false +# allow-unused: true +# # Exclude following linters from requiring an explanation. +# # Default: [] +# allow-no-explanation: [ ] +# # Enable to require an explanation of nonzero length after each nolint directive. +# # Default: false +# require-explanation: true +# # Enable to require nolint directives to mention the specific linter being suppressed. 
+# # Default: false +# require-specific: true prealloc: simple: true range-loops: true diff --git a/internal/bus/message_pipe.go b/internal/bus/message_pipe.go index 0cba3a173..aabba3dfb 100644 --- a/internal/bus/message_pipe.go +++ b/internal/bus/message_pipe.go @@ -147,6 +147,16 @@ func (p *MessagePipe) IsPluginRegistered(pluginName string) bool { return isPluginRegistered } +func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int { + for index, plugin := range plugins { + if pluginName == plugin.Info().Name { + return index + } + } + + return -1 +} + func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin Plugin) error { if index != -1 { p.plugins = append(p.plugins[:index], p.plugins[index+1:]...) @@ -181,16 +191,6 @@ func (p *MessagePipe) findPlugins(pluginNames []string) []Plugin { return plugins } -func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int { - for index, plugin := range plugins { - if pluginName == plugin.Info().Name { - return index - } - } - - return -1 -} - func (p *MessagePipe) initPlugins(ctx context.Context) { for index, plugin := range p.plugins { err := plugin.Init(ctx, p) diff --git a/internal/collector/logsgzipprocessor/processor.go b/internal/collector/logsgzipprocessor/processor.go index ce3232ab1..e8d1ca1de 100644 --- a/internal/collector/logsgzipprocessor/processor.go +++ b/internal/collector/logsgzipprocessor/processor.go @@ -95,6 +95,22 @@ func (p *logsGzipProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error return p.nextConsumer.ConsumeLogs(ctx, ld) } +func (p *logsGzipProcessor) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (p *logsGzipProcessor) Start(ctx context.Context, _ component.Host) error { + p.settings.Logger.Info("Starting logs gzip processor") + return nil +} + +func (p *logsGzipProcessor) Shutdown(ctx context.Context) error { + p.settings.Logger.Info("Shutting down logs gzip processor") + return 
nil +} + func (p *logsGzipProcessor) processLogRecords(logRecords plog.LogRecordSlice) error { var errs error // Filter out unsupported data types in the log before processing @@ -164,19 +180,3 @@ func (p *logsGzipProcessor) gzipCompress(data []byte) ([]byte, error) { return buf.Bytes(), nil } - -func (p *logsGzipProcessor) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } -} - -func (p *logsGzipProcessor) Start(ctx context.Context, _ component.Host) error { - p.settings.Logger.Info("Starting logs gzip processor") - return nil -} - -func (p *logsGzipProcessor) Shutdown(ctx context.Context) error { - p.settings.Logger.Info("Shutting down logs gzip processor") - return nil -} diff --git a/internal/collector/logsgzipprocessor/processor_benchmark_test.go b/internal/collector/logsgzipprocessor/processor_benchmark_test.go index 4bb9ccab5..82477aef7 100644 --- a/internal/collector/logsgzipprocessor/processor_benchmark_test.go +++ b/internal/collector/logsgzipprocessor/processor_benchmark_test.go @@ -20,7 +20,7 @@ func generateLogs(numRecords, recordSize int) plog.Logs { logs := plog.NewLogs() rl := logs.ResourceLogs().AppendEmpty() sl := rl.ScopeLogs().AppendEmpty() - for i := 0; i < numRecords; i++ { + for range numRecords { lr := sl.LogRecords().AppendEmpty() content, _ := randomString(recordSize) lr.Body().SetStr(content) @@ -64,7 +64,7 @@ func BenchmarkGzipProcessor(b *testing.B) { logs := generateLogs(bm.numRecords, bm.recordSize) b.ResetTimer() - for i := 0; i < b.N; i++ { + for range b.N { _ = p.ConsumeLogs(context.Background(), logs) } }) diff --git a/internal/collector/nginxossreceiver/internal/scraper/accesslog/nginx_log_scraper.go b/internal/collector/nginxossreceiver/internal/scraper/accesslog/nginx_log_scraper.go index e8af8cd1e..0e0b59b6b 100644 --- a/internal/collector/nginxossreceiver/internal/scraper/accesslog/nginx_log_scraper.go +++ 
b/internal/collector/nginxossreceiver/internal/scraper/accesslog/nginx_log_scraper.go @@ -224,6 +224,12 @@ func (nls *NginxLogScraper) Shutdown(_ context.Context) error { return err } +func (nls *NginxLogScraper) ConsumerCallback(_ context.Context, entries []*entry.Entry) { + nls.mut.Lock() + nls.entries = append(nls.entries, entries...) + nls.mut.Unlock() +} + func (nls *NginxLogScraper) initStanzaPipeline( operators []operator.Config, logger *zap.Logger, @@ -248,12 +254,6 @@ func (nls *NginxLogScraper) initStanzaPipeline( return pipe, err } -func (nls *NginxLogScraper) ConsumerCallback(_ context.Context, entries []*entry.Entry) { - nls.mut.Lock() - nls.entries = append(nls.entries, entries...) - nls.mut.Unlock() -} - func (nls *NginxLogScraper) runConsumer(ctx context.Context) { nls.logger.Info("Starting NGINX access log receiver's consumer") defer nls.wg.Done() diff --git a/internal/collector/nginxossreceiver/internal/scraper/accesslog/operator/input/file/config.go b/internal/collector/nginxossreceiver/internal/scraper/accesslog/operator/input/file/config.go index bdb9b9030..803eeee62 100644 --- a/internal/collector/nginxossreceiver/internal/scraper/accesslog/operator/input/file/config.go +++ b/internal/collector/nginxossreceiver/internal/scraper/accesslog/operator/input/file/config.go @@ -21,6 +21,13 @@ import ( const operatorType = "access_log_file_input" +// Config is the configuration of a file input operator +type Config struct { + helper.InputConfig `mapstructure:",squash"` + AccessLogFormat string `mapstructure:"access_log_format"` + fileconsumer.Config `mapstructure:",squash"` +} + func init() { operator.Register(operatorType, func() operator.Builder { return NewConfig() }) } @@ -38,13 +45,6 @@ func NewConfigWithID(operatorID string) *Config { } } -// Config is the configuration of a file input operator -type Config struct { - helper.InputConfig `mapstructure:",squash"` - AccessLogFormat string `mapstructure:"access_log_format"` - fileconsumer.Config 
`mapstructure:",squash"` -} - // Build will build a file input operator from the supplied configuration // nolint: ireturn func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error) { diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index ec83790e7..a1c6a9876 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -136,6 +136,63 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro return nil } +// Info the plugin. +func (oc *Collector) Info() *bus.Info { + return &bus.Info{ + Name: "collector", + } +} + +// Close the plugin. +func (oc *Collector) Close(ctx context.Context) error { + slog.InfoContext(ctx, "Closing OTel Collector plugin") + + if !oc.stopped { + slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState()) + oc.service.Shutdown() + oc.cancel() + + settings := *oc.config.Client.Backoff + settings.MaxElapsedTime = maxTimeToWaitForShutdown + err := backoff.WaitUntil(ctx, &settings, func() error { + if oc.service.GetState() == otelcol.StateClosed { + return nil + } + + return errors.New("OTel Collector not in a closed state yet") + }) + + if err != nil { + slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState()) + } else { + slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState()) + oc.stopped = true + } + } + + return nil +} + +// Process an incoming Message Bus message in the plugin +func (oc *Collector) Process(ctx context.Context, msg *bus.Message) { + switch msg.Topic { + case bus.NginxConfigUpdateTopic: + oc.handleNginxConfigUpdate(ctx, msg) + case bus.ResourceUpdateTopic: + oc.handleResourceUpdate(ctx, msg) + default: + slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic) + } +} + +// Subscriptions returns the list of topics the plugin is subscribed to +func (oc 
*Collector) Subscriptions() []string { + return []string{ + bus.ResourceUpdateTopic, + bus.NginxConfigUpdateTopic, + } +} + // Process receivers and log warning for sub-optimal configurations func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) { for _, receiver := range receivers { @@ -167,7 +224,7 @@ func (oc *Collector) bootup(ctx context.Context) error { go func() { if oc.service == nil { - errChan <- fmt.Errorf("unable to start OTel collector: service is nil") + errChan <- errors.New("unable to start OTel collector: service is nil") return } @@ -184,7 +241,7 @@ func (oc *Collector) bootup(ctx context.Context) error { return err default: if oc.service == nil { - return fmt.Errorf("unable to start otel collector: service is nil") + return errors.New("unable to start otel collector: service is nil") } state := oc.service.GetState() @@ -205,63 +262,6 @@ func (oc *Collector) bootup(ctx context.Context) error { } } -// Info the plugin. -func (oc *Collector) Info() *bus.Info { - return &bus.Info{ - Name: "collector", - } -} - -// Close the plugin. 
-func (oc *Collector) Close(ctx context.Context) error { - slog.InfoContext(ctx, "Closing OTel Collector plugin") - - if !oc.stopped { - slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState()) - oc.service.Shutdown() - oc.cancel() - - settings := *oc.config.Client.Backoff - settings.MaxElapsedTime = maxTimeToWaitForShutdown - err := backoff.WaitUntil(ctx, &settings, func() error { - if oc.service.GetState() == otelcol.StateClosed { - return nil - } - - return errors.New("OTel Collector not in a closed state yet") - }) - - if err != nil { - slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState()) - } else { - slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState()) - oc.stopped = true - } - } - - return nil -} - -// Process an incoming Message Bus message in the plugin -func (oc *Collector) Process(ctx context.Context, msg *bus.Message) { - switch msg.Topic { - case bus.NginxConfigUpdateTopic: - oc.handleNginxConfigUpdate(ctx, msg) - case bus.ResourceUpdateTopic: - oc.handleResourceUpdate(ctx, msg) - default: - slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic) - } -} - -// Subscriptions returns the list of topics the plugin is subscribed to -func (oc *Collector) Subscriptions() []string { - return []string{ - bus.ResourceUpdateTopic, - bus.NginxConfigUpdateTopic, - } -} - func (oc *Collector) handleNginxConfigUpdate(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "OTel collector plugin received nginx config update message") oc.mu.Lock() diff --git a/internal/collector/otel_collector_plugin_test.go b/internal/collector/otel_collector_plugin_test.go index a767c62cc..d625d13b7 100644 --- a/internal/collector/otel_collector_plugin_test.go +++ b/internal/collector/otel_collector_plugin_test.go @@ -8,7 +8,6 @@ import ( "bytes" "context" "errors" - "fmt" "path/filepath" "testing" @@ -252,7 +251,7 @@ func 
TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { if len(test.receivers.NginxPlusReceivers) == 1 { apiDetails := config.APIDetails{ - URL: fmt.Sprintf("%s/api", nginxPlusMock.URL), + URL: nginxPlusMock.URL + "/api", Listen: "", Location: "", } @@ -270,7 +269,7 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { model.PlusAPI.Location = apiDetails.Location } else { apiDetails := config.APIDetails{ - URL: fmt.Sprintf("%s/stub_status", nginxPlusMock.URL), + URL: nginxPlusMock.URL + "/stub_status", Listen: "", Location: "", } diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index 32797cf47..263ac26bf 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -132,6 +132,16 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) { } } +func (cp *CommandPlugin) Subscriptions() []string { + return []string{ + bus.ConnectionResetTopic, + bus.ResourceUpdateTopic, + bus.InstanceHealthTopic, + bus.DataPlaneHealthResponseTopic, + bus.DataPlaneResponseTopic, + } +} + func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "Command plugin received resource update message") if resource, ok := msg.Data.(*mpi.Resource); ok { @@ -233,16 +243,6 @@ func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Me } } -func (cp *CommandPlugin) Subscriptions() []string { - return []string{ - bus.ConnectionResetTopic, - bus.ResourceUpdateTopic, - bus.InstanceHealthTopic, - bus.DataPlaneHealthResponseTopic, - bus.DataPlaneResponseTopic, - } -} - // nolint: revive, cyclop func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) { for { diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 15b35604c..f0edc3615 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -330,7 +330,7 @@ func (cs *CommandService) 
sendResponseForQueuedConfigApplyRequests( indexOfConfigApplyRequest int, ) error { instanceID := response.GetInstanceId() - for i := 0; i < indexOfConfigApplyRequest; i++ { + for i := range indexOfConfigApplyRequest { newResponse := response newResponse.GetMessageMeta().MessageId = id.GenerateMessageID() @@ -464,7 +464,7 @@ func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, e return nil } - slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s", errorMsg), "error", err) + slog.ErrorContext(ctx, "Failed to "+errorMsg, "error", err) return err } diff --git a/internal/config/config.go b/internal/config/config.go index 39393bd47..0447f834e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -602,7 +602,7 @@ func seekFileInPaths(fileName string, directories ...string) (string, error) { } } - return "", fmt.Errorf("a valid configuration has not been found in any of the search paths") + return "", errors.New("a valid configuration has not been found in any of the search paths") } func configFilePaths() []string { diff --git a/internal/datasource/cert/cert.go b/internal/datasource/cert/cert.go index 95dd257ef..f640c9f5a 100644 --- a/internal/datasource/cert/cert.go +++ b/internal/datasource/cert/cert.go @@ -8,7 +8,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/pem" - "fmt" + "errors" "os" ) @@ -37,7 +37,7 @@ func LoadCertificate(certPath string) (*x509.Certificate, error) { block, _ := pem.Decode(fileContents) if block == nil || block.Type != "CERTIFICATE" { - return nil, fmt.Errorf("failed to decode PEM block containing the certificate") + return nil, errors.New("failed to decode PEM block containing the certificate") } cert, err := x509.ParseCertificate(block.Bytes) diff --git a/internal/datasource/config/nginx_config_parser.go b/internal/datasource/config/nginx_config_parser.go index 837f333a1..ebdef5441 100644 --- a/internal/datasource/config/nginx_config_parser.go +++ b/internal/datasource/config/nginx_config_parser.go
@@ -510,10 +510,10 @@ func (ncp *NginxConfigParser) apiCallback(ctx context.Context, parent, for _, url := range urls { if ncp.pingAPIEndpoint(ctx, url, apiType) { - slog.DebugContext(ctx, fmt.Sprintf("%s found", apiType), "url", url) + slog.DebugContext(ctx, apiType+" found", "url", url) return url } - slog.DebugContext(ctx, fmt.Sprintf("%s is not reachable", apiType), "url", url) + slog.DebugContext(ctx, apiType+" is not reachable", "url", url) } return &model.APIDetails{ @@ -548,7 +548,7 @@ func (ncp *NginxConfigParser) pingAPIEndpoint(ctx context.Context, statusAPIDeta } if resp.StatusCode != http.StatusOK { - slog.DebugContext(ctx, fmt.Sprintf("%s API responded with unexpected status code", apiType), "status_code", + slog.DebugContext(ctx, apiType+" API responded with unexpected status code", "status_code", resp.StatusCode, "expected", http.StatusOK) return false diff --git a/internal/datasource/config/nginx_config_parser_benchmark_test.go b/internal/datasource/config/nginx_config_parser_benchmark_test.go index 74e1e549b..bfd3e2651 100644 --- a/internal/datasource/config/nginx_config_parser_benchmark_test.go +++ b/internal/datasource/config/nginx_config_parser_benchmark_test.go @@ -45,7 +45,7 @@ func BenchmarkNginxConfigParser_Parse(b *testing.B) { bb.ResetTimer() - for i := 0; i < bb.N; i++ { + for range bb.N { _, err := nginxConfigParser.Parse( ctx, &mpi.Instance{ @@ -106,7 +106,7 @@ func BenchmarkNginxConfigParserGeneratedConfig_Parse(b *testing.B) { bb.ResetTimer() - for i := 0; i < bb.N; i++ { + for range bb.N { _, parseErr := nginxConfigParser.Parse( ctx, &mpi.Instance{ diff --git a/internal/datasource/config/nginx_config_parser_test.go b/internal/datasource/config/nginx_config_parser_test.go index caac63f82..18e85e447 100644 --- a/internal/datasource/config/nginx_config_parser_test.go +++ b/internal/datasource/config/nginx_config_parser_test.go @@ -1377,7 +1377,7 @@ func TestNginxConfigParser_checkDuplicate(t *testing.T) { } func 
protoListEqual(protoListA, protoListB []*mpi.File) bool { - for i := 0; i < len(protoListA); i++ { + for i := range protoListA { res := proto.Equal(protoListA[i], protoListB[i]) if !res { return false diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index b38d0bf24..8953aaf5a 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -133,7 +133,7 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context, fileOverview := configApplyRequest.GetOverview() if fileOverview == nil { - return model.Error, fmt.Errorf("fileOverview is nil") + return model.Error, errors.New("fileOverview is nil") } allowedErr := fms.checkAllowedDirectory(fileOverview.GetFiles()) @@ -221,50 +221,6 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) return nil } -func (fms *FileManagerService) executeFileActions(ctx context.Context) error { - for _, fileAction := range fms.fileActions { - switch fileAction.Action { - case model.Delete: - slog.DebugContext(ctx, "File action, deleting file", "file", fileAction.File.GetFileMeta().GetName()) - if err := os.Remove(fileAction.File.GetFileMeta().GetName()); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("error deleting file: %s error: %w", - fileAction.File.GetFileMeta().GetName(), err) - } - - continue - case model.Add, model.Update: - slog.DebugContext(ctx, "File action, add or update file", "file", fileAction.File.GetFileMeta().GetName()) - updateErr := fms.fileUpdate(ctx, fileAction.File) - if updateErr != nil { - return updateErr - } - case model.Unchanged: - slog.DebugContext(ctx, "File unchanged") - } - } - - return nil -} - -func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) error { - if file.GetFileMeta().GetSize() <= int64(fms.agentConfig.Client.Grpc.MaxFileSize) { - return fms.fileServiceOperator.File(ctx, file, fms.fileActions) - } - - return fms.fileServiceOperator.ChunkedFile(ctx, 
file) -} - -func (fms *FileManagerService) checkAllowedDirectory(checkFiles []*mpi.File) error { - for _, file := range checkFiles { - allowed := fms.agentConfig.IsDirectoryAllowed(file.GetFileMeta().GetName()) - if !allowed { - return fmt.Errorf("file not in allowed directories %s", file.GetFileMeta().GetName()) - } - } - - return nil -} - func (fms *FileManagerService) ConfigUpdate(ctx context.Context, nginxConfigContext *model.NginxConfigContext, ) { @@ -488,6 +444,50 @@ func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, m return manifestFiles, fileMap, nil } +func (fms *FileManagerService) executeFileActions(ctx context.Context) error { + for _, fileAction := range fms.fileActions { + switch fileAction.Action { + case model.Delete: + slog.DebugContext(ctx, "File action, deleting file", "file", fileAction.File.GetFileMeta().GetName()) + if err := os.Remove(fileAction.File.GetFileMeta().GetName()); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("error deleting file: %s error: %w", + fileAction.File.GetFileMeta().GetName(), err) + } + + continue + case model.Add, model.Update: + slog.DebugContext(ctx, "File action, add or update file", "file", fileAction.File.GetFileMeta().GetName()) + updateErr := fms.fileUpdate(ctx, fileAction.File) + if updateErr != nil { + return updateErr + } + case model.Unchanged: + slog.DebugContext(ctx, "File unchanged") + } + } + + return nil +} + +func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) error { + if file.GetFileMeta().GetSize() <= int64(fms.agentConfig.Client.Grpc.MaxFileSize) { + return fms.fileServiceOperator.File(ctx, file, fms.fileActions) + } + + return fms.fileServiceOperator.ChunkedFile(ctx, file) +} + +func (fms *FileManagerService) checkAllowedDirectory(checkFiles []*mpi.File) error { + for _, file := range checkFiles { + allowed := fms.agentConfig.IsDirectoryAllowed(file.GetFileMeta().GetName()) + if !allowed { + return fmt.Errorf("file not in allowed 
directories %s", file.GetFileMeta().GetName()) + } + } + + return nil +} + func (fms *FileManagerService) convertToManifestFileMap( currentFiles map[string]*mpi.File, referenced bool, diff --git a/internal/file/file_manager_service_test.go b/internal/file/file_manager_service_test.go index 6adbf62e9..01a0f7cda 100644 --- a/internal/file/file_manager_service_test.go +++ b/internal/file/file_manager_service_test.go @@ -92,7 +92,7 @@ func TestFileManagerService_ConfigApply_Add_LargeFile(t *testing.T) { fileName: filePath, } - for i := 0; i < len(fileContent); i++ { + for i := range fileContent { fakeServerStreamingClient.chunks[uint32(i)] = []byte{fileContent[i]} } @@ -755,7 +755,7 @@ rQHX6DP4w6IwZY8JB8LS if test.certContent == "" { _, certBytes = helpers.GenerateSelfSignedCert(t) certContents := helpers.Cert{ - Name: fmt.Sprintf("%s.pem", test.certName), + Name: test.certName + ".pem", Type: "CERTIFICATE", Contents: certBytes, } diff --git a/internal/file/file_operator.go b/internal/file/file_operator.go index fe68c4c81..ee65925c8 100644 --- a/internal/file/file_operator.go +++ b/internal/file/file_operator.go @@ -103,7 +103,7 @@ func (fo *FileOperator) WriteChunkedFile( } slog.DebugContext(ctx, "Writing chunked file", "file", file.GetFileMeta().GetName()) - for i := uint32(0); i < header.GetChunks(); i++ { + for range header.GetChunks() { chunk, recvError := stream.Recv() if recvError != nil { return recvError diff --git a/internal/file/file_plugin_test.go b/internal/file/file_plugin_test.go index b14ab7cac..23403a71f 100644 --- a/internal/file/file_plugin_test.go +++ b/internal/file/file_plugin_test.go @@ -7,7 +7,7 @@ package file import ( "context" - "fmt" + "errors" "os" "sync" "testing" @@ -116,7 +116,7 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) { ctx := context.Background() tempDir := t.TempDir() - filePath := fmt.Sprintf("%s/nginx.conf", tempDir) + filePath := tempDir + "/nginx.conf" fileContent := []byte("location /test {\n return 
200 \"Test location\\n\";\n}") fileHash := files.GenerateHash(fileContent) @@ -143,19 +143,19 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) { }, { name: "Test 2 - Fail, Rollback", - configApplyReturnsErr: fmt.Errorf("something went wrong"), + configApplyReturnsErr: errors.New("something went wrong"), configApplyStatus: model.RollbackRequired, message: message, }, { name: "Test 3 - Fail, No Rollback", - configApplyReturnsErr: fmt.Errorf("something went wrong"), + configApplyReturnsErr: errors.New("something went wrong"), configApplyStatus: model.Error, message: message, }, { name: "Test 4 - Fail to cast payload", - configApplyReturnsErr: fmt.Errorf("something went wrong"), + configApplyReturnsErr: errors.New("something went wrong"), configApplyStatus: model.Error, message: nil, }, @@ -371,13 +371,13 @@ func TestFilePlugin_Process_ConfigApplyFailedTopic(t *testing.T) { }, { name: "Test 2 - Rollback Fail", - rollbackReturns: fmt.Errorf("something went wrong"), + rollbackReturns: errors.New("something went wrong"), instanceID: instanceID, }, { name: "Test 3 - Fail to cast payload", - rollbackReturns: fmt.Errorf("something went wrong"), + rollbackReturns: errors.New("something went wrong"), instanceID: "", }, } @@ -402,7 +402,7 @@ func TestFilePlugin_Process_ConfigApplyFailedTopic(t *testing.T) { data := &model.ConfigApplyMessage{ CorrelationID: "dfsbhj6-bc92-30c1-a9c9-85591422068e", InstanceID: test.instanceID, - Error: fmt.Errorf("something went wrong with config apply"), + Error: errors.New("something went wrong with config apply"), } filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplyFailedTopic, Data: data}) diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go index d9d1ed063..a37951b29 100644 --- a/internal/file/file_service_operator.go +++ b/internal/file/file_service_operator.go @@ -100,20 +100,6 @@ func (fso *FileServiceOperator) File(ctx context.Context, file *mpi.File, return 
fso.validateFileHash(file.GetFileMeta().GetName(), fileActions) } -func (fso *FileServiceOperator) validateFileHash(filePath string, fileActions map[string]*model.FileCache) error { - content, err := os.ReadFile(filePath) - if err != nil { - return err - } - fileHash := files.GenerateHash(content) - - if fileHash != fileActions[filePath].File.GetFileMeta().GetHash() { - return fmt.Errorf("error writing file, file hash does not match for file %s", filePath) - } - - return nil -} - func (fso *FileServiceOperator) UpdateOverview( ctx context.Context, instanceID string, @@ -198,25 +184,37 @@ func (fso *FileServiceOperator) UpdateOverview( return err } -func (fso *FileServiceOperator) updateFiles( - ctx context.Context, - delta map[string]*mpi.File, - instanceID string, - iteration int, -) error { - diffFiles := slices.Collect(maps.Values(delta)) +func (fso *FileServiceOperator) ChunkedFile(ctx context.Context, file *mpi.File) error { + slog.DebugContext(ctx, "Getting chunked file", "file", file.GetFileMeta().GetName()) - for _, file := range diffFiles { - updateErr := fso.UpdateFile(ctx, instanceID, file) - if updateErr != nil { - return updateErr - } + stream, err := fso.fileServiceClient.GetFileStream(ctx, &mpi.GetFileRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: logger.CorrelationID(ctx), + Timestamp: timestamppb.Now(), + }, + FileMeta: file.GetFileMeta(), + }) + if err != nil { + return fmt.Errorf("error getting file stream for %s: %w", file.GetFileMeta().GetName(), err) } - iteration++ - slog.InfoContext(ctx, "Updating file overview after file updates", "attempt_number", iteration) + // Get header chunk first + headerChunk, recvHeaderChunkError := stream.Recv() + if recvHeaderChunkError != nil { + return recvHeaderChunkError + } - return fso.UpdateOverview(ctx, instanceID, diffFiles, iteration) + slog.DebugContext(ctx, "File header chunk received", "header_chunk", headerChunk) + + header := headerChunk.GetHeader() 
+ + writeChunkedFileError := fso.fileOperator.WriteChunkedFile(ctx, file, header, stream) + if writeChunkedFileError != nil { + return writeChunkedFileError + } + + return nil } func (fso *FileServiceOperator) UpdateFile( @@ -238,6 +236,41 @@ func (fso *FileServiceOperator) UpdateFile( return fso.sendUpdateFileStream(ctx, fileToUpdate, fso.agentConfig.Client.Grpc.FileChunkSize) } +func (fso *FileServiceOperator) validateFileHash(filePath string, fileActions map[string]*model.FileCache) error { + content, err := os.ReadFile(filePath) + if err != nil { + return err + } + fileHash := files.GenerateHash(content) + + if fileHash != fileActions[filePath].File.GetFileMeta().GetHash() { + return fmt.Errorf("error writing file, file hash does not match for file %s", filePath) + } + + return nil +} + +func (fso *FileServiceOperator) updateFiles( + ctx context.Context, + delta map[string]*mpi.File, + instanceID string, + iteration int, +) error { + diffFiles := slices.Collect(maps.Values(delta)) + + for _, file := range diffFiles { + updateErr := fso.UpdateFile(ctx, instanceID, file) + if updateErr != nil { + return updateErr + } + } + + iteration++ + slog.InfoContext(ctx, "Updating file overview after file updates", "attempt_number", iteration) + + return fso.UpdateOverview(ctx, instanceID, diffFiles, iteration) +} + func (fso *FileServiceOperator) sendUpdateFileRequest( ctx context.Context, fileToUpdate *mpi.File, @@ -307,7 +340,7 @@ func (fso *FileServiceOperator) sendUpdateFileStream( chunkSize uint32, ) error { if chunkSize == 0 { - return fmt.Errorf("file chunk size must be greater than zero") + return errors.New("file chunk size must be greater than zero") } updateFileStreamClient, err := fso.fileServiceClient.UpdateFileStream(ctx) @@ -468,39 +501,6 @@ func (fso *FileServiceOperator) sendFileUpdateStreamChunk( return backoff.Retry(sendUpdateFileChunk, backoffHelpers.Context(backOffCtx, fso.agentConfig.Client.Backoff)) } -func (fso *FileServiceOperator) ChunkedFile(ctx 
context.Context, file *mpi.File) error { - slog.DebugContext(ctx, "Getting chunked file", "file", file.GetFileMeta().GetName()) - - stream, err := fso.fileServiceClient.GetFileStream(ctx, &mpi.GetFileRequest{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: logger.CorrelationID(ctx), - Timestamp: timestamppb.Now(), - }, - FileMeta: file.GetFileMeta(), - }) - if err != nil { - return fmt.Errorf("error getting file stream for %s: %w", file.GetFileMeta().GetName(), err) - } - - // Get header chunk first - headerChunk, recvHeaderChunkError := stream.Recv() - if recvHeaderChunkError != nil { - return recvHeaderChunkError - } - - slog.DebugContext(ctx, "File header chunk received", "header_chunk", headerChunk) - - header := headerChunk.GetHeader() - - writeChunkedFileError := fso.fileOperator.WriteChunkedFile(ctx, file, header, stream) - if writeChunkedFileError != nil { - return writeChunkedFileError - } - - return nil -} - func (fso *FileServiceOperator) setupIdentifiers(ctx context.Context, iteration int) (context.Context, string) { correlationID := logger.CorrelationID(ctx) var requestCorrelationID slog.Attr diff --git a/internal/file/file_service_operator_test.go b/internal/file/file_service_operator_test.go index 389bb6c6b..c2f9bf22a 100644 --- a/internal/file/file_service_operator_test.go +++ b/internal/file/file_service_operator_test.go @@ -76,7 +76,7 @@ func TestFileServiceOperator_UpdateOverview_MaxIterations(t *testing.T) { fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} // do 5 iterations - for i := 0; i <= 5; i++ { + for i := range 6 { fakeFileServiceClient.UpdateOverviewReturnsOnCall(i, &mpi.UpdateOverviewResponse{ Overview: overview, }, nil) diff --git a/internal/grpc/certificates.go b/internal/grpc/certificates.go index a66c7bf24..aabc34c01 100644 --- a/internal/grpc/certificates.go +++ b/internal/grpc/certificates.go @@ -8,6 +8,7 @@ package grpc import ( "crypto/tls" "crypto/x509" + "errors" "fmt" "os" ) @@ 
-50,7 +51,7 @@ func appendCertKeyPair(tlsConfig *tls.Config, certFile, keyFile string) error { return nil } if certFile == "" || keyFile == "" { - return fmt.Errorf("cert and key must both be provided") + return errors.New("cert and key must both be provided") } certificate, err := tls.LoadX509KeyPair(certFile, keyFile) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index a36c146df..fe0feaf62 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -12,6 +12,7 @@ import ( "fmt" "log/slog" "net" + "strconv" "strings" "sync" @@ -82,7 +83,7 @@ func NewGrpcConnection(ctx context.Context, agentConfig *config.Config, serverAddr := net.JoinHostPort( commandConfig.Server.Host, - fmt.Sprint(commandConfig.Server.Port), + strconv.Itoa(commandConfig.Server.Port), ) slog.InfoContext(ctx, "Dialing grpc server", "server_addr", serverAddr) diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 01d0a506d..4cf10c869 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -157,7 +157,6 @@ func CorrelationIDAttr(ctx context.Context) slog.Attr { slog.DebugContext( ctx, "Correlation ID not found in context, generating new correlation ID", - "correlation_id", correlationID) return GenerateCorrelationID() diff --git a/internal/resource/nginx_instance_operator.go b/internal/resource/nginx_instance_operator.go index 1765e5c91..fa515c081 100644 --- a/internal/resource/nginx_instance_operator.go +++ b/internal/resource/nginx_instance_operator.go @@ -52,20 +52,6 @@ func (i *NginxInstanceOperator) Validate(ctx context.Context, instance *mpi.Inst return nil } -func (i *NginxInstanceOperator) validateConfigCheckResponse(out []byte) error { - if bytes.Contains(out, []byte("[emerg]")) || - bytes.Contains(out, []byte("[alert]")) || - bytes.Contains(out, []byte("[crit]")) { - return fmt.Errorf("error running nginx -t -c:\n%s", out) - } - - if i.treatWarningsAsErrors && bytes.Contains(out, []byte("[warn]")) { - return fmt.Errorf("error running 
nginx -t -c:\n%s", out) - } - - return nil -} - func (i *NginxInstanceOperator) Reload(ctx context.Context, instance *mpi.Instance) error { var errorsFound error slog.InfoContext(ctx, "Reloading NGINX PID", "pid", @@ -87,7 +73,7 @@ func (i *NginxInstanceOperator) Reload(ctx context.Context, instance *mpi.Instan numberOfExpectedMessages := len(errorLogs) - for i := 0; i < numberOfExpectedMessages; i++ { + for range numberOfExpectedMessages { logErr := <-logErrorChannel slog.InfoContext(ctx, "Message received in logErrorChannel", "error", logErr) if logErr != nil { @@ -105,6 +91,20 @@ func (i *NginxInstanceOperator) Reload(ctx context.Context, instance *mpi.Instan return nil } +func (i *NginxInstanceOperator) validateConfigCheckResponse(out []byte) error { + if bytes.Contains(out, []byte("[emerg]")) || + bytes.Contains(out, []byte("[alert]")) || + bytes.Contains(out, []byte("[crit]")) { + return fmt.Errorf("error running nginx -t -c:\n%s", out) + } + + if i.treatWarningsAsErrors && bytes.Contains(out, []byte("[warn]")) { + return fmt.Errorf("error running nginx -t -c:\n%s", out) + } + + return nil +} + func (i *NginxInstanceOperator) errorLogs(instance *mpi.Instance) (errorLogs []string) { if instance.GetInstanceMeta().GetInstanceType() == mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS { errorLogs = instance.GetInstanceRuntime().GetNginxPlusRuntimeInfo().GetErrorLogs() diff --git a/internal/resource/nginx_instance_operator_test.go b/internal/resource/nginx_instance_operator_test.go index 259c3f9ad..186aa05bb 100644 --- a/internal/resource/nginx_instance_operator_test.go +++ b/internal/resource/nginx_instance_operator_test.go @@ -75,7 +75,7 @@ func TestInstanceOperator_Validate(t *testing.T) { name: "Test 3: Validate Config failed", out: bytes.NewBufferString("nginx [emerg]"), err: nil, - expected: fmt.Errorf("error running nginx -t -c:\nnginx [emerg]"), + expected: errors.New("error running nginx -t -c:\nnginx [emerg]"), }, } diff --git 
a/internal/resource/resource_plugin_test.go b/internal/resource/resource_plugin_test.go index e6651b46f..cecb17bbb 100644 --- a/internal/resource/resource_plugin_test.go +++ b/internal/resource/resource_plugin_test.go @@ -10,7 +10,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "sort" "testing" @@ -838,7 +837,7 @@ func TestResource_Process_Rollback(t *testing.T) { Data: &model.ConfigApplyMessage{ CorrelationID: "dfsbhj6-bc92-30c1-a9c9-85591422068e", InstanceID: protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(), - Error: fmt.Errorf("something went wrong with config apply"), + Error: errors.New("something went wrong with config apply"), }, }, rollbackErr: nil, @@ -851,7 +850,7 @@ func TestResource_Process_Rollback(t *testing.T) { Data: &model.ConfigApplyMessage{ CorrelationID: "", InstanceID: protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(), - Error: fmt.Errorf("something went wrong with config apply"), + Error: errors.New("something went wrong with config apply"), }, }, rollbackErr: errors.New("error reloading"), diff --git a/internal/resource/resource_service_test.go b/internal/resource/resource_service_test.go index b60af6d23..0b82f1b18 100644 --- a/internal/resource/resource_service_test.go +++ b/internal/resource/resource_service_test.go @@ -306,23 +306,23 @@ func TestResourceService_ApplyConfig(t *testing.T) { { name: "Test 2: Failed reload", instanceID: protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(), - reloadErr: fmt.Errorf("something went wrong"), + reloadErr: errors.New("something went wrong"), validateErr: nil, - expected: fmt.Errorf("failed to reload NGINX %w", fmt.Errorf("something went wrong")), + expected: fmt.Errorf("failed to reload NGINX %w", errors.New("something went wrong")), }, { name: "Test 3: Failed validate", instanceID: protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(), reloadErr: nil, - validateErr: fmt.Errorf("something went wrong"), - expected: 
fmt.Errorf("failed validating config %w", fmt.Errorf("something went wrong")), + validateErr: errors.New("something went wrong"), + expected: fmt.Errorf("failed validating config %w", errors.New("something went wrong")), }, { name: "Test 4: Unknown instance ID", instanceID: "unknown", reloadErr: nil, validateErr: nil, - expected: fmt.Errorf("instance unknown not found"), + expected: errors.New("instance unknown not found"), }, } diff --git a/internal/watcher/credentials/credential_watcher_service_test.go b/internal/watcher/credentials/credential_watcher_service_test.go index 31ce8ae98..2d33ad253 100644 --- a/internal/watcher/credentials/credential_watcher_service_test.go +++ b/internal/watcher/credentials/credential_watcher_service_test.go @@ -7,7 +7,7 @@ package credentials import ( "context" - "fmt" + "errors" "os" "path" "testing" @@ -58,7 +58,7 @@ func TestCredentialWatcherService_Watch(t *testing.T) { } func() { - cws.watcher.Errors <- fmt.Errorf("watch error") + cws.watcher.Errors <- errors.New("watch error") }() } diff --git a/internal/watcher/health/health_watcher_service_test.go b/internal/watcher/health/health_watcher_service_test.go index fb3fa97f0..b2ed9b78c 100644 --- a/internal/watcher/health/health_watcher_service_test.go +++ b/internal/watcher/health/health_watcher_service_test.go @@ -7,6 +7,7 @@ package health import ( "context" + "errors" "fmt" "reflect" "testing" @@ -105,7 +106,7 @@ func TestHealthWatcherService_health(t *testing.T) { fakePlusHealthOp.HealthReturns(protos.UnhealthyInstanceHealth(), nil) fakeUnspecifiedHealthOp := healthfakes.FakeHealthWatcherOperator{} - fakeUnspecifiedHealthOp.HealthReturns(nil, fmt.Errorf("unable to determine health")) + fakeUnspecifiedHealthOp.HealthReturns(nil, errors.New("unable to determine health")) watchers[plusInstance.GetInstanceMeta().GetInstanceId()] = &fakePlusHealthOp watchers[ossInstance.GetInstanceMeta().GetInstanceId()] = &fakeOSSHealthOp diff --git 
a/internal/watcher/health/nginx_health_watcher_operator.go b/internal/watcher/health/nginx_health_watcher_operator.go index 45afe31e7..0fdd75618 100644 --- a/internal/watcher/health/nginx_health_watcher_operator.go +++ b/internal/watcher/health/nginx_health_watcher_operator.go @@ -51,7 +51,7 @@ func (nhw *NginxHealthWatcher) Health(ctx context.Context, instance *mpi.Instanc } if len(instance.GetInstanceRuntime().GetInstanceChildren()) == 0 { - health.Description = fmt.Sprintf("%s, instance does not have enough children", health.GetDescription()) + health.Description = health.GetDescription() + ", instance does not have enough children" health.InstanceHealthStatus = mpi.InstanceHealth_INSTANCE_HEALTH_STATUS_DEGRADED } diff --git a/internal/watcher/instance/nginx_process_parser_test.go b/internal/watcher/instance/nginx_process_parser_test.go index c2e204628..889807960 100644 --- a/internal/watcher/instance/nginx_process_parser_test.go +++ b/internal/watcher/instance/nginx_process_parser_test.go @@ -8,6 +8,7 @@ package instance import ( "bytes" "context" + "errors" "fmt" "path/filepath" "sort" @@ -124,11 +125,11 @@ func TestNginxProcessParser_Parse(t *testing.T) { }{ { name: "Test 1: NGINX open source", - nginxVersionCommandOutput: fmt.Sprintf(`nginx version: nginx/1.25.3 + nginxVersionCommandOutput: `nginx version: nginx/1.25.3 built by clang 14.0.0 (clang-1400.0.29.202) built with OpenSSL 1.1.1s 1 Nov 2022 (running with OpenSSL 1.1.1t 7 Feb 2023) TLS SNI support enabled - configure arguments: %s`, ossArgs), + configure arguments: ` + ossArgs, expected: map[string]*mpi.Instance{ protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(): protos.NginxOssInstance( []string{expectedModules}), @@ -136,12 +137,12 @@ func TestNginxProcessParser_Parse(t *testing.T) { }, { name: "Test 2: NGINX plus", - nginxVersionCommandOutput: fmt.Sprintf(` + nginxVersionCommandOutput: ` nginx version: nginx/1.25.3 (nginx-plus-r31-p1) built by gcc 9.4.0 (Ubuntu 
9.4.0-1ubuntu1~20.04.2) built with OpenSSL 1.1.1f 31 Mar 2020 TLS SNI support enabled - configure arguments: %s`, plusArgs), + configure arguments: ` + plusArgs, expected: map[string]*mpi.Instance{ protos.NginxPlusInstance([]string{}).GetInstanceMeta().GetInstanceId(): protos.NginxPlusInstance( []string{expectedModules}), @@ -149,11 +150,11 @@ func TestNginxProcessParser_Parse(t *testing.T) { }, { name: "Test 3: No Modules", - nginxVersionCommandOutput: fmt.Sprintf(`nginx version: nginx/1.25.3 + nginxVersionCommandOutput: `nginx version: nginx/1.25.3 built by clang 14.0.0 (clang-1400.0.29.202) built with OpenSSL 1.1.1s 1 Nov 2022 (running with OpenSSL 1.1.1t 7 Feb 2023) TLS SNI support enabled - configure arguments: %s`, noModuleArgs), + configure arguments: ` + noModuleArgs, expected: map[string]*mpi.Instance{ protos.NginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId(): protos. NginxOssInstance(nil), @@ -196,11 +197,11 @@ func TestNginxProcessParser_Parse_Processes(t *testing.T) { configArgs := fmt.Sprintf(ossConfigArgs, modulePath) - nginxVersionCommandOutput := fmt.Sprintf(`nginx version: nginx/1.25.3 + nginxVersionCommandOutput := `nginx version: nginx/1.25.3 built by clang 14.0.0 (clang-1400.0.29.202) built with OpenSSL 1.1.1s 1 Nov 2022 (running with OpenSSL 1.1.1t 7 Feb 2023) TLS SNI support enabled - configure arguments: %s`, configArgs) + configure arguments: ` + configArgs process1 := protos.NginxOssInstance(nil) instancesTest1 := map[string]*mpi.Instance{ @@ -420,12 +421,12 @@ func TestGetInfo(t *testing.T) { }{ { name: "Test 1: NGINX open source", - nginxVersionCommandOutput: fmt.Sprintf(` + nginxVersionCommandOutput: ` nginx version: nginx/1.25.3 built by clang 14.0.3 (clang-1403.0.22.14.1) built with OpenSSL 3.1.3 19 Sep 2023 (running with OpenSSL 3.2.0 23 Nov 2023) TLS SNI support enabled - configure arguments: %s`, ossArgs), + configure arguments: ` + ossArgs, process: &nginxprocess.Process{ PID: 1123, Exe: exePath, @@ -488,12 +489,12 @@ 
func TestGetInfo(t *testing.T) { }, { name: "Test 2: NGINX plus", - nginxVersionCommandOutput: fmt.Sprintf(` + nginxVersionCommandOutput: ` nginx version: nginx/1.25.3 (nginx-plus-r31-p1) built by gcc 9.4.0 (Ubuntu 9.4.0-1ubuntu1~20.04.2) built with OpenSSL 1.1.1f 31 Mar 2020 TLS SNI support enabled - configure arguments: %s`, plusArgs), + configure arguments: ` + plusArgs, process: &nginxprocess.Process{ PID: 3141, Exe: exePath, @@ -592,7 +593,7 @@ func TestNginxProcessParser_GetExe(t *testing.T) { { name: "Test 1: Default exe if error executing command -v nginx", commandOutput: []byte{}, - commandError: fmt.Errorf("command error"), + commandError: errors.New("command error"), expected: "/usr/bin/nginx", }, { diff --git a/pkg/files/file_helpers_benchmark_test.go b/pkg/files/file_helpers_benchmark_test.go index e7f0c0938..2ffcbac65 100644 --- a/pkg/files/file_helpers_benchmark_test.go +++ b/pkg/files/file_helpers_benchmark_test.go @@ -26,7 +26,7 @@ func BenchmarkGenerateConfigVersion(b *testing.B) { }, } - for i := 0; i < b.N; i++ { + for range b.N { files := []*mpi.File{ file1, file2, diff --git a/pkg/files/file_stream.go b/pkg/files/file_stream.go index 636582486..9f774adcd 100644 --- a/pkg/files/file_stream.go +++ b/pkg/files/file_stream.go @@ -40,7 +40,7 @@ func SendChunkedFile( // since "send" is synchronous and data in buffer re-marshaled, we shouldn't // need to reallocate for each chunk. buf := make([]byte, chunkSize) - for i := 0; i < chunkCount; i++ { + for i := range chunkCount { // using ReadFull here, since the Read may get partial fills depends on impl, due to Read being // at most once op per call. But we want to fill our buffer every loop, for the defined chunk size. 
// Set size of the buffer, so we don't get the error from io.ReadFull, if the data fits just right @@ -104,7 +104,7 @@ func recvContents( totalSize int, ) error { // receive and write all content chunks - for i := 0; i < chunkCount; i++ { + for i := range chunkCount { chunk, err := src.Recv() if err != nil { return fmt.Errorf("unable to receive chunk id %d: %w", i, err) diff --git a/pkg/files/file_stream_test.go b/pkg/files/file_stream_test.go index c4fe1bc1d..2c9e19dff 100644 --- a/pkg/files/file_stream_test.go +++ b/pkg/files/file_stream_test.go @@ -7,7 +7,7 @@ package files_test import ( "bytes" - "fmt" + "errors" "io" "math/rand" "testing" @@ -126,7 +126,7 @@ func TestSendChunkedFile(t *testing.T) { // we send up to the Size in the file meta content: randBytes(2300), clientFunc: func(cl *v1fakes.FakeFileService_GetFileStreamServer) { - cl.SendReturns(fmt.Errorf("error")) + cl.SendReturns(errors.New("error")) }, expectedErrString: "unable to send header chunk", }, @@ -146,7 +146,7 @@ func TestSendChunkedFile(t *testing.T) { clientFunc: func(cl *v1fakes.FakeFileService_GetFileStreamServer) { cl.SendCalls(func(chunk *v1.FileDataChunk) error { if cl.SendCallCount() > 1 { - return fmt.Errorf("foo") + return errors.New("foo") } return nil @@ -191,7 +191,7 @@ func TestSendChunkedFile(t *testing.T) { chunks := test.header.Header.GetChunks() assert.EqualValues(t, chunks+1, cl.SendCallCount()) - for i := 0; i < int(chunks+1); i++ { + for i := range int(chunks + 1) { arg := cl.SendArgsForCall(i) switch i { case 0: @@ -214,12 +214,12 @@ func TestSendChunkedFile(t *testing.T) { type badWriter struct{} func (b badWriter) Write(p []byte) (n int, err error) { - return 0, fmt.Errorf("error") + return 0, errors.New("error") } -// nolint: revive,govet,maintidx +//nolint:revive,govet,maintidx func TestRecvChunkedFile(t *testing.T) { - recvErr := fmt.Errorf("recv error") + recvErr := errors.New("recv error") type recvReturn struct { chunk *v1.FileDataChunk err error diff --git 
a/pkg/id/uuid.go b/pkg/id/uuid.go index efaf118f1..35140a7a5 100644 --- a/pkg/id/uuid.go +++ b/pkg/id/uuid.go @@ -7,6 +7,7 @@ package id import ( "crypto/sha256" + "encoding/hex" "fmt" "github.com/google/uuid" @@ -37,7 +38,7 @@ func Generate(format string, a ...interface{}) string { h := sha256.New() s := fmt.Sprintf(f, a...) _, _ = h.Write([]byte(s)) - id := fmt.Sprintf("%x", h.Sum(nil)) + id := hex.EncodeToString(h.Sum(nil)) return uuid.NewMD5(uuid.Nil, []byte(id)).String() } diff --git a/pkg/nginxprocess/process_test.go b/pkg/nginxprocess/process_test.go index d7c3c44f5..5af69b832 100644 --- a/pkg/nginxprocess/process_test.go +++ b/pkg/nginxprocess/process_test.go @@ -216,14 +216,14 @@ func BenchmarkList(b *testing.B) { b.Skipf("skipping to prevent CI flake") ctx := context.Background() b.Run("base", func(b *testing.B) { - for i := 0; i < b.N; i++ { + for range b.N { _, err := nginxprocess.List(ctx) require.NoError(b, err) } }) b.Run("WithStatus", func(b *testing.B) { - for i := 0; i < b.N; i++ { + for range b.N { _, err := nginxprocess.List(ctx, nginxprocess.WithStatus(true)) require.NoError(b, err) } diff --git a/pkg/tls/self_signed_cert.go b/pkg/tls/self_signed_cert.go index a7220549a..4410afd37 100644 --- a/pkg/tls/self_signed_cert.go +++ b/pkg/tls/self_signed_cert.go @@ -114,7 +114,8 @@ func GenerateCA(now time.Time, caCertPath string) (*x509.Certificate, *ecdsa.Pri // GenerateServerCerts creates a server CA, Cert and Key and writes them to specified destinations. // Hostnames are a list of subject alternative names. // If cert files are already present, does nothing, returns true. 
-// nolint: revive +// +//nolint:revive func GenerateServerCerts(hostnames []string, caPath, certPath, keyPath string) (existingCert bool, err error) { // Check for and return existing cert if it already exists existingCert, existingCertErr := DoesCertAlreadyExist(certPath) @@ -195,7 +196,7 @@ func DoesCertAlreadyExist(certPath string) (bool, error) { if _, certErr := os.Stat(certPath); certErr == nil { certBytes, certReadErr := os.ReadFile(certPath) if certReadErr != nil { - return false, fmt.Errorf("error reading existing certificate file") + return false, errors.New("error reading existing certificate file") } certPEM, _ := pem.Decode(certBytes) if certPEM == nil { diff --git a/pkg/tls/self_signed_cert_test.go b/pkg/tls/self_signed_cert_test.go index 64911d77a..bbcf0bdd1 100644 --- a/pkg/tls/self_signed_cert_test.go +++ b/pkg/tls/self_signed_cert_test.go @@ -17,7 +17,7 @@ import ( "github.com/stretchr/testify/require" ) -// nolint: revive,gocognit +//nolint:revive,gocognit func TestGenerateSelfSignedCert(t *testing.T) { // Setup temp file paths caPath := "/tmp/test_ca.pem" diff --git a/test/helpers/config_utils.go b/test/helpers/config_utils.go index 3620e36c0..f4e1c5850 100644 --- a/test/helpers/config_utils.go +++ b/test/helpers/config_utils.go @@ -75,7 +75,7 @@ func GenerateConfig(t testing.TB, outputFile string, targetSize int64) (fs.FileI server, serverErr := generateRandomString(serverLength) require.NoError(t, serverErr) - serverName := fmt.Sprintf("%s.com", server) + serverName := server + ".com" proxy, proxyErr := generateRandomString(proxyLength) require.NoError(t, proxyErr) diff --git a/test/helpers/go_utils.go b/test/helpers/go_utils.go index 5e161e482..ade1349e3 100644 --- a/test/helpers/go_utils.go +++ b/test/helpers/go_utils.go @@ -86,7 +86,7 @@ func generatePattern(n int) (string, error) { pattern.WriteString(currDir) pattern.WriteRune(os.PathSeparator) - for i := 0; i < n; i++ { + for range n { pattern.WriteString("..") 
pattern.WriteRune(os.PathSeparator) } diff --git a/test/helpers/network_utils.go b/test/helpers/network_utils.go index c086ed78c..f5a1c38b0 100644 --- a/test/helpers/network_utils.go +++ b/test/helpers/network_utils.go @@ -6,6 +6,7 @@ package helpers import ( "crypto/rand" + "errors" "fmt" "math/big" "net" @@ -21,7 +22,7 @@ func RandomPort(t *testing.T) (int, error) { const maxPort = 65535 // try up to 10 times to get a random port - for i := 0; i < 10; i++ { + for range 10 { maxValue := &big.Int{} maxValue.SetInt64(maxPort - minPort + 1) @@ -37,7 +38,7 @@ func RandomPort(t *testing.T) (int, error) { } } - return 0, fmt.Errorf("could not find an available port after multiple attempts") + return 0, errors.New("could not find an available port after multiple attempts") } // isPortAvailable checks if a port is available by attempting to bind to it diff --git a/test/helpers/nginx_plus.go b/test/helpers/nginx_plus.go index 62be2956e..f51a8c420 100644 --- a/test/helpers/nginx_plus.go +++ b/test/helpers/nginx_plus.go @@ -20,7 +20,7 @@ const ( serverID = 1234 ) -// nolint: gocyclo,revive,cyclop,maintidx +//nolint:gocyclo,revive,cyclop,maintidx func NewMockNGINXPlusAPIServer(t *testing.T) *httptest.Server { t.Helper() return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { diff --git a/test/helpers/os_utils_test.go b/test/helpers/os_utils_test.go index 2ce93d558..a3b8d3688 100644 --- a/test/helpers/os_utils_test.go +++ b/test/helpers/os_utils_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" ) -// nolint: staticcheck +//nolint:staticcheck func TestRemoveASCIIControlSignals(t *testing.T) { tests := []struct { name string diff --git a/test/integration/installuninstall/install_uninstall_test.go b/test/integration/installuninstall/install_uninstall_test.go index a0137e9eb..14b60a0f1 100644 --- a/test/integration/installuninstall/install_uninstall_test.go +++ b/test/integration/installuninstall/install_uninstall_test.go 
@@ -155,7 +155,7 @@ func verifyAgentVersion(ctx context.Context, tb testing.TB, testContainer testco replacer := strings.NewReplacer("nginx-agent-", "v", "SNAPSHOT-", "") packageVersion := replacer.Replace(os.Getenv("PACKAGE_NAME")) - expectedVersionOutput := fmt.Sprintf("nginx-agent version %s", packageVersion) + expectedVersionOutput := "nginx-agent version " + packageVersion exitCode, cmdOut, err := testContainer.Exec(ctx, []string{"nginx-agent", "--version"}) require.NoError(tb, err) diff --git a/test/load/otel_collector_plugin_load_test.go b/test/load/otel_collector_plugin_load_test.go index c3797d2e9..fa4db0b2b 100644 --- a/test/load/otel_collector_plugin_load_test.go +++ b/test/load/otel_collector_plugin_load_test.go @@ -23,7 +23,7 @@ func TestMetric10kDPS(t *testing.T) { binary := parseBinary(os.Getenv("PACKAGE_NAME")) - otelTestBedCollector, err := filepath.Abs(fmt.Sprintf("../../%s", binary)) + otelTestBedCollector, err := filepath.Abs("../../" + binary) require.NoError(t, err) t.Logf("Absolute path is %s", otelTestBedCollector) diff --git a/test/mock/collector/mock-collector/auth/auth.go b/test/mock/collector/mock-collector/auth/auth.go index 550e4000b..02531138a 100644 --- a/test/mock/collector/mock-collector/auth/auth.go +++ b/test/mock/collector/mock-collector/auth/auth.go @@ -33,7 +33,7 @@ type Option func(*HeadersCheck) // Ensure that the authenticator implements the auth.Server interface. 
var _ auth.Server = (*HeadersCheck)(nil) -// nolint: ireturn +//nolint:ireturn func NewFactory() extension.Factory { return extension.NewFactory( aType, @@ -56,7 +56,7 @@ func (a *HeadersCheck) Authenticate(ctx context.Context, headers map[string][]st return ctx, nil } -// nolint: ireturn +//nolint:ireturn func CreateAuthExtensionFunc( _ context.Context, setting extension.Settings, diff --git a/test/mock/collector/mock-collector/auth/config.go b/test/mock/collector/mock-collector/auth/config.go index a58401884..c6e087899 100644 --- a/test/mock/collector/mock-collector/auth/config.go +++ b/test/mock/collector/mock-collector/auth/config.go @@ -17,7 +17,7 @@ type Config struct { AuthenticatorID component.ID `mapstructure:",squash"` } -// nolint: ireturn +//nolint:ireturn func CreateDefaultConfig() component.Config { return &Config{ AuthenticatorID: HeadersCheckID, diff --git a/test/mock/grpc/mock_management_file_service.go b/test/mock/grpc/mock_management_file_service.go index 593c6abe6..9ee302aae 100644 --- a/test/mock/grpc/mock_management_file_service.go +++ b/test/mock/grpc/mock_management_file_service.go @@ -16,12 +16,13 @@ import ( "path/filepath" "strconv" + "github.com/nginx/agent/v3/pkg/files" + "github.com/cenkalti/backoff/v4" backoffHelpers "github.com/nginx/agent/v3/internal/backoff" "github.com/nginx/agent/v3/internal/config" internalgrpc "github.com/nginx/agent/v3/internal/grpc" "github.com/nginx/agent/v3/internal/logger" - "github.com/nginx/agent/v3/pkg/files" "google.golang.org/grpc" "github.com/nginx/agent/v3/api/grpc/mpi/v1" @@ -162,6 +163,101 @@ func (mgs *FileService) GetFileStream(request *v1.GetFileRequest, streamingServer) } +func (mgs *FileService) UpdateFile( + ctx context.Context, + request *v1.UpdateFileRequest, +) (*v1.UpdateFileResponse, error) { + fileContents := request.GetContents().GetContents() + fileMeta := request.GetFile().GetFileMeta() + fileName := fileMeta.GetName() + fileHash := fileMeta.GetHash() + filePermissions := 
fileMeta.GetPermissions() + + slog.InfoContext(ctx, "Updating file", "name", fileName, "hash", fileHash) + + fullFilePath := mgs.findFile(request.GetFile().GetFileMeta()) + + if _, err := os.Stat(fullFilePath); os.IsNotExist(err) { + statErr := os.MkdirAll(filepath.Dir(fullFilePath), os.ModePerm) + if statErr != nil { + slog.InfoContext(ctx, "Failed to create/update file", "full_file_path", fullFilePath, "error", statErr) + return nil, status.Errorf(codes.Internal, "Failed to create/update file") + } + } + + err := os.WriteFile(fullFilePath, fileContents, fileMode(filePermissions)) + if err != nil { + slog.InfoContext(ctx, "Failed to create/update file", "full_file_path", fullFilePath, "error", err) + return nil, status.Errorf(codes.Internal, "Failed to create/update file") + } + + return &v1.UpdateFileResponse{ + FileMeta: fileMeta, + }, nil +} + +func (mgs *FileService) UpdateFileStream(streamingServer grpc.ClientStreamingServer[v1.FileDataChunk, + v1.UpdateFileResponse], +) error { + slog.Info("Updating file, stream") + + headerChunk, headerChunkErr := streamingServer.Recv() + if headerChunkErr != nil { + return headerChunkErr + } + + slog.Debug("File header chunk received", "header_chunk", headerChunk) + + header := headerChunk.GetHeader() + writeChunkedFileError := mgs.WriteChunkFile(header.GetFileMeta(), header, streamingServer) + if writeChunkedFileError != nil { + return writeChunkedFileError + } + + return nil +} + +func (mgs *FileService) WriteChunkFile(fileMeta *v1.FileMeta, header *v1.FileDataChunkHeader, + stream grpc.ClientStreamingServer[v1.FileDataChunk, v1.UpdateFileResponse], +) error { + fileName := mgs.findFile(fileMeta) + filePermissions := files.FileMode(fileMeta.GetPermissions()) + fullFilePath := mgs.findFile(fileMeta) + + if err := mgs.createDirectories(fullFilePath, filePermissions); err != nil { + return err + } + + fileToWrite, createError := os.Create(fullFilePath) + defer func() { + closeError := fileToWrite.Close() + if closeError != 
nil { + slog.Warn("Failed to close file", + "file", fileMeta.GetName(), + "error", closeError, + ) + } + }() + + if createError != nil { + return createError + } + slog.Debug("Writing chunked file", "file", fileName) + for range header.GetChunks() { + chunk, recvError := stream.Recv() + if recvError != nil { + return recvError + } + + _, chunkWriteError := fileToWrite.Write(chunk.GetContent().GetData()) + if chunkWriteError != nil { + return fmt.Errorf("error writing chunk to file %s: %w", fileMeta.GetName(), chunkWriteError) + } + } + + return nil +} + func (mgs *FileService) sendGetFileStreamChunks(ctx context.Context, fullFilePath, filePath string, chunkSize uint32, streamingServer grpc.ServerStreamingServer[v1.FileDataChunk], ) error { @@ -312,101 +408,6 @@ func readChunk( return chunk, err } -func (mgs *FileService) UpdateFile( - ctx context.Context, - request *v1.UpdateFileRequest, -) (*v1.UpdateFileResponse, error) { - fileContents := request.GetContents().GetContents() - fileMeta := request.GetFile().GetFileMeta() - fileName := fileMeta.GetName() - fileHash := fileMeta.GetHash() - filePermissions := fileMeta.GetPermissions() - - slog.InfoContext(ctx, "Updating file", "name", fileName, "hash", fileHash) - - fullFilePath := mgs.findFile(request.GetFile().GetFileMeta()) - - if _, err := os.Stat(fullFilePath); os.IsNotExist(err) { - statErr := os.MkdirAll(filepath.Dir(fullFilePath), os.ModePerm) - if statErr != nil { - slog.InfoContext(ctx, "Failed to create/update file", "full_file_path", fullFilePath, "error", statErr) - return nil, status.Errorf(codes.Internal, "Failed to create/update file") - } - } - - err := os.WriteFile(fullFilePath, fileContents, fileMode(filePermissions)) - if err != nil { - slog.InfoContext(ctx, "Failed to create/update file", "full_file_path", fullFilePath, "error", err) - return nil, status.Errorf(codes.Internal, "Failed to create/update file") - } - - return &v1.UpdateFileResponse{ - FileMeta: fileMeta, - }, nil -} - -func (mgs 
*FileService) UpdateFileStream(streamingServer grpc.ClientStreamingServer[v1.FileDataChunk, - v1.UpdateFileResponse], -) error { - slog.Info("Updating file, stream") - - headerChunk, headerChunkErr := streamingServer.Recv() - if headerChunkErr != nil { - return headerChunkErr - } - - slog.Debug("File header chunk received", "header_chunk", headerChunk) - - header := headerChunk.GetHeader() - writeChunkedFileError := mgs.WriteChunkFile(header.GetFileMeta(), header, streamingServer) - if writeChunkedFileError != nil { - return writeChunkedFileError - } - - return nil -} - -func (mgs *FileService) WriteChunkFile(fileMeta *v1.FileMeta, header *v1.FileDataChunkHeader, - stream grpc.ClientStreamingServer[v1.FileDataChunk, v1.UpdateFileResponse], -) error { - fileName := mgs.findFile(fileMeta) - filePermissions := files.FileMode(fileMeta.GetPermissions()) - fullFilePath := mgs.findFile(fileMeta) - - if err := mgs.createDirectories(fullFilePath, filePermissions); err != nil { - return err - } - - fileToWrite, createError := os.Create(fullFilePath) - defer func() { - closeError := fileToWrite.Close() - if closeError != nil { - slog.Warn("Failed to close file", - "file", fileMeta.GetName(), - "error", closeError, - ) - } - }() - - if createError != nil { - return createError - } - slog.Debug("Writing chunked file", "file", fileName) - for i := uint32(0); i < header.GetChunks(); i++ { - chunk, recvError := stream.Recv() - if recvError != nil { - return recvError - } - - _, chunkWriteError := fileToWrite.Write(chunk.GetContent().GetData()) - if chunkWriteError != nil { - return fmt.Errorf("error writing chunk to file %s: %w", fileMeta.GetName(), chunkWriteError) - } - } - - return nil -} - func (mgs *FileService) createDirectories(fullFilePath string, filePermissions os.FileMode) error { if _, err := os.Stat(fullFilePath); os.IsNotExist(err) { statErr := os.MkdirAll(filepath.Dir(fullFilePath), filePermissions)