diff --git a/internal/app.go b/internal/app.go index 7cbd1a037..6f10aa1bb 100644 --- a/internal/app.go +++ b/internal/app.go @@ -11,7 +11,6 @@ import ( "github.com/nginx/agent/v3/internal/bus" "github.com/nginx/agent/v3/internal/config" - "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/internal/plugin" "github.com/spf13/cobra" ) @@ -46,9 +45,6 @@ func (a *App) Run(ctx context.Context) error { return } - slogger := logger.New(*agentConfig.Log) - slog.SetDefault(slogger) - slog.InfoContext(ctx, "Starting NGINX Agent", slog.String("version", a.version), slog.String("commit", a.commit), diff --git a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go index 50ac43b0d..f9173a1b8 100644 --- a/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go +++ b/internal/collector/nginxossreceiver/internal/scraper/stubstatus/stub_status_scraper.go @@ -25,6 +25,7 @@ import ( ) type NginxStubStatusScraper struct { + logger *zap.Logger httpClient *http.Client client *client.NginxClient cfg *config.Config @@ -50,6 +51,7 @@ func NewScraper( cfg: cfg, mb: mb, rb: rb, + logger: logger, } } @@ -59,6 +61,7 @@ func (s *NginxStubStatusScraper) ID() component.ID { // nolint: unparam func (s *NginxStubStatusScraper) Start(_ context.Context, _ component.Host) error { + s.logger.Info("Starting NGINX stub status scraper") httpClient := http.DefaultClient httpClient.Timeout = s.cfg.ClientConfig.Timeout @@ -74,7 +77,9 @@ func (s *NginxStubStatusScraper) Start(_ context.Context, _ component.Host) erro return nil } +// nolint: unparam func (s *NginxStubStatusScraper) Shutdown(_ context.Context) error { + s.logger.Info("Shutting down NGINX stub status scraper") return nil } diff --git a/internal/config/config.go b/internal/config/config.go index 96634b329..8f9a41a4d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,9 +19,9 @@ import ( "strings" "time" - "github.com/nginx/agent/v3/internal/datasource/host" - "github.com/nginx/agent/v3/internal/datasource/file" + "github.com/nginx/agent/v3/internal/datasource/host" + "github.com/nginx/agent/v3/internal/logger" "github.com/goccy/go-yaml" uuidLibrary "github.com/nginx/agent/v3/pkg/id" @@ -83,6 +83,10 @@ func ResolveConfig() (*Config, error) { directories := viperInstance.GetStringSlice(AllowedDirectoriesKey) allowedDirs := []string{AgentDirName} + log := resolveLog() + slogger := logger.New(log.Path, log.Level) + slog.SetDefault(slogger) + // Check directories in allowed_directories are valid for _, dir := range directories { if dir == "" || !filepath.IsAbs(dir) { @@ -111,7 +115,7 @@ func ResolveConfig() (*Config, error) { UUID: viperInstance.GetString(UUIDKey), Version: viperInstance.GetString(VersionKey), Path: viperInstance.GetString(ConfigPathKey), - Log: resolveLog(), + Log: log, DataPlaneConfig: resolveDataPlaneConfig(), Client: resolveClient(), AllowedDirectories: allowedDirs, diff --git a/internal/datasource/config/configfakes/fake_config_parser.go b/internal/datasource/config/configfakes/fake_config_parser.go new file mode 100644 index 000000000..538ba34cb --- /dev/null +++ b/internal/datasource/config/configfakes/fake_config_parser.go @@ -0,0 +1,121 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package configfakes + +import ( + "context" + "sync" + + v1 "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/datasource/config" + "github.com/nginx/agent/v3/internal/model" +) + +type FakeConfigParser struct { + ParseStub func(context.Context, *v1.Instance) (*model.NginxConfigContext, error) + parseMutex sync.RWMutex + parseArgsForCall []struct { + arg1 context.Context + arg2 *v1.Instance + } + parseReturns struct { + result1 *model.NginxConfigContext + result2 error + } + parseReturnsOnCall map[int]struct { + result1 *model.NginxConfigContext + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeConfigParser) Parse(arg1 context.Context, arg2 *v1.Instance) (*model.NginxConfigContext, error) { + fake.parseMutex.Lock() + ret, specificReturn := fake.parseReturnsOnCall[len(fake.parseArgsForCall)] + fake.parseArgsForCall = append(fake.parseArgsForCall, struct { + arg1 context.Context + arg2 *v1.Instance + }{arg1, arg2}) + stub := fake.ParseStub + fakeReturns := fake.parseReturns + fake.recordInvocation("Parse", []interface{}{arg1, arg2}) + fake.parseMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeConfigParser) ParseCallCount() int { + fake.parseMutex.RLock() + defer fake.parseMutex.RUnlock() + return len(fake.parseArgsForCall) +} + +func (fake *FakeConfigParser) ParseCalls(stub func(context.Context, *v1.Instance) (*model.NginxConfigContext, error)) { + fake.parseMutex.Lock() + defer fake.parseMutex.Unlock() + fake.ParseStub = stub +} + +func (fake *FakeConfigParser) ParseArgsForCall(i int) (context.Context, *v1.Instance) { + fake.parseMutex.RLock() + defer fake.parseMutex.RUnlock() + argsForCall := fake.parseArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeConfigParser) ParseReturns(result1 *model.NginxConfigContext, result2 error) { + fake.parseMutex.Lock() + defer fake.parseMutex.Unlock() + fake.ParseStub = nil + fake.parseReturns = struct { + result1 *model.NginxConfigContext + result2 error + }{result1, result2} +} + +func (fake *FakeConfigParser) ParseReturnsOnCall(i int, result1 *model.NginxConfigContext, result2 error) { + fake.parseMutex.Lock() + defer fake.parseMutex.Unlock() + fake.ParseStub = nil + if fake.parseReturnsOnCall == nil { + fake.parseReturnsOnCall = make(map[int]struct { + result1 *model.NginxConfigContext + result2 error + }) + } + fake.parseReturnsOnCall[i] = struct { + result1 *model.NginxConfigContext + result2 error + }{result1, result2} +} + +func (fake *FakeConfigParser) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.parseMutex.RLock() + defer fake.parseMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeConfigParser) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ config.ConfigParser = new(FakeConfigParser) diff --git a/internal/watcher/instance/nginx_config_parser.go b/internal/datasource/config/nginx_config_parser.go similarity index 97% rename from internal/watcher/instance/nginx_config_parser.go rename to internal/datasource/config/nginx_config_parser.go index af9a8e3b9..36bbebfde 100644 --- a/internal/watcher/instance/nginx_config_parser.go +++ b/internal/datasource/config/nginx_config_parser.go @@ -3,7 +3,7 @@ // This source code is licensed under the Apache License, Version 2.0 license found in the // LICENSE file in the root directory of this source tree. -package instance +package config import ( "context" @@ -42,13 +42,20 @@ const ( locationDirective = "location" ) +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate +//counterfeiter:generate . ConfigParser + type ( NginxConfigParser struct { agentConfig *config.Config } ) -var _ nginxConfigParser = (*NginxConfigParser)(nil) +type ConfigParser interface { + Parse(ctx context.Context, instance *mpi.Instance) (*model.NginxConfigContext, error) +} + +var _ ConfigParser = (*NginxConfigParser)(nil) type ( crossplaneTraverseCallback = func(ctx context.Context, parent, current *crossplane.Directive) error @@ -207,6 +214,124 @@ func (ncp *NginxConfigParser) createNginxConfigContext( return nginxConfigContext, nil } +func (ncp *NginxConfigParser) crossplaneConfigTraverse( + ctx context.Context, + root *crossplane.Config, + callback crossplaneTraverseCallback, +) error { + for _, dir := range root.Parsed { + err := callback(ctx, nil, dir) + if err != nil { + return err + } + + err = ncp.traverse(ctx, dir, callback) + if err != nil { + return err + } + } + + return nil +} + +func (ncp *NginxConfigParser) traverse( + ctx context.Context, + root *crossplane.Directive, + callback crossplaneTraverseCallback, +) error { + for _, child := range root.Block { + err := callback(ctx, root, child) + if err != nil { + return err + } + + err = ncp.traverse(ctx, child, callback) + if err != nil { + return err + } + } + + return nil +} + +func (ncp *NginxConfigParser) crossplaneConfigTraverseAPIDetails( + ctx context.Context, + root *crossplane.Config, + callback crossplaneTraverseCallbackAPIDetails, + apiType string, +) *model.APIDetails { + stop := false + response := &model.APIDetails{ + URL: "", + Listen: "", + Location: "", + } + for _, dir := range root.Parsed { + response = callback(ctx, nil, dir, apiType) + if response.URL != "" { + return response + } + response = traverseAPIDetails(ctx, dir, callback, &stop, apiType) + if response.URL != "" { + return response + } + } + + return response +} + +func traverseAPIDetails( + ctx context.Context, + root *crossplane.Directive, + callback crossplaneTraverseCallbackAPIDetails, + stop *bool, + apiType string, +) *model.APIDetails { + response := &model.APIDetails{ + URL: "", + Listen: "", + Location: "", + } + if *stop { + return &model.APIDetails{ + URL: "", + Listen: "", + Location: "", + } + } + for _, child := range root.Block { + response = callback(ctx, root, child, apiType) + if response.URL != "" { + *stop = true + return response + } + response = traverseAPIDetails(ctx, child, callback, stop, apiType) + if *stop { + return response + } + } + + return response +} + +func (ncp *NginxConfigParser) formatMap(directive *crossplane.Directive) map[string]string { + formatMap := make(map[string]string) + + if ncp.hasAdditionArguments(directive.Args) { + if directive.Args[0] == ltsvArg { + formatMap[directive.Args[0]] = ltsvArg + } else { + formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") + } + } + + return formatMap +} + +func (ncp *NginxConfigParser) hasAdditionArguments(args []string) bool { + return len(args) >= defaultNumberOfDirectiveArguments +} + func (ncp *NginxConfigParser) ignoreLog(logPath string) bool { ignoreLogs := []string{"off", "/dev/stderr", "/dev/stdout", "/dev/null", "stderr", "stdout"} @@ -247,20 +372,6 @@ func (ncp *NginxConfigParser) isExcludeLog(path string) bool { return false } -func (ncp *NginxConfigParser) formatMap(directive *crossplane.Directive) map[string]string { - formatMap := make(map[string]string) - - if ncp.hasAdditionArguments(directive.Args) { - if directive.Args[0] == ltsvArg { - formatMap[directive.Args[0]] = ltsvArg - } else { - formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") - } - } - - return formatMap -} - func (ncp *NginxConfigParser) accessLog(file, format string, formatMap map[string]string) *model.AccessLog { accessLog := &model.AccessLog{ Name: file, @@ -278,24 +389,6 @@ func (ncp *NginxConfigParser) accessLog(file, format string, formatMap map[strin return accessLog } -func (ncp *NginxConfigParser) updateLogFormat( - format string, - formatMap map[string]string, - accessLog *model.AccessLog, -) *model.AccessLog { - if formatMap[format] != "" { - accessLog.Format = formatMap[format] - } else if format == "" || format == "combined" { - accessLog.Format = predefinedAccessLogFormat - } else if format == ltsvArg { - accessLog.Format = format - } else { - accessLog.Format = "" - } - - return accessLog -} - func (ncp *NginxConfigParser) errorLog(file, level string) *model.ErrorLog { errorLog := &model.ErrorLog{ Name: file, @@ -327,6 +420,24 @@ func (ncp *NginxConfigParser) errorLogDirectiveLevel(directive *crossplane.Direc return "" } +func (ncp *NginxConfigParser) updateLogFormat( + format string, + formatMap map[string]string, + accessLog *model.AccessLog, +) *model.AccessLog { + if formatMap[format] != "" { + accessLog.Format = formatMap[format] + } else if format == "" || format == "combined" { + accessLog.Format = predefinedAccessLogFormat + } else if format == ltsvArg { + accessLog.Format = format + } else { + accessLog.Format = "" + } + + return accessLog +} + func (ncp *NginxConfigParser) sslCert(ctx context.Context, file, rootDir string) (sslCertFile *mpi.File) { if strings.Contains(file, "$") { slog.DebugContext(ctx, "Cannot process SSL certificate file path with variables", "file", file) @@ -351,120 +462,6 @@ func (ncp *NginxConfigParser) sslCert(ctx context.Context, file, rootDir string) return sslCertFile } -func (ncp *NginxConfigParser) isDuplicateFile(nginxConfigContextFiles []*mpi.File, newFile *mpi.File) bool { - for _, nginxConfigContextFile := range nginxConfigContextFiles { - if nginxConfigContextFile.GetFileMeta().GetName() == newFile.GetFileMeta().GetName() { - return true - } - } - - return false -} - -func (ncp *NginxConfigParser) crossplaneConfigTraverse( - ctx context.Context, - root *crossplane.Config, - callback crossplaneTraverseCallback, -) error { - for _, dir := range root.Parsed { - err := callback(ctx, nil, dir) - if err != nil { - return err - } - - err = ncp.traverse(ctx, dir, callback) - if err != nil { - return err - } - } - - return nil -} - -func (ncp *NginxConfigParser) crossplaneConfigTraverseAPIDetails( - ctx context.Context, - root *crossplane.Config, - callback crossplaneTraverseCallbackAPIDetails, - apiType string, -) *model.APIDetails { - stop := false - response := &model.APIDetails{ - URL: "", - Listen: "", - Location: "", - } - for _, dir := range root.Parsed { - response = callback(ctx, nil, dir, apiType) - if response.URL != "" { - return response - } - response = traverseAPIDetails(ctx, dir, callback, &stop, apiType) - if response.URL != "" { - return response - } - } - - return response -} - -func traverseAPIDetails( - ctx context.Context, - root *crossplane.Directive, - callback crossplaneTraverseCallbackAPIDetails, - stop *bool, - apiType string, -) *model.APIDetails { - response := &model.APIDetails{ - URL: "", - Listen: "", - Location: "", - } - if *stop { - return &model.APIDetails{ - URL: "", - Listen: "", - Location: "", - } - } - for _, child := range root.Block { - response = callback(ctx, root, child, apiType) - if response.URL != "" { - *stop = true - return response - } - response = traverseAPIDetails(ctx, child, callback, stop, apiType) - if *stop { - return response - } - } - - return response -} - -func (ncp *NginxConfigParser) traverse( - ctx context.Context, - root *crossplane.Directive, - callback crossplaneTraverseCallback, -) error { - for _, child := range root.Block { - err := callback(ctx, root, child) - if err != nil { - return err - } - - err = ncp.traverse(ctx, child, callback) - if err != nil { - return err - } - } - - return nil -} - -func (ncp *NginxConfigParser) hasAdditionArguments(args []string) bool { - return len(args) >= defaultNumberOfDirectiveArguments -} - func (ncp *NginxConfigParser) apiCallback(ctx context.Context, parent, current *crossplane.Directive, apiType string, ) *model.APIDetails { @@ -496,7 +493,7 @@ func (ncp *NginxConfigParser) pingAPIEndpoint(ctx context.Context, statusAPIDeta statusAPI := statusAPIDetail.URL if strings.HasPrefix(listen, "unix:") { - httpClient = ncp.SocketClient(strings.TrimPrefix(listen, "unix:")) + httpClient = ncp.socketClient(strings.TrimPrefix(listen, "unix:")) } else { httpClient.Timeout = ncp.agentConfig.Client.HTTP.Timeout } @@ -599,19 +596,6 @@ func (ncp *NginxConfigParser) urlsForLocationDirectiveAPIDetails( return urls } -func (ncp *NginxConfigParser) parsePathFromLocationDirective(location *crossplane.Directive) string { - path := "/" - if len(location.Args) > 0 { - if location.Args[0] != "=" { - path = location.Args[0] - } else { - path = location.Args[1] - } - } - - return path -} - func (ncp *NginxConfigParser) parseAddressesFromServerDirective(parent *crossplane.Directive) []string { foundHosts := []string{} port := "80" @@ -645,6 +629,19 @@ func (ncp *NginxConfigParser) parseAddressesFromServerDirective(parent *crosspla return ncp.formatAddresses(foundHosts, port) } +func (ncp *NginxConfigParser) parsePathFromLocationDirective(location *crossplane.Directive) string { + path := "/" + if len(location.Args) > 0 { + if location.Args[0] != "=" { + path = location.Args[0] + } else { + path = location.Args[1] + } + } + + return path +} + func (ncp *NginxConfigParser) formatAddresses(foundHosts []string, port string) []string { addresses := []string{} for _, foundHost := range foundHosts { @@ -688,7 +685,7 @@ func (ncp *NginxConfigParser) isPort(value string) bool { return err == nil && port >= 1 && port <= 65535 } -func (ncp *NginxConfigParser) SocketClient(socketPath string) *http.Client { +func (ncp *NginxConfigParser) socketClient(socketPath string) *http.Client { return &http.Client{ Timeout: ncp.agentConfig.Client.Grpc.KeepAlive.Timeout, Transport: &http.Transport{ @@ -698,3 +695,13 @@ func (ncp *NginxConfigParser) SocketClient(socketPath string) *http.Client { }, } } + +func (ncp *NginxConfigParser) isDuplicateFile(nginxConfigContextFiles []*mpi.File, newFile *mpi.File) bool { + for _, nginxConfigContextFile := range nginxConfigContextFiles { + if nginxConfigContextFile.GetFileMeta().GetName() == newFile.GetFileMeta().GetName() { + return true + } + } + + return false +} diff --git a/internal/watcher/instance/nginx_config_parser_benchmark_test.go b/internal/datasource/config/nginx_config_parser_benchmark_test.go similarity index 99% rename from internal/watcher/instance/nginx_config_parser_benchmark_test.go rename to internal/datasource/config/nginx_config_parser_benchmark_test.go index 162306788..e61fb2380 100644 --- a/internal/watcher/instance/nginx_config_parser_benchmark_test.go +++ b/internal/datasource/config/nginx_config_parser_benchmark_test.go @@ -3,7 +3,7 @@ // This source code is licensed under the Apache License, Version 2.0 license found in the // LICENSE file in the root directory of this source tree. -package instance +package config import ( "context" diff --git a/internal/watcher/instance/nginx_config_parser_test.go b/internal/datasource/config/nginx_config_parser_test.go similarity index 99% rename from internal/watcher/instance/nginx_config_parser_test.go rename to internal/datasource/config/nginx_config_parser_test.go index e0c22e9be..304a1fc06 100644 --- a/internal/watcher/instance/nginx_config_parser_test.go +++ b/internal/datasource/config/nginx_config_parser_test.go @@ -3,7 +3,7 @@ // This source code is licensed under the Apache License, Version 2.0 license found in the // LICENSE file in the root directory of this source tree. -package instance +package config import ( "bytes" diff --git a/internal/datasource/proto/instance.go b/internal/datasource/proto/instance.go new file mode 100644 index 000000000..495263412 --- /dev/null +++ b/internal/datasource/proto/instance.go @@ -0,0 +1,77 @@ +// Copyright (c) F5, Inc. +// +// This source code is licensed under the Apache License, Version 2.0 license found in the +// LICENSE file in the root directory of this source tree. + +package proto + +import ( + "reflect" + + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/model" +) + +func NginxPlusRuntimeInfoEqual(nginxPlusRuntimeInfo *mpi.NGINXPlusRuntimeInfo, + nginxConfigContext *model.NginxConfigContext, accessLogs, errorLogs []string, +) bool { + if !reflect.DeepEqual(nginxPlusRuntimeInfo.GetAccessLogs(), accessLogs) || + !reflect.DeepEqual(nginxPlusRuntimeInfo.GetErrorLogs(), errorLogs) || + nginxPlusRuntimeInfo.GetStubStatus().GetListen() != nginxConfigContext.StubStatus.Listen || + nginxPlusRuntimeInfo.GetPlusApi().GetListen() != nginxConfigContext.PlusAPI.Listen || + nginxPlusRuntimeInfo.GetStubStatus().GetLocation() != nginxConfigContext.StubStatus.Location || + nginxPlusRuntimeInfo.GetPlusApi().GetLocation() != nginxConfigContext.PlusAPI.Location { + return true + } + + return false +} + +func NginxRuntimeInfoEqual(nginxRuntimeInfo *mpi.NGINXRuntimeInfo, nginxConfigContext *model.NginxConfigContext, + accessLogs, errorLogs []string, +) bool { + if !reflect.DeepEqual(nginxRuntimeInfo.GetAccessLogs(), accessLogs) || + !reflect.DeepEqual(nginxRuntimeInfo.GetErrorLogs(), errorLogs) || + nginxRuntimeInfo.GetStubStatus().GetListen() != nginxConfigContext.StubStatus.Listen || + nginxRuntimeInfo.GetStubStatus().GetLocation() != nginxConfigContext.StubStatus.Location { + return true + } + + return false +} + +func UpdateNginxInstanceRuntime( + instance *mpi.Instance, + nginxConfigContext *model.NginxConfigContext, +) (updatesRequired bool) { + instanceType := instance.GetInstanceMeta().GetInstanceType() + + accessLogs := model.ConvertAccessLogs(nginxConfigContext.AccessLogs) + errorLogs := model.ConvertErrorLogs(nginxConfigContext.ErrorLogs) + + if instanceType == mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS { + nginxPlusRuntimeInfo := instance.GetInstanceRuntime().GetNginxPlusRuntimeInfo() + + if NginxPlusRuntimeInfoEqual(nginxPlusRuntimeInfo, nginxConfigContext, accessLogs, errorLogs) { + nginxPlusRuntimeInfo.AccessLogs = accessLogs + nginxPlusRuntimeInfo.ErrorLogs = errorLogs + nginxPlusRuntimeInfo.StubStatus.Listen = nginxConfigContext.StubStatus.Listen + nginxPlusRuntimeInfo.PlusApi.Listen = nginxConfigContext.PlusAPI.Listen + nginxPlusRuntimeInfo.StubStatus.Location = nginxConfigContext.StubStatus.Location + nginxPlusRuntimeInfo.PlusApi.Location = nginxConfigContext.PlusAPI.Location + updatesRequired = true + } + } else { + nginxRuntimeInfo := instance.GetInstanceRuntime().GetNginxRuntimeInfo() + + if NginxRuntimeInfoEqual(nginxRuntimeInfo, nginxConfigContext, accessLogs, errorLogs) { + nginxRuntimeInfo.AccessLogs = accessLogs + nginxRuntimeInfo.ErrorLogs = errorLogs + nginxRuntimeInfo.StubStatus.Location = nginxConfigContext.StubStatus.Location + nginxRuntimeInfo.StubStatus.Listen = nginxConfigContext.StubStatus.Listen + updatesRequired = true + } + } + + return updatesRequired +} diff --git a/internal/datasource/proto/instance_test.go b/internal/datasource/proto/instance_test.go new file mode 100644 index 000000000..155a4b9e4 --- /dev/null +++ b/internal/datasource/proto/instance_test.go @@ -0,0 +1,101 @@ +// Copyright (c) F5, Inc. +// +// This source code is licensed under the Apache License, Version 2.0 license found in the +// LICENSE file in the root directory of this source tree. + +package proto + +import ( + "testing" + + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/test/protos" + "github.com/stretchr/testify/assert" +) + +func TestInstanceWatcherService_updateNginxInstanceRuntime(t *testing.T) { + nginxOSSConfigContext := &model.NginxConfigContext{ + AccessLogs: []*model.AccessLog{ + { + Name: "/usr/local/var/log/nginx/access.log", + }, + }, + ErrorLogs: []*model.ErrorLog{ + { + Name: "/usr/local/var/log/nginx/error.log", + }, + }, + StubStatus: &model.APIDetails{ + URL: "http://127.0.0.1:8081/api", + Listen: "", + }, + } + + nginxPlusConfigContext := &model.NginxConfigContext{ + AccessLogs: []*model.AccessLog{ + { + Name: "/usr/local/var/log/nginx/access.log", + }, + }, + ErrorLogs: []*model.ErrorLog{ + { + Name: "/usr/local/var/log/nginx/error.log", + }, + }, + PlusAPI: &model.APIDetails{ + URL: "http://127.0.0.1:8081/api", + Listen: "", + }, + StubStatus: &model.APIDetails{ + URL: "http://127.0.0.1:8081/api", + Listen: "", + }, + } + + tests := []struct { + nginxConfigContext *model.NginxConfigContext + instance *mpi.Instance + name string + }{ + { + name: "Test 1: OSS Instance", + nginxConfigContext: nginxOSSConfigContext, + instance: protos.GetNginxOssInstance([]string{}), + }, + { + name: "Test 2: Plus Instance", + nginxConfigContext: nginxPlusConfigContext, + instance: protos.GetNginxPlusInstance([]string{}), + }, + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + UpdateNginxInstanceRuntime(test.instance, test.nginxConfigContext) + if test.name == "Test 2: Plus Instance" { + assert.Equal(t, test.nginxConfigContext.AccessLogs[0].Name, test.instance.GetInstanceRuntime(). + GetNginxPlusRuntimeInfo().GetAccessLogs()[0]) + assert.Equal(t, test.nginxConfigContext.ErrorLogs[0].Name, test.instance.GetInstanceRuntime(). + GetNginxPlusRuntimeInfo().GetErrorLogs()[0]) + assert.Equal(t, test.nginxConfigContext.StubStatus.Location, test.instance.GetInstanceRuntime(). + GetNginxPlusRuntimeInfo().GetStubStatus().GetLocation()) + assert.Equal(t, test.nginxConfigContext.PlusAPI.Location, test.instance.GetInstanceRuntime(). + GetNginxPlusRuntimeInfo().GetPlusApi().GetLocation()) + assert.Equal(t, test.nginxConfigContext.StubStatus.Listen, test.instance.GetInstanceRuntime(). + GetNginxPlusRuntimeInfo().GetStubStatus().GetListen()) + assert.Equal(t, test.nginxConfigContext.PlusAPI.Listen, test.instance.GetInstanceRuntime(). + GetNginxPlusRuntimeInfo().GetPlusApi().GetListen()) + } else { + assert.Equal(t, test.nginxConfigContext.AccessLogs[0].Name, test.instance.GetInstanceRuntime(). + GetNginxRuntimeInfo().GetAccessLogs()[0]) + assert.Equal(t, test.nginxConfigContext.ErrorLogs[0].Name, test.instance.GetInstanceRuntime(). + GetNginxRuntimeInfo().GetErrorLogs()[0]) + assert.Equal(t, test.nginxConfigContext.StubStatus.Location, test.instance.GetInstanceRuntime(). + GetNginxRuntimeInfo().GetStubStatus().GetLocation()) + assert.Equal(t, test.nginxConfigContext.StubStatus.Listen, test.instance.GetInstanceRuntime(). + GetNginxRuntimeInfo().GetStubStatus().GetListen()) + } + }) + } +} diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index 6aa47a41a..88086473f 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -73,8 +73,10 @@ func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) { fp.handleConfigUploadRequest(ctx, msg) case bus.ConfigApplyRequestTopic: fp.handleConfigApplyRequest(ctx, msg) - case bus.ConfigApplySuccessfulTopic, bus.ConfigApplyCompleteTopic: + case bus.ConfigApplyCompleteTopic: fp.handleConfigApplyComplete(ctx, msg) + case bus.ConfigApplySuccessfulTopic: + fp.handleConfigApplySuccess(ctx, msg) case bus.ConfigApplyFailedTopic: fp.handleConfigApplyFailedRequest(ctx, msg) default: @@ -125,6 +127,18 @@ func (fp *FilePlugin) handleConfigApplyComplete(ctx context.Context, msg *bus.Me fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) } +func (fp *FilePlugin) handleConfigApplySuccess(ctx context.Context, msg *bus.Message) { + successMessage, ok := msg.Data.(*model.ConfigApplySuccess) + + if !ok { + slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplySuccess", "payload", msg.Data) + return + } + + fp.fileManagerService.ClearCache() + fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: successMessage.DataPlaneResponse}) +} + func (fp *FilePlugin) handleConfigApplyFailedRequest(ctx context.Context, msg *bus.Message) { data, ok := msg.Data.(*model.ConfigApplyMessage) if data.InstanceID == "" || !ok { @@ -183,7 +197,7 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes switch writeStatus { case model.NoChange: - response = fp.createDataPlaneResponse( + dpResponse := fp.createDataPlaneResponse( correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, "Config apply successful, no files to change", @@ -191,8 +205,13 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes "", ) + successMessage := &model.ConfigApplySuccess{ + ConfigContext: &model.NginxConfigContext{}, + DataPlaneResponse: dpResponse, + } + fp.fileManagerService.ClearCache() - fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: response}) + fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: successMessage}) return case model.Error: diff --git a/internal/file/file_plugin_test.go b/internal/file/file_plugin_test.go index d550d8b3b..a0d6bf548 100644 --- a/internal/file/file_plugin_test.go +++ b/internal/file/file_plugin_test.go @@ -198,13 +198,13 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) { case test.configApplyStatus == model.NoChange: assert.Len(t, messages, 1) - response, ok := messages[0].Data.(*mpi.DataPlaneResponse) + response, ok := messages[0].Data.(*model.ConfigApplySuccess) assert.True(t, ok) assert.Equal(t, bus.ConfigApplySuccessfulTopic, messages[0].Topic) assert.Equal( t, mpi.CommandResponse_COMMAND_STATUS_OK, - response.GetCommandResponse().GetStatus(), + response.DataPlaneResponse.GetCommandResponse().GetStatus(), ) case test.message == nil: assert.Empty(t, messages) @@ -457,7 +457,10 @@ func TestFilePlugin_Process_ConfigApplyRollbackCompleteTopic(t *testing.T) { InstanceId: instance.GetInstanceMeta().GetInstanceId(), } - filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: expectedResponse}) + filePlugin.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: &model.ConfigApplySuccess{ + ConfigContext: &model.NginxConfigContext{}, + DataPlaneResponse: expectedResponse, + }}) messages := messagePipe.GetMessages() response, ok := messages[0].Data.(*mpi.DataPlaneResponse) diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 9a6d7b064..87171a20b 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -13,7 +13,6 @@ import ( "path" "strings" - "github.com/nginx/agent/v3/internal/config" "github.com/nginx/agent/v3/pkg/id" ) @@ -44,11 +43,11 @@ type ( } ) -func New(params config.Log) *slog.Logger { +func New(logPath, level string) *slog.Logger { handler := slog.NewTextHandler( - getLogWriter(params.Path), + getLogWriter(logPath), &slog.HandlerOptions{ - Level: GetLogLevel(params.Level), + Level: GetLogLevel(level), }, ) diff --git a/internal/logger/logger_test.go b/internal/logger/logger_test.go index 1eefdf175..9ee83ad22 100644 --- a/internal/logger/logger_test.go +++ b/internal/logger/logger_test.go @@ -15,12 +15,11 @@ import ( "github.com/stretchr/testify/require" - "github.com/nginx/agent/v3/internal/config" "github.com/stretchr/testify/assert" ) func TestNewLogger(t *testing.T) { - result := New(config.Log{}) + result := New("", "") assert.IsType(t, &slog.Logger{}, result) } diff --git a/internal/model/config.go b/internal/model/config.go index 543d090c8..f5b93a7c6 100644 --- a/internal/model/config.go +++ b/internal/model/config.go @@ -39,20 +39,59 @@ type ManifestFileMeta struct { // The size of the file in bytes Size int64 `json:"size"` } +type ConfigApplyMessage struct { + Error error + CorrelationID string + InstanceID string +} + +type AccessLog struct { + Name string + Format string + Permissions string + Readable bool +} + +type ErrorLog struct { + Name string + LogLevel string + Permissions string + Readable bool +} + +type ( + WriteStatus int +) + +const ( + RollbackRequired WriteStatus = iota + 1 + NoChange + Error + OK +) + +type ConfigApplySuccess struct { + ConfigContext *NginxConfigContext + DataPlaneResponse *v1.DataPlaneResponse +} // Complexity is 11, allowed is 10 // nolint: revive, cyclop func (ncc *NginxConfigContext) Equal(otherNginxConfigContext *NginxConfigContext) bool { - if ncc.StubStatus.URL != otherNginxConfigContext.StubStatus.URL || ncc.StubStatus.Listen != - otherNginxConfigContext.StubStatus.Listen || ncc.StubStatus.Location != - otherNginxConfigContext.StubStatus.Location { - return false + if ncc.StubStatus != nil && otherNginxConfigContext.StubStatus != nil { + if ncc.StubStatus.URL != otherNginxConfigContext.StubStatus.URL || ncc.StubStatus.Listen != + otherNginxConfigContext.StubStatus.Listen || ncc.StubStatus.Location != + otherNginxConfigContext.StubStatus.Location { + return false + } } - if ncc.PlusAPI.URL != otherNginxConfigContext.PlusAPI.URL || ncc.PlusAPI.Listen != - otherNginxConfigContext.PlusAPI.Listen || ncc.PlusAPI.Location != - otherNginxConfigContext.PlusAPI.Location { - return false + if ncc.PlusAPI != nil && otherNginxConfigContext.PlusAPI != nil { + if ncc.PlusAPI.URL != otherNginxConfigContext.PlusAPI.URL || ncc.PlusAPI.Listen != + otherNginxConfigContext.PlusAPI.Listen || ncc.PlusAPI.Location != + otherNginxConfigContext.PlusAPI.Location { + return false + } } if ncc.InstanceID != otherNginxConfigContext.InstanceID { @@ -95,33 +134,18 @@ func (ncc *NginxConfigContext) areFileEqual(files []*v1.File) bool { return true } -type ConfigApplyMessage struct { - Error error - CorrelationID string - InstanceID string -} - -type AccessLog struct { - Name string - Format string - Permissions string - Readable bool -} +func ConvertAccessLogs(accessLogs []*AccessLog) (logs []string) { + for _, log := range accessLogs { + logs = append(logs, log.Name) + } -type ErrorLog struct { - Name string - LogLevel string - Permissions string - Readable bool + return logs } -type ( - WriteStatus int -) +func ConvertErrorLogs(errorLogs []*ErrorLog) (logs []string) { + for _, log := range errorLogs { + logs = append(logs, log.Name) + } -const ( - RollbackRequired WriteStatus = iota + 1 - NoChange - Error - OK -) + return logs +} diff --git a/internal/model/config_test.go b/internal/model/config_test.go index 0d7ea9069..8bda7d00c 100644 --- a/internal/model/config_test.go +++ b/internal/model/config_test.go @@ -81,6 +81,10 @@ func TestNginxConfigContext_Equal(t *testing.T) { nginxConfigContextWithDifferentErrorLogs := *nginxConfigContext nginxConfigContextWithDifferentErrorLogs.ErrorLogs = []*ErrorLog{} + nginxConfigContextWithNilValues := *nginxConfigContext + nginxConfigContextWithNilValues.StubStatus = nil + nginxConfigContextWithNilValues.PlusAPI = nil + assert.True(t, nginxConfigContext.Equal(&nginxConfigContextWithSameValues)) assert.False(t, nginxConfigContext.Equal(&nginxConfigContextWithDifferentStubStatus)) assert.False(t, nginxConfigContext.Equal(&nginxConfigContextWithDifferentPlusAPI)) @@ -89,4 +93,5 @@ func TestNginxConfigContext_Equal(t *testing.T) { assert.False(t, nginxConfigContext.Equal(&nginxConfigContextWithDifferentFileHashes)) assert.False(t, nginxConfigContext.Equal(&nginxConfigContextWithDifferentAccessLogs)) assert.False(t, nginxConfigContext.Equal(&nginxConfigContextWithDifferentErrorLogs)) + assert.True(t, nginxConfigContext.Equal(&nginxConfigContextWithNilValues)) } diff --git a/internal/resource/resource_plugin.go b/internal/resource/resource_plugin.go index 5d9015c35..b71679c95 100644 --- a/internal/resource/resource_plugin.go +++ b/internal/resource/resource_plugin.go @@ -220,7 +220,7 @@ func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Mes return } - err := r.resourceService.ApplyConfig(ctx, data.InstanceID) + configContext, err := r.resourceService.ApplyConfig(ctx, data.InstanceID) if err != nil { data.Error = err slog.Error("errors found during config apply, sending error status, rolling back config", "err", err) @@ -236,13 +236,12 @@ func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Mes dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_OK, "Config apply successful", data.InstanceID, "") - r.messagePipe.Process( - ctx, - &bus.Message{ - Topic: bus.ConfigApplySuccessfulTopic, - Data: dpResponse, - }, - ) + successMessage := &model.ConfigApplySuccess{ + ConfigContext: configContext, + DataPlaneResponse: dpResponse, + } + + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplySuccessfulTopic, Data: successMessage}) } func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) { @@ -252,7 +251,7 @@ func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) { return } - err := r.resourceService.ApplyConfig(ctx, data.InstanceID) + _, err := r.resourceService.ApplyConfig(ctx, data.InstanceID) if err != nil { slog.Error("errors found during rollback, sending failure status", "err", err) diff --git a/internal/resource/resource_plugin_test.go b/internal/resource/resource_plugin_test.go index 313b8bb98..1f15a9596 100644 --- a/internal/resource/resource_plugin_test.go +++ b/internal/resource/resource_plugin_test.go @@ -163,7 +163,7 @@ func TestResource_Process_Apply(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { fakeResourceService := &resourcefakes.FakeResourceServiceInterface{} - fakeResourceService.ApplyConfigReturns(test.applyErr) + fakeResourceService.ApplyConfigReturns(&model.NginxConfigContext{}, test.applyErr) messagePipe := busfakes.NewFakeMessagePipe() resourcePlugin := NewResource(types.AgentConfig()) @@ -862,7 +862,7 @@ func TestResource_Process_Rollback(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { fakeResourceService := &resourcefakes.FakeResourceServiceInterface{} - fakeResourceService.ApplyConfigReturns(test.rollbackErr) + fakeResourceService.ApplyConfigReturns(&model.NginxConfigContext{}, test.rollbackErr) messagePipe := busfakes.NewFakeMessagePipe() resourcePlugin := NewResource(types.AgentConfig()) diff --git a/internal/resource/resource_service.go b/internal/resource/resource_service.go index c53ee6d27..feb16263a 100644 --- a/internal/resource/resource_service.go +++ b/internal/resource/resource_service.go @@ -16,6 +16,10 @@ import ( "strings" "sync" + parser "github.com/nginx/agent/v3/internal/datasource/config" + datasource "github.com/nginx/agent/v3/internal/datasource/proto" + "github.com/nginx/agent/v3/internal/model" + "google.golang.org/protobuf/proto" "github.com/nginxinc/nginx-plus-go-client/v2/client" @@ -46,7 +50,7 @@ type resourceServiceInterface interface { AddInstances(instanceList []*mpi.Instance) *mpi.Resource UpdateInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource DeleteInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource - ApplyConfig(ctx context.Context, instanceID string) error + ApplyConfig(ctx context.Context, instanceID string) (*model.NginxConfigContext, error) Instance(instanceID string) *mpi.Instance GetHTTPUpstreamServers(ctx context.Context, instance *mpi.Instance, upstreams string) ([]client.UpstreamServer, error) @@ -71,6 +75,7 @@ type ( type ResourceService struct { resource *mpi.Resource + nginxConfigParser parser.ConfigParser agentConfig *config.Config instanceOperators map[string]instanceOperator // key is instance ID info host.InfoInterface @@ -85,6 +90,7 @@ func NewResourceService(ctx context.Context, agentConfig *config.Config) *Resour info: host.NewInfo(), operatorsMutex: sync.Mutex{}, instanceOperators: make(map[string]instanceOperator), + nginxConfigParser: parser.NewNginxConfigParser(agentConfig), agentConfig: agentConfig, } @@ -174,12 +180,12 @@ func (r *ResourceService) DeleteInstances(ctx context.Context, instanceList []*m return r.resource } -func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) error { +func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) (*model.NginxConfigContext, error) { var instance *mpi.Instance operator := r.instanceOperators[instanceID] if operator == nil { - return fmt.Errorf("instance %s not found", instanceID) + return nil, fmt.Errorf("instance %s not found", instanceID) } for _, resourceInstance := range r.resource.GetInstances() { @@ -188,17 +194,26 @@ func (r *ResourceService) ApplyConfig(ctx context.Context, instanceID string) er } } + nginxConfigContext, parseErr := r.nginxConfigParser.Parse(ctx, instance) + if parseErr != nil || nginxConfigContext == nil { + return nil, fmt.Errorf("failed to parse config %w", parseErr) + } + + datasource.UpdateNginxInstanceRuntime(instance, nginxConfigContext) + + slog.DebugContext(ctx, "Updated Instance Runtime after parsing config", "instance", instance.GetInstanceRuntime()) + valErr := operator.Validate(ctx, instance) if valErr != nil { - return fmt.Errorf("failed validating config %w", valErr) + return nil, fmt.Errorf("failed validating config %w", valErr) } reloadErr := operator.Reload(ctx, instance) if reloadErr != nil { - return fmt.Errorf("failed to reload NGINX %w", reloadErr) + return nil, fmt.Errorf("failed to reload NGINX %w", reloadErr) } - return nil + return nginxConfigContext, nil } func (r *ResourceService) GetHTTPUpstreamServers(ctx context.Context, instance *mpi.Instance, diff --git a/internal/resource/resource_service_test.go b/internal/resource/resource_service_test.go index c85a460ce..8fc384ef2 100644 --- a/internal/resource/resource_service_test.go +++ b/internal/resource/resource_service_test.go @@ -11,6 +11,9 @@ import ( "fmt" "testing" + "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/internal/watcher/instance/instancefakes" + "github.com/nginxinc/nginx-plus-go-client/v2/client" "google.golang.org/protobuf/types/known/structpb" @@ -330,10 +333,23 @@ func TestResourceService_ApplyConfig(t *testing.T) { instanceOp.ReloadReturns(test.reloadErr) instanceOp.ValidateReturns(test.validateErr) + nginxParser := instancefakes.FakeNginxConfigParser{} + + nginxParser.ParseReturns(&model.NginxConfigContext{ + StubStatus: &model.APIDetails{}, + PlusAPI: &model.APIDetails{}, + InstanceID: test.instanceID, + Files: nil, + AccessLogs: nil, + ErrorLogs: nil, + NAPSysLogServers: nil, + }, nil) + resourceService := NewResourceService(ctx, types.AgentConfig()) resourceOpMap := make(map[string]instanceOperator) resourceOpMap[protos.GetNginxOssInstance([]string{}).GetInstanceMeta().GetInstanceId()] = instanceOp resourceService.instanceOperators = resourceOpMap + resourceService.nginxConfigParser = &nginxParser instance := protos.GetNginxOssInstance([]string{}) instances := []*v1.Instance{ @@ -341,7 +357,7 @@ func TestResourceService_ApplyConfig(t *testing.T) { } resourceService.resource.Instances = instances - reloadError := resourceService.ApplyConfig(ctx, test.instanceID) + _, reloadError := resourceService.ApplyConfig(ctx, test.instanceID) assert.Equal(t, test.expected, reloadError) }) } diff --git a/internal/resource/resourcefakes/fake_resource_service_interface.go b/internal/resource/resourcefakes/fake_resource_service_interface.go index c05460003..fa5ae561d 100644 --- a/internal/resource/resourcefakes/fake_resource_service_interface.go +++ b/internal/resource/resourcefakes/fake_resource_service_interface.go @@ -6,6 +6,7 @@ import ( "sync" v1 "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/model" "github.com/nginxinc/nginx-plus-go-client/v2/client" "google.golang.org/protobuf/types/known/structpb" ) @@ -22,17 +23,19 @@ type FakeResourceServiceInterface struct { addInstancesReturnsOnCall map[int]struct { result1 *v1.Resource } - ApplyConfigStub func(context.Context, string) error + ApplyConfigStub func(context.Context, string) (*model.NginxConfigContext, error) applyConfigMutex sync.RWMutex applyConfigArgsForCall []struct { arg1 context.Context arg2 string } applyConfigReturns struct { - result1 error + result1 *model.NginxConfigContext + result2 error } applyConfigReturnsOnCall map[int]struct { - result1 error + result1 *model.NginxConfigContext + result2 error } DeleteInstancesStub func(context.Context, []*v1.Instance) *v1.Resource deleteInstancesMutex sync.RWMutex @@ -222,7 +225,7 @@ func (fake *FakeResourceServiceInterface) AddInstancesReturnsOnCall(i int, resul }{result1} } -func (fake *FakeResourceServiceInterface) ApplyConfig(arg1 context.Context, arg2 string) error { +func (fake *FakeResourceServiceInterface) ApplyConfig(arg1 context.Context, arg2 string) (*model.NginxConfigContext, error) { fake.applyConfigMutex.Lock() ret, specificReturn := fake.applyConfigReturnsOnCall[len(fake.applyConfigArgsForCall)] fake.applyConfigArgsForCall = append(fake.applyConfigArgsForCall, struct { @@ -237,9 +240,9 @@ func (fake *FakeResourceServiceInterface) ApplyConfig(arg1 context.Context, arg2 return stub(arg1, arg2) } if specificReturn { - return ret.result1 + return ret.result1, ret.result2 } - return fakeReturns.result1 + return fakeReturns.result1, fakeReturns.result2 } func (fake *FakeResourceServiceInterface) ApplyConfigCallCount() int { @@ -248,7 +251,7 @@ func (fake *FakeResourceServiceInterface) ApplyConfigCallCount() int { return len(fake.applyConfigArgsForCall) } -func (fake *FakeResourceServiceInterface) ApplyConfigCalls(stub func(context.Context, string) error) { +func (fake *FakeResourceServiceInterface) ApplyConfigCalls(stub func(context.Context, string) (*model.NginxConfigContext, error)) { fake.applyConfigMutex.Lock() defer fake.applyConfigMutex.Unlock() fake.ApplyConfigStub = stub @@ -261,27 +264,30 @@ func (fake *FakeResourceServiceInterface) ApplyConfigArgsForCall(i int) (context return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeResourceServiceInterface) ApplyConfigReturns(result1 error) { +func (fake *FakeResourceServiceInterface) ApplyConfigReturns(result1 *model.NginxConfigContext, result2 error) { fake.applyConfigMutex.Lock() defer fake.applyConfigMutex.Unlock() fake.ApplyConfigStub = nil fake.applyConfigReturns = struct { - result1 error - }{result1} + result1 *model.NginxConfigContext + result2 error + }{result1, result2} } -func (fake *FakeResourceServiceInterface) ApplyConfigReturnsOnCall(i int, result1 error) { +func (fake *FakeResourceServiceInterface) ApplyConfigReturnsOnCall(i int, result1 *model.NginxConfigContext, result2 error) { fake.applyConfigMutex.Lock() defer fake.applyConfigMutex.Unlock() fake.ApplyConfigStub = nil if fake.applyConfigReturnsOnCall == nil { fake.applyConfigReturnsOnCall = make(map[int]struct { - result1 error + result1 *model.NginxConfigContext + result2 error }) } fake.applyConfigReturnsOnCall[i] = struct { - result1 error - }{result1} + result1 *model.NginxConfigContext + result2 error + }{result1, result2} } func (fake *FakeResourceServiceInterface) DeleteInstances(arg1 context.Context, arg2 []*v1.Instance) *v1.Resource { diff --git a/internal/watcher/credentials/credential_watcher_service.go b/internal/watcher/credentials/credential_watcher_service.go index 3dd5dcc9f..e44d4134c 100644 --- a/internal/watcher/credentials/credential_watcher_service.go +++ b/internal/watcher/credentials/credential_watcher_service.go @@ -32,7 +32,6 @@ type CredentialUpdateMessage struct { } type CredentialWatcherService struct { - enabled *atomic.Bool agentConfig *config.Config watcher *fsnotify.Watcher filesBeingWatched *sync.Map @@ -40,14 +39,10 @@ type CredentialWatcherService struct { } func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherService { - enabled := &atomic.Bool{} - enabled.Store(true) - filesChanged := &atomic.Bool{} filesChanged.Store(false) return &CredentialWatcherService{ - enabled: enabled, agentConfig: agentConfig, filesBeingWatched: &sync.Map{}, filesChanged: filesChanged, @@ -87,17 +82,7 @@ func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- Creden } } -func (cws *CredentialWatcherService) SetEnabled(enabled bool) { - cws.enabled.Store(enabled) -} - func (cws *CredentialWatcherService) addWatcher(ctx context.Context, filePath string) { - if !cws.enabled.Load() { - slog.DebugContext(ctx, "Credential watcher is disabled") - - return - } - if cws.isWatching(filePath) { slog.DebugContext( ctx, "Credential watcher is already watching ", "path", filePath) @@ -133,26 +118,24 @@ func (cws *CredentialWatcherService) isWatching(path string) bool { } func (cws *CredentialWatcherService) handleEvent(ctx context.Context, event fsnotify.Event) { - if cws.enabled.Load() { - if isEventSkippable(event) { - slog.DebugContext(ctx, "Skipping FSNotify event", "event", event) - return - } + if isEventSkippable(event) { + slog.DebugContext(ctx, "Skipping FSNotify event", "event", event) + return + } - slog.DebugContext(ctx, "Processing FSNotify event", "event", event) + slog.DebugContext(ctx, "Processing FSNotify event", "event", event) - switch { - case event.Has(fsnotify.Remove): - fallthrough - case event.Has(fsnotify.Rename): - if !slices.Contains(cws.watcher.WatchList(), event.Name) { - cws.filesBeingWatched.Store(event.Name, false) - } - cws.addWatcher(ctx, event.Name) + switch { + case event.Has(fsnotify.Remove): + fallthrough + case event.Has(fsnotify.Rename): + if !slices.Contains(cws.watcher.WatchList(), event.Name) { + cws.filesBeingWatched.Store(event.Name, false) } - - cws.filesChanged.Store(true) + cws.addWatcher(ctx, event.Name) } + + cws.filesChanged.Store(true) } func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch chan<- CredentialUpdateMessage) { diff --git a/internal/watcher/credentials/credential_watcher_service_test.go b/internal/watcher/credentials/credential_watcher_service_test.go index 60d740a09..dba4450f3 100644 --- a/internal/watcher/credentials/credential_watcher_service_test.go +++ b/internal/watcher/credentials/credential_watcher_service_test.go @@ -25,21 +25,9 @@ func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) credentialWatcherService := NewCredentialWatcherService(types.AgentConfig()) assert.Empty(t, credentialWatcherService.filesBeingWatched) - assert.True(t, credentialWatcherService.enabled.Load()) assert.False(t, credentialWatcherService.filesChanged.Load()) } -func TestCredentialWatcherService_SetEnabled(t *testing.T) { - credentialWatcherService := NewCredentialWatcherService(types.AgentConfig()) - assert.True(t, credentialWatcherService.enabled.Load()) - - credentialWatcherService.SetEnabled(false) - assert.False(t, credentialWatcherService.enabled.Load()) - - credentialWatcherService.SetEnabled(true) - assert.True(t, credentialWatcherService.enabled.Load()) -} - func TestCredentialWatcherService_Watch(t *testing.T) { ctx := context.Background() cws := NewCredentialWatcherService(types.AgentConfig()) @@ -102,13 +90,6 @@ func TestCredentialWatcherService_addWatcher(t *testing.T) { require.NoError(t, err) defer os.Remove(name) - cws.enabled.Store(false) - - cws.addWatcher(ctx, name) - require.False(t, cws.isWatching(name)) - - cws.enabled.Store(true) - cws.addWatcher(ctx, name) require.True(t, cws.isWatching(name)) diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go index e163ee0c9..291902860 100644 --- a/internal/watcher/instance/instance_watcher_service.go +++ b/internal/watcher/instance/instance_watcher_service.go @@ -8,11 +8,15 @@ package instance import ( "context" "log/slog" - "reflect" "slices" "sync" + "sync/atomic" "time" + "github.com/nginx/agent/v3/internal/datasource/proto" + + parser "github.com/nginx/agent/v3/internal/datasource/config" + "github.com/nginx/agent/v3/pkg/nginxprocess" mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" @@ -29,22 +33,16 @@ const defaultAgentPath = "/run/nginx-agent" //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate //counterfeiter:generate . processParser -//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate -//counterfeiter:generate . nginxConfigParser - type ( processParser interface { Parse(ctx context.Context, processes []*nginxprocess.Process) map[string]*mpi.Instance } - nginxConfigParser interface { - Parse(ctx context.Context, instance *mpi.Instance) (*model.NginxConfigContext, error) - } - InstanceWatcherService struct { processOperator process.ProcessOperatorInterface - nginxConfigParser nginxConfigParser + nginxConfigParser parser.ConfigParser executer exec.ExecInterface + enabled *atomic.Bool agentConfig *config.Config instanceCache map[string]*mpi.Instance nginxConfigCache map[string]*model.NginxConfigContext @@ -73,19 +71,27 @@ type ( ) func NewInstanceWatcherService(agentConfig *config.Config) *InstanceWatcherService { + enabled := &atomic.Bool{} + enabled.Store(true) + return &InstanceWatcherService{ agentConfig: agentConfig, processOperator: process.NewProcessOperator(), nginxParser: NewNginxProcessParser(), nginxAppProtectProcessParser: NewNginxAppProtectProcessParser(), - nginxConfigParser: NewNginxConfigParser(agentConfig), + nginxConfigParser: parser.NewNginxConfigParser(agentConfig), instanceCache: make(map[string]*mpi.Instance), cacheMutex: sync.Mutex{}, nginxConfigCache: make(map[string]*model.NginxConfigContext), executer: &exec.Exec{}, + enabled: enabled, } } +func (iw *InstanceWatcherService) SetEnabled(enabled bool) { + iw.enabled.Store(enabled) +} + func (iw *InstanceWatcherService) Watch( ctx context.Context, instancesChannel chan<- InstanceUpdatesMessage, @@ -108,7 +114,11 @@ func (iw *InstanceWatcherService) Watch( return case <-instanceWatcherTicker.C: - iw.checkForUpdates(ctx) + if iw.enabled.Load() { + iw.checkForUpdates(ctx) + } else { + slog.Debug("Skipping check for instance updates, instance watcher is disabled") + } } } } @@ -116,12 +126,38 @@ func (iw *InstanceWatcherService) Watch( func (iw *InstanceWatcherService) ReparseConfigs(ctx context.Context) { slog.DebugContext(ctx, "Reparsing all instance configurations") for _, instance := range iw.instanceCache { - iw.ReparseConfig(ctx, instance.GetInstanceMeta().GetInstanceId()) + nginxConfigContext := &model.NginxConfigContext{} + var parseErr error + slog.DebugContext( + ctx, + "Reparsing NGINX instance config", + "instance_id", instance.GetInstanceMeta().GetInstanceId(), + ) + + if instance.GetInstanceMeta().GetInstanceType() == mpi.InstanceMeta_INSTANCE_TYPE_NGINX || + instance.GetInstanceMeta().GetInstanceType() == mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS { + nginxConfigContext, parseErr = iw.nginxConfigParser.Parse(ctx, instance) + if parseErr != nil { + slog.WarnContext( + ctx, + "Failed to parse NGINX instance config", + "config_path", instance.GetInstanceRuntime().GetConfigPath(), + "instance_id", instance.GetInstanceMeta().GetInstanceId(), + "error", parseErr, + ) + + return + } + } + + iw.HandleNginxConfigContextUpdate(ctx, instance.GetInstanceMeta().GetInstanceId(), nginxConfigContext) } slog.DebugContext(ctx, "Finished reparsing all instance configurations") } -func (iw *InstanceWatcherService) ReparseConfig(ctx context.Context, instanceID string) { +func (iw *InstanceWatcherService) HandleNginxConfigContextUpdate(ctx context.Context, instanceID string, + nginxConfigContext *model.NginxConfigContext, +) { iw.cacheMutex.Lock() defer iw.cacheMutex.Unlock() @@ -132,28 +168,9 @@ func (iw *InstanceWatcherService) ReparseConfig(ctx context.Context, instanceID if instanceType == mpi.InstanceMeta_INSTANCE_TYPE_NGINX || instanceType == mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS { - slog.DebugContext( - ctx, - "Reparsing NGINX instance config", - "instance_id", instanceID, - ) - - nginxConfigContext, parseErr := iw.nginxConfigParser.Parse(ctx, instance) - if parseErr != nil { - slog.WarnContext( - ctx, - "Unable to parse NGINX instance config", - "config_path", instance.GetInstanceRuntime().GetConfigPath(), - "instance_id", instanceID, - "error", parseErr, - ) - - return - } - iw.sendNginxConfigContextUpdate(ctx, nginxConfigContext) iw.nginxConfigCache[nginxConfigContext.InstanceID] = nginxConfigContext - updatesRequired = iw.updateNginxInstanceRuntime(instance, nginxConfigContext) + updatesRequired = proto.UpdateNginxInstanceRuntime(instance, nginxConfigContext) } if updatesRequired { @@ -205,7 +222,7 @@ func (iw *InstanceWatcherService) checkForUpdates( } else { iw.sendNginxConfigContextUpdate(newCtx, nginxConfigContext) iw.nginxConfigCache[nginxConfigContext.InstanceID] = nginxConfigContext - iw.updateNginxInstanceRuntime(newInstance, nginxConfigContext) + proto.UpdateNginxInstanceRuntime(newInstance, nginxConfigContext) iw.instanceCache[newInstance.GetInstanceMeta().GetInstanceId()] = newInstance } } @@ -393,83 +410,3 @@ func areInstancesEqual(oldRuntime, currentRuntime *mpi.InstanceRuntime) (equal b return true } - -func (iw *InstanceWatcherService) updateNginxInstanceRuntime( - instance *mpi.Instance, - nginxConfigContext *model.NginxConfigContext, -) (updatesRequired bool) { - instanceType := instance.GetInstanceMeta().GetInstanceType() - - accessLogs := convertAccessLogs(nginxConfigContext.AccessLogs) - errorLogs := convertErrorLogs(nginxConfigContext.ErrorLogs) - - if instanceType == mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS { - nginxPlusRuntimeInfo := instance.GetInstanceRuntime().GetNginxPlusRuntimeInfo() - - if nginxPlusRuntimeInfoEqual(nginxPlusRuntimeInfo, nginxConfigContext, accessLogs, errorLogs) { - nginxPlusRuntimeInfo.AccessLogs = accessLogs - nginxPlusRuntimeInfo.ErrorLogs = errorLogs - nginxPlusRuntimeInfo.StubStatus.Listen = nginxConfigContext.StubStatus.Listen - nginxPlusRuntimeInfo.PlusApi.Listen = nginxConfigContext.PlusAPI.Listen - nginxPlusRuntimeInfo.StubStatus.Location = nginxConfigContext.StubStatus.Location - nginxPlusRuntimeInfo.PlusApi.Location = nginxConfigContext.PlusAPI.Location - updatesRequired = true - } - } else { - nginxRuntimeInfo := instance.GetInstanceRuntime().GetNginxRuntimeInfo() - - if nginxRuntimeInfoEqual(nginxRuntimeInfo, nginxConfigContext, accessLogs, errorLogs) { - nginxRuntimeInfo.AccessLogs = accessLogs - nginxRuntimeInfo.ErrorLogs = errorLogs - nginxRuntimeInfo.StubStatus.Location = nginxConfigContext.StubStatus.Location - nginxRuntimeInfo.StubStatus.Listen = nginxConfigContext.StubStatus.Listen - updatesRequired = true - } - } - - return updatesRequired -} - -func nginxPlusRuntimeInfoEqual(nginxPlusRuntimeInfo *mpi.NGINXPlusRuntimeInfo, - nginxConfigContext *model.NginxConfigContext, accessLogs, errorLogs []string, -) bool { - if !reflect.DeepEqual(nginxPlusRuntimeInfo.GetAccessLogs(), accessLogs) || - !reflect.DeepEqual(nginxPlusRuntimeInfo.GetErrorLogs(), errorLogs) || - nginxPlusRuntimeInfo.GetStubStatus().GetListen() != nginxConfigContext.StubStatus.Listen || - nginxPlusRuntimeInfo.GetPlusApi().GetListen() != nginxConfigContext.PlusAPI.Listen || - nginxPlusRuntimeInfo.GetStubStatus().GetLocation() != nginxConfigContext.StubStatus.Location || - nginxPlusRuntimeInfo.GetPlusApi().GetLocation() != nginxConfigContext.PlusAPI.Location { - return true - } - - return false -} - -func nginxRuntimeInfoEqual(nginxRuntimeInfo *mpi.NGINXRuntimeInfo, nginxConfigContext *model.NginxConfigContext, - accessLogs, errorLogs []string, -) bool { - if !reflect.DeepEqual(nginxRuntimeInfo.GetAccessLogs(), accessLogs) || - !reflect.DeepEqual(nginxRuntimeInfo.GetErrorLogs(), errorLogs) || - nginxRuntimeInfo.GetStubStatus().GetListen() != nginxConfigContext.StubStatus.Listen || - nginxRuntimeInfo.GetStubStatus().GetLocation() != nginxConfigContext.StubStatus.Location { - return true - } - - return false -} - -func convertAccessLogs(accessLogs []*model.AccessLog) (logs []string) { - for _, log := range accessLogs { - logs = append(logs, log.Name) - } - - return logs -} - -func convertErrorLogs(errorLogs []*model.ErrorLog) (logs []string) { - for _, log := range errorLogs { - logs = append(logs, log.Name) - } - - return logs -} diff --git a/internal/watcher/instance/instance_watcher_service_test.go b/internal/watcher/instance/instance_watcher_service_test.go index 6e869401f..7cd0f2a28 100644 --- a/internal/watcher/instance/instance_watcher_service_test.go +++ b/internal/watcher/instance/instance_watcher_service_test.go @@ -156,94 +156,6 @@ func TestInstanceWatcherService_instanceUpdates(t *testing.T) { } } -func TestInstanceWatcherService_updateNginxInstanceRuntime(t *testing.T) { - instanceWatcherService := NewInstanceWatcherService(types.AgentConfig()) - - nginxOSSConfigContext := &model.NginxConfigContext{ - AccessLogs: []*model.AccessLog{ - { - Name: "/usr/local/var/log/nginx/access.log", - }, - }, - ErrorLogs: []*model.ErrorLog{ - { - Name: "/usr/local/var/log/nginx/error.log", - }, - }, - StubStatus: &model.APIDetails{ - URL: "http://127.0.0.1:8081/api", - Listen: "", - }, - } - - nginxPlusConfigContext := &model.NginxConfigContext{ - AccessLogs: []*model.AccessLog{ - { - Name: "/usr/local/var/log/nginx/access.log", - }, - }, - ErrorLogs: []*model.ErrorLog{ - { - Name: "/usr/local/var/log/nginx/error.log", - }, - }, - PlusAPI: &model.APIDetails{ - URL: "http://127.0.0.1:8081/api", - Listen: "", - }, - StubStatus: &model.APIDetails{ - URL: "http://127.0.0.1:8081/api", - Listen: "", - }, - } - - tests := []struct { - nginxConfigContext *model.NginxConfigContext - instance *mpi.Instance - name string - }{ - { - name: "Test 1: OSS Instance", - nginxConfigContext: nginxOSSConfigContext, - instance: protos.GetNginxOssInstance([]string{}), - }, - { - name: "Test 2: Plus Instance", - nginxConfigContext: nginxPlusConfigContext, - instance: protos.GetNginxPlusInstance([]string{}), - }, - } - - for _, test := range tests { - t.Run(test.name, func(tt *testing.T) { - instanceWatcherService.updateNginxInstanceRuntime(test.instance, test.nginxConfigContext) - if test.name == "Test 2: Plus Instance" { - assert.Equal(t, test.nginxConfigContext.AccessLogs[0].Name, test.instance.GetInstanceRuntime(). - GetNginxPlusRuntimeInfo().GetAccessLogs()[0]) - assert.Equal(t, test.nginxConfigContext.ErrorLogs[0].Name, test.instance.GetInstanceRuntime(). - GetNginxPlusRuntimeInfo().GetErrorLogs()[0]) - assert.Equal(t, test.nginxConfigContext.StubStatus.Location, test.instance.GetInstanceRuntime(). - GetNginxPlusRuntimeInfo().GetStubStatus().GetLocation()) - assert.Equal(t, test.nginxConfigContext.PlusAPI.Location, test.instance.GetInstanceRuntime(). - GetNginxPlusRuntimeInfo().GetPlusApi().GetLocation()) - assert.Equal(t, test.nginxConfigContext.StubStatus.Listen, test.instance.GetInstanceRuntime(). - GetNginxPlusRuntimeInfo().GetStubStatus().GetListen()) - assert.Equal(t, test.nginxConfigContext.PlusAPI.Listen, test.instance.GetInstanceRuntime(). - GetNginxPlusRuntimeInfo().GetPlusApi().GetListen()) - } else { - assert.Equal(t, test.nginxConfigContext.AccessLogs[0].Name, test.instance.GetInstanceRuntime(). - GetNginxRuntimeInfo().GetAccessLogs()[0]) - assert.Equal(t, test.nginxConfigContext.ErrorLogs[0].Name, test.instance.GetInstanceRuntime(). - GetNginxRuntimeInfo().GetErrorLogs()[0]) - assert.Equal(t, test.nginxConfigContext.StubStatus.Location, test.instance.GetInstanceRuntime(). - GetNginxRuntimeInfo().GetStubStatus().GetLocation()) - assert.Equal(t, test.nginxConfigContext.StubStatus.Listen, test.instance.GetInstanceRuntime(). - GetNginxRuntimeInfo().GetStubStatus().GetListen()) - } - }) - } -} - func TestInstanceWatcherService_areInstancesEqual(t *testing.T) { tests := []struct { oldRuntime *mpi.InstanceRuntime @@ -351,6 +263,8 @@ func TestInstanceWatcherService_ReparseConfig(t *testing.T) { updatedInstance.GetInstanceRuntime().GetNginxRuntimeInfo().AccessLogs = []string{"access2.log"} updatedInstance.GetInstanceRuntime().GetNginxRuntimeInfo().ErrorLogs = []string{"error.log"} + updateNginxConfigContext.InstanceID = updatedInstance.GetInstanceMeta().GetInstanceId() + tests := []struct { parseReturns *model.NginxConfigContext name string @@ -363,13 +277,10 @@ func TestInstanceWatcherService_ReparseConfig(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - fakeNginxConfigParser := &instancefakes.FakeNginxConfigParser{} - fakeNginxConfigParser.ParseReturns(test.parseReturns, nil) instanceUpdatesChannel := make(chan InstanceUpdatesMessage, 1) nginxConfigContextChannel := make(chan NginxConfigContextMessage, 1) instanceWatcherService := NewInstanceWatcherService(types.AgentConfig()) - instanceWatcherService.nginxConfigParser = fakeNginxConfigParser instanceWatcherService.instancesChannel = instanceUpdatesChannel instanceWatcherService.nginxConfigContextChannel = nginxConfigContextChannel @@ -381,11 +292,16 @@ func TestInstanceWatcherService_ReparseConfig(t *testing.T) { instance.GetInstanceMeta().GetInstanceId(): instance, } - instanceWatcherService.ReparseConfig(ctx, updatedInstance.GetInstanceMeta().GetInstanceId()) + instanceWatcherService.HandleNginxConfigContextUpdate(ctx, updatedInstance. + GetInstanceMeta().GetInstanceId(), + updateNginxConfigContext) nginxConfigContextMessage := <-nginxConfigContextChannel assert.Equal(t, updateNginxConfigContext, nginxConfigContextMessage.NginxConfigContext) + assert.Equal(tt, updateNginxConfigContext, instanceWatcherService. + nginxConfigCache[updatedInstance.GetInstanceMeta().GetInstanceId()]) + instanceUpdatesMessage := <-instanceUpdatesChannel assert.Len(t, instanceUpdatesMessage.InstanceUpdates.UpdatedInstances, 1) assert.Equal(tt, updatedInstance, instanceUpdatesMessage.InstanceUpdates.UpdatedInstances[0]) diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 3fb7ec544..b0678fe6d 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -11,6 +11,8 @@ import ( "slices" "sync" + "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/internal/grpc" "github.com/nginx/agent/v3/internal/watcher/credentials" @@ -55,8 +57,9 @@ type ( instancesChannel chan<- instance.InstanceUpdatesMessage, nginxConfigContextChannel chan<- instance.NginxConfigContextMessage, ) - ReparseConfig(ctx context.Context, instanceID string) + HandleNginxConfigContextUpdate(ctx context.Context, instanceID string, configContext *model.NginxConfigContext) ReparseConfigs(ctx context.Context) + SetEnabled(enabled bool) } credentialWatcherServiceInterface interface { @@ -64,7 +67,6 @@ type ( ctx context.Context, credentialUpdateChannel chan<- credentials.CredentialUpdateMessage, ) - SetEnabled(enabled bool) } ) @@ -156,6 +158,7 @@ func (*Watcher) Subscriptions() []string { } func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message) { + slog.Info("Watcher plugin received ConfigApplyRequest event") managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) if !ok { slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.ManagementPlaneRequest", @@ -179,18 +182,27 @@ func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message w.instancesWithConfigApplyInProgress = append(w.instancesWithConfigApplyInProgress, instanceID) w.fileWatcherService.SetEnabled(false) + w.instanceWatcherService.SetEnabled(false) } func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message) { - response, ok := msg.Data.(*mpi.DataPlaneResponse) + successMessage, ok := msg.Data.(*model.ConfigApplySuccess) if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *mpi.DataPlaneResponse", "payload", + slog.ErrorContext(ctx, "Unable to cast message payload to *model.ConfigApplySuccess", "payload", msg.Data, "topic", msg.Topic) return } - instanceID := response.GetInstanceId() + instanceID := successMessage.DataPlaneResponse.GetInstanceId() + + // If the config apply had no changes to any files, it is results in a ConfigApplySuccessfulTopic with an empty + // configContext being sent, there is no need to reparse the config as no change has occurred. + if successMessage.ConfigContext.InstanceID == "" { + slog.DebugContext(ctx, "NginxConfigContext is empty, no need to reparse config") + return + } + w.instanceWatcherService.HandleNginxConfigContextUpdate(ctx, instanceID, successMessage.ConfigContext) w.watcherMutex.Lock() w.instancesWithConfigApplyInProgress = slices.DeleteFunc( @@ -202,8 +214,7 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message w.fileWatcherService.SetEnabled(true) w.watcherMutex.Unlock() - - w.instanceWatcherService.ReparseConfig(ctx, instanceID) + w.instanceWatcherService.SetEnabled(true) } func (w *Watcher) handleHealthRequest(ctx context.Context) { @@ -232,6 +243,7 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag }, ) + w.instanceWatcherService.SetEnabled(true) w.fileWatcherService.SetEnabled(true) } diff --git a/internal/watcher/watcher_plugin_test.go b/internal/watcher/watcher_plugin_test.go index 2a739d530..752f98568 100644 --- a/internal/watcher/watcher_plugin_test.go +++ b/internal/watcher/watcher_plugin_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + model2 "github.com/nginx/agent/v3/internal/model" + "github.com/nginx/agent/v3/internal/watcher/credentials" "github.com/nginx/agent/v3/internal/bus/busfakes" @@ -163,18 +165,23 @@ func TestWatcher_Process_ConfigApplySuccessfulTopic(t *testing.T) { ctx := context.Background() data := protos.GetNginxOssInstance([]string{}) - response := &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: "dfsbhj6-bc92-30c1-a9c9-85591422068e", - Timestamp: timestamppb.Now(), + response := &model2.ConfigApplySuccess{ + ConfigContext: &model2.NginxConfigContext{ + InstanceID: data.GetInstanceMeta().GetInstanceId(), }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Config apply successful", - Error: "", + DataPlaneResponse: &mpi.DataPlaneResponse{ + MessageMeta: &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: "dfsbhj6-bc92-30c1-a9c9-85591422068e", + Timestamp: timestamppb.Now(), + }, + CommandResponse: &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Config apply successful", + Error: "", + }, + InstanceId: data.GetInstanceMeta().GetInstanceId(), }, - InstanceId: data.GetInstanceMeta().GetInstanceId(), } message := &bus.Message{ @@ -189,7 +196,7 @@ func TestWatcher_Process_ConfigApplySuccessfulTopic(t *testing.T) { watcherPlugin.Process(ctx, message) - assert.Equal(t, 1, fakeWatcherService.ReparseConfigCallCount()) + assert.Equal(t, 1, fakeWatcherService.HandleNginxConfigContextUpdateCallCount()) assert.Empty(t, watcherPlugin.instancesWithConfigApplyInProgress) } diff --git a/internal/watcher/watcherfakes/fake_instance_watcher_service_interface.go b/internal/watcher/watcherfakes/fake_instance_watcher_service_interface.go index cbb484c4f..27ddf3807 100644 --- a/internal/watcher/watcherfakes/fake_instance_watcher_service_interface.go +++ b/internal/watcher/watcherfakes/fake_instance_watcher_service_interface.go @@ -5,21 +5,28 @@ import ( "context" "sync" + "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/internal/watcher/instance" ) type FakeInstanceWatcherServiceInterface struct { - ReparseConfigStub func(context.Context, string) - reparseConfigMutex sync.RWMutex - reparseConfigArgsForCall []struct { + HandleNginxConfigContextUpdateStub func(context.Context, string, *model.NginxConfigContext) + handleNginxConfigContextUpdateMutex sync.RWMutex + handleNginxConfigContextUpdateArgsForCall []struct { arg1 context.Context arg2 string + arg3 *model.NginxConfigContext } ReparseConfigsStub func(context.Context) reparseConfigsMutex sync.RWMutex reparseConfigsArgsForCall []struct { arg1 context.Context } + SetEnabledStub func(bool) + setEnabledMutex sync.RWMutex + setEnabledArgsForCall []struct { + arg1 bool + } WatchStub func(context.Context, chan<- instance.InstanceUpdatesMessage, chan<- instance.NginxConfigContextMessage) watchMutex sync.RWMutex watchArgsForCall []struct { @@ -31,37 +38,38 @@ type FakeInstanceWatcherServiceInterface struct { invocationsMutex sync.RWMutex } -func (fake *FakeInstanceWatcherServiceInterface) ReparseConfig(arg1 context.Context, arg2 string) { - fake.reparseConfigMutex.Lock() - fake.reparseConfigArgsForCall = append(fake.reparseConfigArgsForCall, struct { +func (fake *FakeInstanceWatcherServiceInterface) HandleNginxConfigContextUpdate(arg1 context.Context, arg2 string, arg3 *model.NginxConfigContext) { + fake.handleNginxConfigContextUpdateMutex.Lock() + fake.handleNginxConfigContextUpdateArgsForCall = append(fake.handleNginxConfigContextUpdateArgsForCall, struct { arg1 context.Context arg2 string - }{arg1, arg2}) - stub := fake.ReparseConfigStub - fake.recordInvocation("ReparseConfig", []interface{}{arg1, arg2}) - fake.reparseConfigMutex.Unlock() + arg3 *model.NginxConfigContext + }{arg1, arg2, arg3}) + stub := fake.HandleNginxConfigContextUpdateStub + fake.recordInvocation("HandleNginxConfigContextUpdate", []interface{}{arg1, arg2, arg3}) + fake.handleNginxConfigContextUpdateMutex.Unlock() if stub != nil { - fake.ReparseConfigStub(arg1, arg2) + fake.HandleNginxConfigContextUpdateStub(arg1, arg2, arg3) } } -func (fake *FakeInstanceWatcherServiceInterface) ReparseConfigCallCount() int { - fake.reparseConfigMutex.RLock() - defer fake.reparseConfigMutex.RUnlock() - return len(fake.reparseConfigArgsForCall) +func (fake *FakeInstanceWatcherServiceInterface) HandleNginxConfigContextUpdateCallCount() int { + fake.handleNginxConfigContextUpdateMutex.RLock() + defer fake.handleNginxConfigContextUpdateMutex.RUnlock() + return len(fake.handleNginxConfigContextUpdateArgsForCall) } -func (fake *FakeInstanceWatcherServiceInterface) ReparseConfigCalls(stub func(context.Context, string)) { - fake.reparseConfigMutex.Lock() - defer fake.reparseConfigMutex.Unlock() - fake.ReparseConfigStub = stub +func (fake *FakeInstanceWatcherServiceInterface) HandleNginxConfigContextUpdateCalls(stub func(context.Context, string, *model.NginxConfigContext)) { + fake.handleNginxConfigContextUpdateMutex.Lock() + defer fake.handleNginxConfigContextUpdateMutex.Unlock() + fake.HandleNginxConfigContextUpdateStub = stub } -func (fake *FakeInstanceWatcherServiceInterface) ReparseConfigArgsForCall(i int) (context.Context, string) { - fake.reparseConfigMutex.RLock() - defer fake.reparseConfigMutex.RUnlock() - argsForCall := fake.reparseConfigArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 +func (fake *FakeInstanceWatcherServiceInterface) HandleNginxConfigContextUpdateArgsForCall(i int) (context.Context, string, *model.NginxConfigContext) { + fake.handleNginxConfigContextUpdateMutex.RLock() + defer fake.handleNginxConfigContextUpdateMutex.RUnlock() + argsForCall := fake.handleNginxConfigContextUpdateArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeInstanceWatcherServiceInterface) ReparseConfigs(arg1 context.Context) { @@ -96,6 +104,38 @@ func (fake *FakeInstanceWatcherServiceInterface) ReparseConfigsArgsForCall(i int return argsForCall.arg1 } +func (fake *FakeInstanceWatcherServiceInterface) SetEnabled(arg1 bool) { + fake.setEnabledMutex.Lock() + fake.setEnabledArgsForCall = append(fake.setEnabledArgsForCall, struct { + arg1 bool + }{arg1}) + stub := fake.SetEnabledStub + fake.recordInvocation("SetEnabled", []interface{}{arg1}) + fake.setEnabledMutex.Unlock() + if stub != nil { + fake.SetEnabledStub(arg1) + } +} + +func (fake *FakeInstanceWatcherServiceInterface) SetEnabledCallCount() int { + fake.setEnabledMutex.RLock() + defer fake.setEnabledMutex.RUnlock() + return len(fake.setEnabledArgsForCall) +} + +func (fake *FakeInstanceWatcherServiceInterface) SetEnabledCalls(stub func(bool)) { + fake.setEnabledMutex.Lock() + defer fake.setEnabledMutex.Unlock() + fake.SetEnabledStub = stub +} + +func (fake *FakeInstanceWatcherServiceInterface) SetEnabledArgsForCall(i int) bool { + fake.setEnabledMutex.RLock() + defer fake.setEnabledMutex.RUnlock() + argsForCall := fake.setEnabledArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeInstanceWatcherServiceInterface) Watch(arg1 context.Context, arg2 chan<- instance.InstanceUpdatesMessage, arg3 chan<- instance.NginxConfigContextMessage) { fake.watchMutex.Lock() fake.watchArgsForCall = append(fake.watchArgsForCall, struct { @@ -133,10 +173,12 @@ func (fake *FakeInstanceWatcherServiceInterface) WatchArgsForCall(i int) (contex func (fake *FakeInstanceWatcherServiceInterface) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.reparseConfigMutex.RLock() - defer fake.reparseConfigMutex.RUnlock() + fake.handleNginxConfigContextUpdateMutex.RLock() + defer fake.handleNginxConfigContextUpdateMutex.RUnlock() fake.reparseConfigsMutex.RLock() defer fake.reparseConfigsMutex.RUnlock() + fake.setEnabledMutex.RLock() + defer fake.setEnabledMutex.RUnlock() fake.watchMutex.RLock() defer fake.watchMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/test/config/nginx/invalid-nginx.conf b/test/config/nginx/invalid-nginx.conf index eee4cd3ff..0087d50fd 100644 --- a/test/config/nginx/invalid-nginx.conf +++ b/test/config/nginx/invalid-nginx.conf @@ -1 +1,47 @@ -invalid config file +worker_processes 1 +error_log /var/log/nginx/error.log; +events { + worker_connections 1024; +} + +http { + include mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for" ' + '"$bytes_sent" "$request_length" "$request_time" ' + '"$gzip_ratio" $server_protocol '; + + access_log /var/log/nginx/access.log main; + + sendfile on; + keepalive_timeout 65; + + server { + listen 8080; + server_name localhost; + + location / { + root /usr/share/nginx/html; + index index.html index.htm; + } + + ## + # Enable Metrics + ## + location /api { + stub_status; + allow 127.0.0.1; + deny all; + } + + # redirect server error pages to the static page /50x.html + # + error_page 500 502 503 504 /50x.html; + location = /50x.html { + root /usr/share/nginx/html; + } + } +} diff --git a/test/config/nginx/nginx-plus-with-test-location.conf b/test/config/nginx/nginx-plus-with-test-location.conf new file mode 100644 index 000000000..c4764e189 --- /dev/null +++ b/test/config/nginx/nginx-plus-with-test-location.conf @@ -0,0 +1,54 @@ +worker_processes 1; +error_log /var/log/nginx/error.log; + +events { + worker_connections 1024; +} + +http { + include mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for" ' + '"$bytes_sent" "$request_length" "$request_time" ' + '"$gzip_ratio" $server_protocol '; + + access_log /var/log/nginx/access.log main; + + sendfile on; + keepalive_timeout 65; + + server { + listen 8080; + server_name localhost; + + location / { + root /usr/share/nginx/html; + index index.html index.htm; + } + + ## + # Enable Metrics + ## + location /api/ { + api write=on; + allow 127.0.0.1; + deny all; + status_zone my_location_zone1; + } + + + location /test { + return 200 "Test Success"; + } + + # redirect server error pages to the static page /50x.html + # + error_page 500 502 503 504 /50x.html; + location = /50x.html { + root /usr/share/nginx/html; + } + } +} diff --git a/test/integration/grpc_management_plane_api_test.go b/test/integration/grpc_management_plane_api_test.go index 1d67a6292..1d7c98b59 100644 --- a/test/integration/grpc_management_plane_api_test.go +++ b/test/integration/grpc_management_plane_api_test.go @@ -31,13 +31,12 @@ import ( ) const ( - configApplyErrorMessage = "failed validating config NGINX config test failed exit status 1:" + - " nginx: [emerg] unexpected end of file, expecting \";\" or \"}\" in /etc/nginx/nginx.conf:2\nnginx: " + - "configuration file /etc/nginx/nginx.conf test failed\n" + configApplyErrorMessage = "failed to parse config invalid " + + "number of arguments in \"worker_processes\" directive in /etc/nginx/nginx.conf:1" - retryCount = 5 - retryWaitTime = 4 * time.Second - retryMaxWaitTime = 5 * time.Second + retryCount = 8 + retryWaitTime = 5 * time.Second + retryMaxWaitTime = 6 * time.Second ) var ( @@ -295,9 +294,15 @@ func TestGrpc_ConfigApply(t *testing.T) { t.Run("Test 2: Valid config", func(t *testing.T) { clearManagementPlaneResponses(t) + newConfigFile := "../config/nginx/nginx-with-test-location.conf" + + if os.Getenv("IMAGE_PATH") == "/nginx-plus/agent" { + newConfigFile = "../config/nginx/nginx-plus-with-test-location.conf" + } + err := mockManagementPlaneGrpcContainer.CopyFileToContainer( ctx, - "../config/nginx/nginx-with-test-location.conf", + newConfigFile, fmt.Sprintf("/mock-management-plane-grpc/config/%s/etc/nginx/nginx.conf", nginxInstanceID), 0o666, ) @@ -305,11 +310,17 @@ func TestGrpc_ConfigApply(t *testing.T) { performConfigApply(t, nginxInstanceID) - responses = getManagementPlaneResponses(t, 1) + responses = getManagementPlaneResponses(t, 2) t.Logf("Config apply responses: %v", responses) + sort.Slice(responses, func(i, j int) bool { + return responses[i].GetCommandResponse().GetMessage() < responses[j].GetCommandResponse().GetMessage() + }) + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) assert.Equal(t, "Config apply successful", responses[0].GetCommandResponse().GetMessage()) + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[1].GetCommandResponse().GetStatus()) + assert.Equal(t, "Successfully updated all files", responses[1].GetCommandResponse().GetMessage()) }) t.Run("Test 3: Invalid config", func(t *testing.T) { @@ -623,26 +634,36 @@ func verifyUpdateDataPlaneHealth(t *testing.T) { t.Helper() client := resty.New() + client.SetRetryCount(retryCount).SetRetryWaitTime(retryWaitTime).SetRetryMaxWaitTime(retryMaxWaitTime) + client.AddRetryCondition( + func(r *resty.Response, err error) bool { return r.StatusCode() == http.StatusNotFound }, ) url := fmt.Sprintf("http://%s/api/v1/health", mockManagementPlaneAPIAddress) + resp, err := client.R().EnableTrace().Get(url) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode()) responseData := resp.Body() + t.Logf("Response: %s", string(responseData)) + assert.True(t, json.Valid(responseData)) pb := protojson.UnmarshalOptions{DiscardUnknown: true} updateDataPlaneHealthRequest := mpi.UpdateDataPlaneHealthRequest{} + unmarshalErr := pb.Unmarshal(responseData, &updateDataPlaneHealthRequest) + require.NoError(t, unmarshalErr) t.Logf("UpdateDataPlaneHealthRequest: %v", &updateDataPlaneHealthRequest) @@ -650,16 +671,23 @@ func verifyUpdateDataPlaneHealth(t *testing.T) { assert.NotNil(t, &updateDataPlaneHealthRequest) // Verify message metadata + messageMeta := updateDataPlaneHealthRequest.GetMessageMeta() + assert.NotEmpty(t, messageMeta.GetCorrelationId()) + assert.NotEmpty(t, messageMeta.GetMessageId()) + assert.NotEmpty(t, messageMeta.GetTimestamp()) healths := updateDataPlaneHealthRequest.GetInstanceHealths() + assert.Len(t, healths, 1) // Verify health metadata + assert.NotEmpty(t, healths[0].GetInstanceId()) + assert.Equal(t, mpi.InstanceHealth_INSTANCE_HEALTH_STATUS_HEALTHY, healths[0].GetInstanceHealthStatus()) } @@ -667,22 +695,29 @@ func verifyUpdateDataPlaneStatus(t *testing.T) { t.Helper() client := resty.New() + client.SetRetryCount(3).SetRetryWaitTime(50 * time.Millisecond).SetRetryMaxWaitTime(200 * time.Millisecond) url := fmt.Sprintf("http://%s/api/v1/status", mockManagementPlaneAPIAddress) + resp, err := client.R().EnableTrace().Get(url) require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode()) updateDataPlaneStatusRequest := mpi.UpdateDataPlaneStatusRequest{} responseData := resp.Body() + t.Logf("Response: %s", string(responseData)) + assert.True(t, json.Valid(responseData)) pb := protojson.UnmarshalOptions{DiscardUnknown: true} + unmarshalErr := pb.Unmarshal(responseData, &updateDataPlaneStatusRequest) + require.NoError(t, unmarshalErr) t.Logf("UpdateDataPlaneStatusRequest: %v", &updateDataPlaneStatusRequest) @@ -690,40 +725,60 @@ func verifyUpdateDataPlaneStatus(t *testing.T) { assert.NotNil(t, &updateDataPlaneStatusRequest) // Verify message metadata + messageMeta := updateDataPlaneStatusRequest.GetMessageMeta() + assert.NotEmpty(t, messageMeta.GetCorrelationId()) + assert.NotEmpty(t, messageMeta.GetMessageId()) + assert.NotEmpty(t, messageMeta.GetTimestamp()) instances := updateDataPlaneStatusRequest.GetResource().GetInstances() + sort.Slice(instances, func(i, j int) bool { return instances[i].GetInstanceMeta().GetInstanceType() < instances[j].GetInstanceMeta().GetInstanceType() }) + assert.Len(t, instances, 2) // Verify agent instance metadata + assert.NotEmpty(t, instances[0].GetInstanceMeta().GetInstanceId()) + assert.Equal(t, mpi.InstanceMeta_INSTANCE_TYPE_AGENT, instances[0].GetInstanceMeta().GetInstanceType()) + assert.NotEmpty(t, instances[0].GetInstanceMeta().GetVersion()) // Verify agent instance configuration + assert.Empty(t, instances[0].GetInstanceConfig().GetActions()) + assert.NotEmpty(t, instances[0].GetInstanceRuntime().GetProcessId()) + assert.Equal(t, "/usr/bin/nginx-agent", instances[0].GetInstanceRuntime().GetBinaryPath()) + assert.Equal(t, "/etc/nginx-agent/nginx-agent.conf", instances[0].GetInstanceRuntime().GetConfigPath()) // Verify NGINX instance metadata + assert.NotEmpty(t, instances[1].GetInstanceMeta().GetInstanceId()) + if os.Getenv("IMAGE_PATH") == "/nginx-plus/agent" { assert.Equal(t, mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS, instances[1].GetInstanceMeta().GetInstanceType()) } else { assert.Equal(t, mpi.InstanceMeta_INSTANCE_TYPE_NGINX, instances[1].GetInstanceMeta().GetInstanceType()) } + assert.NotEmpty(t, instances[1].GetInstanceMeta().GetVersion()) // Verify NGINX instance configuration + assert.Empty(t, instances[1].GetInstanceConfig().GetActions()) + assert.NotEmpty(t, instances[1].GetInstanceRuntime().GetProcessId()) + assert.Equal(t, "/usr/sbin/nginx", instances[1].GetInstanceRuntime().GetBinaryPath()) + assert.Equal(t, "/etc/nginx/nginx.conf", instances[1].GetInstanceRuntime().GetConfigPath()) }