Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 39 additions & 16 deletions internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ package command
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"sync/atomic"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/cenkalti/backoff/v4"

mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
Expand Down Expand Up @@ -40,11 +44,11 @@ type (
subscribeCancel context.CancelFunc
subscribeChannel chan *mpi.ManagementPlaneRequest
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
instances []*mpi.Instance
resource *mpi.Resource
subscribeMutex sync.Mutex
subscribeClientMutex sync.Mutex
configApplyRequestQueueMutex sync.Mutex
instancesMutex sync.Mutex
resourceMutex sync.Mutex
}
)

Expand All @@ -63,7 +67,7 @@ func NewCommandService(
isConnected: isConnected,
subscribeChannel: subscribeChannel,
configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
instances: []*mpi.Instance{},
resource: &mpi.Resource{},
}

var subscribeCtx context.Context
Expand Down Expand Up @@ -130,9 +134,9 @@ func (cs *CommandService) UpdateDataPlaneStatus(
}
slog.DebugContext(ctx, "UpdateDataPlaneStatus response", "response", response)

cs.instancesMutex.Lock()
defer cs.instancesMutex.Unlock()
cs.instances = resource.GetInstances()
cs.resourceMutex.Lock()
defer cs.resourceMutex.Unlock()
cs.resource = resource

return err
}
Expand Down Expand Up @@ -259,9 +263,9 @@ func (cs *CommandService) CreateConnection(

cs.isConnected.Store(true)

cs.instancesMutex.Lock()
defer cs.instancesMutex.Unlock()
cs.instances = resource.GetInstances()
cs.resourceMutex.Lock()
defer cs.resourceMutex.Unlock()
cs.resource = resource

return response, nil
}
Expand Down Expand Up @@ -405,14 +409,15 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
var err error
cs.subscribeClient, err = cs.commandServiceClient.Subscribe(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed to create subscribe client", "error", err)
subscribeErr := cs.handleSubscribeError(ctx, err, "create subscribe client")
cs.subscribeClientMutex.Unlock()

return err
return subscribeErr
}

if cs.subscribeClient == nil {
cs.subscribeClientMutex.Unlock()

return errors.New("subscribe service client not initialized yet")
}
}
Expand All @@ -421,10 +426,9 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {

request, recvError := cs.subscribeClient.Recv()
if recvError != nil {
slog.ErrorContext(ctx, "Failed to receive message from subscribe stream", "error", recvError)
cs.subscribeClient = nil

return recvError
return cs.handleSubscribeError(ctx, recvError, "receive message from subscribe stream")
}

if cs.isValidRequest(ctx, request) {
Expand All @@ -440,6 +444,25 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
}
}

// handleSubscribeError inspects an error from the subscribe stream (or from
// creating the subscribe client). If the gRPC status code is Unavailable, the
// management plane is assumed to be down: a CreateConnection is attempted with
// the last known resource and nil is returned so the caller treats the failure
// as non-fatal and keeps retrying. Any other error is logged and returned
// unchanged. errorMsg describes the operation that failed, for log messages.
func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, errorMsg string) error {
	codeError, ok := status.FromError(err)

	if ok && codeError.Code() == codes.Unavailable {
		slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s, rpc unavailable. "+
			"Trying create connection rpc", errorMsg), "error", err)

		// Snapshot cs.resource under its mutex: it is written by
		// UpdateDataPlaneStatus/CreateConnection while this goroutine runs.
		// The lock must not be held across CreateConnection, which acquires
		// the same mutex and would deadlock.
		cs.resourceMutex.Lock()
		resource := cs.resource
		cs.resourceMutex.Unlock()

		_, connectionErr := cs.CreateConnection(ctx, resource)
		if connectionErr != nil {
			// Log the connection error, not the original stream error.
			slog.ErrorContext(ctx, "Unable to create connection", "error", connectionErr)
		}

		return nil
	}

	slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s", errorMsg), "error", err)

	return err
}

func (cs *CommandService) queueConfigApplyRequests(ctx context.Context, request *mpi.ManagementPlaneRequest) {
cs.configApplyRequestQueueMutex.Lock()
defer cs.configApplyRequestQueueMutex.Unlock()
Expand Down Expand Up @@ -484,13 +507,13 @@ func (cs *CommandService) checkIfInstanceExists(
) bool {
instanceFound := false

cs.instancesMutex.Lock()
for _, instance := range cs.instances {
cs.resourceMutex.Lock()
for _, instance := range cs.resource.GetInstances() {
if instance.GetInstanceMeta().GetInstanceId() == requestInstanceID {
instanceFound = true
}
}
cs.instancesMutex.Unlock()
cs.resourceMutex.Unlock()

if !instanceFound {
slog.WarnContext(
Expand Down
12 changes: 6 additions & 6 deletions internal/command/command_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
)

nginxInstance := protos.GetNginxOssInstance([]string{})
commandService.instancesMutex.Lock()
commandService.instances = append(commandService.instances, nginxInstance)
commandService.instancesMutex.Unlock()
commandService.resourceMutex.Lock()
commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
commandService.resourceMutex.Unlock()

defer commandService.CancelSubscription(ctx)

Expand Down Expand Up @@ -414,9 +414,9 @@ func TestCommandService_isValidRequest(t *testing.T) {

nginxInstance := protos.GetNginxOssInstance([]string{})

commandService.instancesMutex.Lock()
commandService.instances = append(commandService.instances, nginxInstance)
commandService.instancesMutex.Unlock()
commandService.resourceMutex.Lock()
commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
commandService.resourceMutex.Unlock()

testCases := []struct {
req *mpi.ManagementPlaneRequest
Expand Down
4 changes: 3 additions & 1 deletion internal/resource/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,9 @@ func (r *ResourceService) UpdateHTTPUpstreamServers(ctx context.Context, instanc

added, updated, deleted, updateError := plusClient.UpdateHTTPServers(ctx, upstream, servers)

slog.Warn("Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError)
if updateError != nil {
slog.Warn("Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError)
}

return added, updated, deleted, createPlusAPIError(updateError)
}
Expand Down
28 changes: 28 additions & 0 deletions test/integration/grpc_management_plane_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,34 @@ func setupLocalEnvironment(tb testing.TB) {
}(tb)
}

// TestGrpc_Reconnection verifies that after the mock management plane
// container is stopped and restarted, the agent re-establishes its gRPC
// connection and keeps the same connection ID.
func TestGrpc_Reconnection(t *testing.T) {
	ctx := context.Background()
	teardownTest := setupConnectionTest(t, false, false)
	defer teardownTest(t)

	stopTimeout := 15 * time.Second

	initialConnectionID := verifyConnection(t, 2)

	require.NoError(t, mockManagementPlaneGrpcContainer.Stop(ctx, &stopTimeout))
	require.NoError(t, mockManagementPlaneGrpcContainer.Start(ctx))

	// The restarted container may be mapped to a new host port; refresh the
	// API address used by the verification helpers.
	host, hostErr := mockManagementPlaneGrpcContainer.Host(ctx)
	require.NoError(t, hostErr)
	mappedPorts, portsErr := mockManagementPlaneGrpcContainer.Ports(ctx)
	require.NoError(t, portsErr)
	mockManagementPlaneAPIAddress = net.JoinHostPort(host, mappedPorts["9093/tcp"][0].HostPort)

	// Give the agent time to notice the outage and reconnect.
	time.Sleep(5 * time.Second)

	reconnectedID := verifyConnection(t, 2)
	assert.Equal(t, initialConnectionID, reconnectedID)
}

// Verify that the agent sends a connection request and an update data plane status request
func TestGrpc_StartUp(t *testing.T) {
teardownTest := setupConnectionTest(t, true, false)
Expand Down