Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/resource/resource_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) {

return
}
resource := r.resourceService.UpdateInstances(instanceList)
resource := r.resourceService.UpdateInstances(ctx, instanceList)

r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})

Expand All @@ -107,7 +107,7 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) {

return
}
resource := r.resourceService.DeleteInstances(instanceList)
resource := r.resourceService.DeleteInstances(ctx, instanceList)

r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ResourceUpdateTopic, Data: resource})

Expand Down
39 changes: 27 additions & 12 deletions internal/resource/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"strings"
"sync"

"google.golang.org/protobuf/proto"

"github.com/nginxinc/nginx-plus-go-client/v2/client"
"google.golang.org/protobuf/types/known/structpb"

Expand All @@ -42,8 +44,8 @@ const (

type resourceServiceInterface interface {
AddInstances(instanceList []*mpi.Instance) *mpi.Resource
UpdateInstances(instanceList []*mpi.Instance) *mpi.Resource
DeleteInstances(instanceList []*mpi.Instance) *mpi.Resource
UpdateInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource
DeleteInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource
ApplyConfig(ctx context.Context, instanceID string) error
Instance(instanceID string) *mpi.Instance
GetHTTPUpstreamServers(ctx context.Context, instance *mpi.Instance, upstreams string) ([]client.UpstreamServer,
Expand Down Expand Up @@ -126,32 +128,45 @@ func (r *ResourceService) RemoveOperator(instanceList []*mpi.Instance) {
}
}

func (r *ResourceService) UpdateInstances(instanceList []*mpi.Instance) *mpi.Resource {
func (r *ResourceService) UpdateInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource {
r.resourceMutex.Lock()
defer r.resourceMutex.Unlock()

for _, updatedInstance := range instanceList {
for _, instance := range r.resource.GetInstances() {
if updatedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
instance.InstanceMeta = updatedInstance.GetInstanceMeta()
instance.InstanceRuntime = updatedInstance.GetInstanceRuntime()
instance.InstanceConfig = updatedInstance.GetInstanceConfig()
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
if ok {
for _, instance := range resourceCopy.GetInstances() {
if updatedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
instance.InstanceMeta = updatedInstance.GetInstanceMeta()
instance.InstanceRuntime = updatedInstance.GetInstanceRuntime()
instance.InstanceConfig = updatedInstance.GetInstanceConfig()
}
}
r.resource = resourceCopy
} else {
slog.WarnContext(ctx, "Unable to clone resource while updating instances", "resource",
r.resource, "instances", instanceList)
}
}

return r.resource
}

func (r *ResourceService) DeleteInstances(instanceList []*mpi.Instance) *mpi.Resource {
func (r *ResourceService) DeleteInstances(ctx context.Context, instanceList []*mpi.Instance) *mpi.Resource {
r.resourceMutex.Lock()
defer r.resourceMutex.Unlock()

for _, deletedInstance := range instanceList {
for index, instance := range r.resource.GetInstances() {
if deletedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
r.resource.Instances = append(r.resource.Instances[:index], r.resource.GetInstances()[index+1:]...)
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
if ok {
for index, instance := range resourceCopy.GetInstances() {
if deletedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
r.resource.Instances = append(r.resource.Instances[:index], r.resource.GetInstances()[index+1:]...)
}
}
} else {
slog.WarnContext(ctx, "Unable to clone resource while deleting instances", "resource",
r.resource, "instances", instanceList)
}
}
r.RemoveOperator(instanceList)
Expand Down
4 changes: 2 additions & 2 deletions internal/resource/resource_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestResourceService_UpdateInstance(t *testing.T) {
t.Run(test.name, func(tt *testing.T) {
resourceService := NewResourceService(ctx, types.AgentConfig())
resourceService.resource.Instances = []*v1.Instance{protos.GetNginxOssInstance([]string{})}
resource := resourceService.UpdateInstances(test.instanceList)
resource := resourceService.UpdateInstances(ctx, test.instanceList)
assert.Equal(tt, test.resource.GetInstances(), resource.GetInstances())
})
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestResourceService_DeleteInstance(t *testing.T) {
protos.GetNginxOssInstance([]string{}),
protos.GetNginxPlusInstance([]string{}),
}
resource := resourceService.DeleteInstances(test.instanceList)
resource := resourceService.DeleteInstances(ctx, test.instanceList)
assert.Equal(tt, test.resource.GetInstances(), resource.GetInstances())
})
}
Expand Down
60 changes: 32 additions & 28 deletions internal/resource/resourcefakes/fake_resource_service_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions internal/watcher/health/health_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService {
}

func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()
for _, instance := range instances {
switch instance.GetInstanceMeta().GetInstanceType() {
case mpi.InstanceMeta_INSTANCE_TYPE_NGINX, mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS:
Expand Down
5 changes: 2 additions & 3 deletions internal/watcher/instance/instance_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ func (iw *InstanceWatcherService) ReparseConfig(ctx context.Context, instanceID
func (iw *InstanceWatcherService) checkForUpdates(
ctx context.Context,
) {
iw.cacheMutex.Lock()
defer iw.cacheMutex.Unlock()

var instancesToParse []*mpi.Instance
correlationID := logger.GenerateCorrelationID()
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, correlationID)
Expand Down Expand Up @@ -245,6 +242,8 @@ func (iw *InstanceWatcherService) instanceUpdates(ctx context.Context) (
instanceUpdates InstanceUpdates,
err error,
) {
iw.cacheMutex.Lock()
defer iw.cacheMutex.Unlock()
processes, err := iw.processOperator.Processes(ctx)
if err != nil {
return instanceUpdates, err
Expand Down
12 changes: 11 additions & 1 deletion internal/watcher/watcher_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"log/slog"
"slices"
"sync"

mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"

Expand Down Expand Up @@ -39,6 +40,7 @@ type (
fileUpdatesChannel chan file.FileUpdateMessage
cancel context.CancelFunc
instancesWithConfigApplyInProgress []string
watcherMutex sync.Mutex
}

instanceWatcherServiceInterface interface {
Expand All @@ -65,6 +67,7 @@ func NewWatcher(agentConfig *config.Config) *Watcher {
instanceHealthChannel: make(chan health.InstanceHealthMessage),
fileUpdatesChannel: make(chan file.FileUpdateMessage),
instancesWithConfigApplyInProgress: []string{},
watcherMutex: sync.Mutex{},
}
}

Expand Down Expand Up @@ -148,6 +151,8 @@ func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message

instanceID := request.ConfigApplyRequest.GetOverview().GetConfigVersion().GetInstanceId()

w.watcherMutex.Lock()
defer w.watcherMutex.Unlock()
w.instancesWithConfigApplyInProgress = append(w.instancesWithConfigApplyInProgress, instanceID)
w.fileWatcherService.SetEnabled(false)
}
Expand All @@ -163,13 +168,15 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message

instanceID := response.GetInstanceId()

w.watcherMutex.Lock()
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
w.instancesWithConfigApplyInProgress,
func(element string) bool {
return element == instanceID
},
)
w.fileWatcherService.SetEnabled(true)
w.watcherMutex.Unlock()

w.instanceWatcherService.ReparseConfig(ctx, instanceID)
}
Expand All @@ -191,6 +198,8 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag

instanceID := response.GetInstanceId()

w.watcherMutex.Lock()
defer w.watcherMutex.Unlock()
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
w.instancesWithConfigApplyInProgress,
func(element string) bool {
Expand All @@ -210,7 +219,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) {
w.handleInstanceUpdates(newCtx, message)
case message := <-w.nginxConfigContextChannel:
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)

w.watcherMutex.Lock()
if !slices.Contains(w.instancesWithConfigApplyInProgress, message.NginxConfigContext.InstanceID) {
slog.DebugContext(
newCtx,
Expand All @@ -229,6 +238,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) {
"nginx_config_context", message.NginxConfigContext,
)
}
w.watcherMutex.Unlock()
case message := <-w.instanceHealthChannel:
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)
w.messagePipe.Process(newCtx, &bus.Message{
Expand Down
1 change: 1 addition & 0 deletions test/config/nginx/nginx-plus.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
worker_processes 1;
error_log /var/log/nginx/error.log;

events {
worker_connections 1024;
Expand Down
4 changes: 2 additions & 2 deletions test/helpers/test_containers_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func LogAndTerminateContainers(
) {
tb.Helper()

tb.Log("Logging nginx agent container logs")
tb.Log("======================== Logging Agent Container Logs ========================")
logReader, err := agentContainer.Logs(ctx)
require.NoError(tb, err)

Expand All @@ -294,7 +294,7 @@ func LogAndTerminateContainers(
require.NoError(tb, err)

if mockManagementPlaneContainer != nil {
tb.Log("Logging mock management container logs")
tb.Log("======================== Logging Mock Management Container Logs ========================")
logReader, err = mockManagementPlaneContainer.Logs(ctx)
require.NoError(tb, err)

Expand Down
Loading
Loading