Skip to content
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions internal/resource/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"strings"
"sync"

"google.golang.org/protobuf/proto"

"github.com/nginxinc/nginx-plus-go-client/v2/client"
"google.golang.org/protobuf/types/known/structpb"

Expand Down Expand Up @@ -131,12 +133,18 @@ func (r *ResourceService) UpdateInstances(instanceList []*mpi.Instance) *mpi.Res
defer r.resourceMutex.Unlock()

for _, updatedInstance := range instanceList {
for _, instance := range r.resource.GetInstances() {
if updatedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
instance.InstanceMeta = updatedInstance.GetInstanceMeta()
instance.InstanceRuntime = updatedInstance.GetInstanceRuntime()
instance.InstanceConfig = updatedInstance.GetInstanceConfig()
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
if ok {
for _, instance := range resourceCopy.GetInstances() {
if updatedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
instance.InstanceMeta = updatedInstance.GetInstanceMeta()
instance.InstanceRuntime = updatedInstance.GetInstanceRuntime()
instance.InstanceConfig = updatedInstance.GetInstanceConfig()
}
}
r.resource = resourceCopy
} else {
slog.Warn("Error updating resource instances", "instances", instanceList)
}
}

Expand All @@ -148,10 +156,15 @@ func (r *ResourceService) DeleteInstances(instanceList []*mpi.Instance) *mpi.Res
defer r.resourceMutex.Unlock()

for _, deletedInstance := range instanceList {
for index, instance := range r.resource.GetInstances() {
if deletedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
r.resource.Instances = append(r.resource.Instances[:index], r.resource.GetInstances()[index+1:]...)
resourceCopy, ok := proto.Clone(r.resource).(*mpi.Resource)
if ok {
for index, instance := range resourceCopy.GetInstances() {
if deletedInstance.GetInstanceMeta().GetInstanceId() == instance.GetInstanceMeta().GetInstanceId() {
r.resource.Instances = append(r.resource.Instances[:index], r.resource.GetInstances()[index+1:]...)
}
}
} else {
slog.Warn("Error deleting instances from resource", "instances", instanceList)
}
}
r.RemoveOperator(instanceList)
Expand Down
2 changes: 2 additions & 0 deletions internal/watcher/health/health_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService {
}

func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()
for _, instance := range instances {
switch instance.GetInstanceMeta().GetInstanceType() {
case mpi.InstanceMeta_INSTANCE_TYPE_NGINX, mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS:
Expand Down
5 changes: 2 additions & 3 deletions internal/watcher/instance/instance_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ func (iw *InstanceWatcherService) ReparseConfig(ctx context.Context, instanceID
func (iw *InstanceWatcherService) checkForUpdates(
ctx context.Context,
) {
iw.cacheMutex.Lock()
defer iw.cacheMutex.Unlock()

var instancesToParse []*mpi.Instance
correlationID := logger.GenerateCorrelationID()
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, correlationID)
Expand Down Expand Up @@ -245,6 +242,8 @@ func (iw *InstanceWatcherService) instanceUpdates(ctx context.Context) (
instanceUpdates InstanceUpdates,
err error,
) {
iw.cacheMutex.Lock()
defer iw.cacheMutex.Unlock()
processes, err := iw.processOperator.Processes(ctx)
if err != nil {
return instanceUpdates, err
Expand Down
12 changes: 11 additions & 1 deletion internal/watcher/watcher_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"log/slog"
"slices"
"sync"

mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"

Expand Down Expand Up @@ -39,6 +40,7 @@ type (
fileUpdatesChannel chan file.FileUpdateMessage
cancel context.CancelFunc
instancesWithConfigApplyInProgress []string
watcherMutex sync.Mutex
}

instanceWatcherServiceInterface interface {
Expand All @@ -65,6 +67,7 @@ func NewWatcher(agentConfig *config.Config) *Watcher {
instanceHealthChannel: make(chan health.InstanceHealthMessage),
fileUpdatesChannel: make(chan file.FileUpdateMessage),
instancesWithConfigApplyInProgress: []string{},
watcherMutex: sync.Mutex{},
}
}

Expand Down Expand Up @@ -148,6 +151,8 @@ func (w *Watcher) handleConfigApplyRequest(ctx context.Context, msg *bus.Message

instanceID := request.ConfigApplyRequest.GetOverview().GetConfigVersion().GetInstanceId()

w.watcherMutex.Lock()
defer w.watcherMutex.Unlock()
w.instancesWithConfigApplyInProgress = append(w.instancesWithConfigApplyInProgress, instanceID)
w.fileWatcherService.SetEnabled(false)
}
Expand All @@ -163,13 +168,15 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message

instanceID := response.GetInstanceId()

w.watcherMutex.Lock()
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
w.instancesWithConfigApplyInProgress,
func(element string) bool {
return element == instanceID
},
)
w.fileWatcherService.SetEnabled(true)
w.watcherMutex.Unlock()

w.instanceWatcherService.ReparseConfig(ctx, instanceID)
}
Expand All @@ -191,6 +198,8 @@ func (w *Watcher) handleConfigApplyComplete(ctx context.Context, msg *bus.Messag

instanceID := response.GetInstanceId()

w.watcherMutex.Lock()
defer w.watcherMutex.Unlock()
w.instancesWithConfigApplyInProgress = slices.DeleteFunc(
w.instancesWithConfigApplyInProgress,
func(element string) bool {
Expand All @@ -210,7 +219,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) {
w.handleInstanceUpdates(newCtx, message)
case message := <-w.nginxConfigContextChannel:
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)

w.watcherMutex.Lock()
if !slices.Contains(w.instancesWithConfigApplyInProgress, message.NginxConfigContext.InstanceID) {
slog.DebugContext(
newCtx,
Expand All @@ -229,6 +238,7 @@ func (w *Watcher) monitorWatchers(ctx context.Context) {
"nginx_config_context", message.NginxConfigContext,
)
}
w.watcherMutex.Unlock()
case message := <-w.instanceHealthChannel:
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.CorrelationID)
w.messagePipe.Process(newCtx, &bus.Message{
Expand Down
1 change: 1 addition & 0 deletions test/config/nginx/nginx-plus.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
worker_processes 1;
error_log /var/log/nginx/error.log;

events {
worker_connections 1024;
Expand Down
4 changes: 2 additions & 2 deletions test/helpers/test_containers_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func LogAndTerminateContainers(
) {
tb.Helper()

tb.Log("Logging nginx agent container logs")
tb.Log("======================== Logging Agent Container Logs ========================")
logReader, err := agentContainer.Logs(ctx)
require.NoError(tb, err)

Expand All @@ -294,7 +294,7 @@ func LogAndTerminateContainers(
require.NoError(tb, err)

if mockManagementPlaneContainer != nil {
tb.Log("Logging mock management container logs")
tb.Log("======================== Logging Mock Management Container Logs ========================")
logReader, err = mockManagementPlaneContainer.Logs(ctx)
require.NoError(tb, err)

Expand Down
60 changes: 15 additions & 45 deletions test/integration/grpc_management_plane_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,6 @@ func TestGrpc_ConfigApply(t *testing.T) {
teardownTest := setupConnectionTest(t, false, false)
defer teardownTest(t)

instanceType := "OSS"
if os.Getenv("IMAGE_PATH") == "/nginx-plus/agent" {
instanceType = "PLUS"
}

nginxInstanceID := verifyConnection(t, 2)

responses := getManagementPlaneResponses(t, 1)
Expand Down Expand Up @@ -312,24 +307,11 @@ func TestGrpc_ConfigApply(t *testing.T) {

performConfigApply(t, nginxInstanceID)

if instanceType == "OSS" {
responses = getManagementPlaneResponses(t, 1)
t.Logf("Config apply responses: %v", responses)

assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus())
assert.Equal(t, "Config apply successful", responses[0].GetCommandResponse().GetMessage())
} else {
// NGINX Plus contains two extra Successfully updated all files responses as the NginxConfigContext
// is updated, and the file overview is then updated
responses = getManagementPlaneResponses(t, 3)
t.Logf("Config apply responses: %v", responses)
assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus())
assert.Equal(t, "Config apply successful", responses[0].GetCommandResponse().GetMessage())
assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[1].GetCommandResponse().GetStatus())
assert.Equal(t, "Successfully updated all files", responses[1].GetCommandResponse().GetMessage())
assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[2].GetCommandResponse().GetStatus())
assert.Equal(t, "Successfully updated all files", responses[2].GetCommandResponse().GetMessage())
}
responses = getManagementPlaneResponses(t, 1)
t.Logf("Config apply responses: %v", responses)

assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus())
assert.Equal(t, "Config apply successful", responses[0].GetCommandResponse().GetMessage())
})

t.Run("Test 3: Invalid config", func(t *testing.T) {
Expand All @@ -344,27 +326,15 @@ func TestGrpc_ConfigApply(t *testing.T) {

performConfigApply(t, nginxInstanceID)

if instanceType == "OSS" {
responses = getManagementPlaneResponses(t, 2)
t.Logf("Config apply responses: %v", responses)

assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_ERROR, responses[0].GetCommandResponse().GetStatus())
assert.Equal(t, "Config apply failed, rolling back config", responses[0].GetCommandResponse().GetMessage())
assert.Equal(t, configApplyErrorMessage, responses[0].GetCommandResponse().GetError())
assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_FAILURE, responses[1].GetCommandResponse().GetStatus())
assert.Equal(t, "Config apply failed, rollback successful", responses[1].GetCommandResponse().GetMessage())
assert.Equal(t, configApplyErrorMessage, responses[1].GetCommandResponse().GetError())
} else {
responses = getManagementPlaneResponses(t, 2)
t.Logf("Config apply responses: %v", len(responses))

assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_ERROR, responses[0].GetCommandResponse().GetStatus())
assert.Equal(t, "Config apply failed, rolling back config", responses[0].GetCommandResponse().GetMessage())
assert.Equal(t, configApplyErrorMessage, responses[0].GetCommandResponse().GetError())
assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_FAILURE, responses[1].GetCommandResponse().GetStatus())
assert.Equal(t, "Config apply failed, rollback successful", responses[1].GetCommandResponse().GetMessage())
assert.Equal(t, configApplyErrorMessage, responses[1].GetCommandResponse().GetError())
}
responses = getManagementPlaneResponses(t, 2)
t.Logf("Config apply responses: %v", responses)

assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_ERROR, responses[0].GetCommandResponse().GetStatus())
assert.Equal(t, "Config apply failed, rolling back config", responses[0].GetCommandResponse().GetMessage())
assert.Equal(t, configApplyErrorMessage, responses[0].GetCommandResponse().GetError())
assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_FAILURE, responses[1].GetCommandResponse().GetStatus())
assert.Equal(t, "Config apply failed, rollback successful", responses[1].GetCommandResponse().GetMessage())
assert.Equal(t, configApplyErrorMessage, responses[1].GetCommandResponse().GetError())
})

t.Run("Test 4: File not in allowed directory", func(t *testing.T) {
Expand Down Expand Up @@ -481,7 +451,7 @@ func performInvalidConfigApply(t *testing.T, nginxInstanceID string) {
"size": 0
},
"action": "FILE_ACTION_UPDATE"
},
},
{
"file_meta": {
"name": "/unknown/nginx.conf",
Expand Down