diff --git a/internal/command/command_service.go b/internal/command/command_service.go
index 78bd7df08..61127db9e 100644
--- a/internal/command/command_service.go
+++ b/internal/command/command_service.go
@@ -8,10 +8,14 @@ package command
 import (
 	"context"
 	"errors"
+	"fmt"
 	"log/slog"
 	"sync"
 	"sync/atomic"
 
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
 	"github.com/cenkalti/backoff/v4"
 
 	mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
@@ -40,11 +44,11 @@ type (
 		subscribeCancel              context.CancelFunc
 		subscribeChannel             chan *mpi.ManagementPlaneRequest
 		configApplyRequestQueue      map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
-		instances                    []*mpi.Instance
+		resource                     *mpi.Resource
 		subscribeMutex               sync.Mutex
 		subscribeClientMutex         sync.Mutex
 		configApplyRequestQueueMutex sync.Mutex
-		instancesMutex               sync.Mutex
+		resourceMutex                sync.Mutex
 	}
 )
@@ -63,7 +67,7 @@ func NewCommandService(
 		isConnected:             isConnected,
 		subscribeChannel:        subscribeChannel,
 		configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
-		instances:               []*mpi.Instance{},
+		resource:                &mpi.Resource{},
 	}
 
 	var subscribeCtx context.Context
@@ -130,9 +134,9 @@ func (cs *CommandService) UpdateDataPlaneStatus(
 	}
 
 	slog.DebugContext(ctx, "UpdateDataPlaneStatus response", "response", response)
-	cs.instancesMutex.Lock()
-	defer cs.instancesMutex.Unlock()
-	cs.instances = resource.GetInstances()
+	cs.resourceMutex.Lock()
+	defer cs.resourceMutex.Unlock()
+	cs.resource = resource
 
 	return err
 }
@@ -259,9 +263,9 @@ func (cs *CommandService) CreateConnection(
 
 	cs.isConnected.Store(true)
 
-	cs.instancesMutex.Lock()
-	defer cs.instancesMutex.Unlock()
-	cs.instances = resource.GetInstances()
+	cs.resourceMutex.Lock()
+	defer cs.resourceMutex.Unlock()
+	cs.resource = resource
 
 	return response, nil
 }
@@ -405,14 +409,15 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
 			var err error
 			cs.subscribeClient, err = cs.commandServiceClient.Subscribe(ctx)
 			if err != nil {
-				slog.ErrorContext(ctx, "Failed to create subscribe client", "error", err)
+				subscribeErr := cs.handleSubscribeError(ctx, err, "create subscribe client")
 				cs.subscribeClientMutex.Unlock()
-				return err
+				return subscribeErr
 			}
 
 			if cs.subscribeClient == nil {
 				cs.subscribeClientMutex.Unlock()
+				return errors.New("subscribe service client not initialized yet")
 			}
 		}
@@ -421,10 +426,9 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
 
 		request, recvError := cs.subscribeClient.Recv()
 		if recvError != nil {
-			slog.ErrorContext(ctx, "Failed to receive message from subscribe stream", "error", recvError)
 			cs.subscribeClient = nil
-			return recvError
+			return cs.handleSubscribeError(ctx, recvError, "receive message from subscribe stream")
 		}
 
 		if cs.isValidRequest(ctx, request) {
@@ -440,6 +444,25 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
 	}
 }
 
+// handleSubscribeError inspects a subscribe-stream error. On gRPC
+// Unavailable it attempts to re-establish the connection using the last
+// known resource and returns nil so the backoff loop retries; any other
+// error is logged and returned to the caller.
+func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, errorMsg string) error {
+	codeError, ok := status.FromError(err)
+
+	if ok && codeError.Code() == codes.Unavailable {
+		slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s, rpc unavailable. "+
+			"Trying create connection rpc", errorMsg), "error", err)
+		_, connectionErr := cs.CreateConnection(ctx, cs.resource)
+		if connectionErr != nil {
+			slog.ErrorContext(ctx, "Unable to create connection", "error", connectionErr)
+		}
+
+		return nil
+	}
+
+	slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s", errorMsg), "error", err)
+
+	return err
+}
+
 func (cs *CommandService) queueConfigApplyRequests(ctx context.Context, request *mpi.ManagementPlaneRequest) {
 	cs.configApplyRequestQueueMutex.Lock()
 	defer cs.configApplyRequestQueueMutex.Unlock()
@@ -484,13 +507,13 @@ func (cs *CommandService) checkIfInstanceExists(
 ) bool {
 	instanceFound := false
 
-	cs.instancesMutex.Lock()
-	for _, instance := range cs.instances {
+	cs.resourceMutex.Lock()
+	for _, instance := range cs.resource.GetInstances() {
 		if instance.GetInstanceMeta().GetInstanceId() == requestInstanceID {
 			instanceFound = true
 		}
 	}
-	cs.instancesMutex.Unlock()
+	cs.resourceMutex.Unlock()
 
 	if !instanceFound {
 		slog.WarnContext(
diff --git a/internal/command/command_service_test.go b/internal/command/command_service_test.go
index a148bb139..899653f8f 100644
--- a/internal/command/command_service_test.go
+++ b/internal/command/command_service_test.go
@@ -115,9 +115,9 @@ func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
 	)
 
 	nginxInstance := protos.GetNginxOssInstance([]string{})
-	commandService.instancesMutex.Lock()
-	commandService.instances = append(commandService.instances, nginxInstance)
-	commandService.instancesMutex.Unlock()
+	commandService.resourceMutex.Lock()
+	commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
+	commandService.resourceMutex.Unlock()
 
 	defer commandService.CancelSubscription(ctx)
 
@@ -414,9 +414,9 @@ func TestCommandService_isValidRequest(t *testing.T) {
 
 	nginxInstance := protos.GetNginxOssInstance([]string{})
 
-	commandService.instancesMutex.Lock()
-	commandService.instances = append(commandService.instances, nginxInstance)
-	commandService.instancesMutex.Unlock()
+	commandService.resourceMutex.Lock()
+	commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
+	commandService.resourceMutex.Unlock()
 
 	testCases := []struct {
 		req *mpi.ManagementPlaneRequest
diff --git a/internal/resource/resource_service.go b/internal/resource/resource_service.go
index d74836e64..de1dc0281 100644
--- a/internal/resource/resource_service.go
+++ b/internal/resource/resource_service.go
@@ -267,7 +267,9 @@ func (r *ResourceService) UpdateHTTPUpstreamServers(ctx context.Context, instanc
 
 	added, updated, deleted, updateError := plusClient.UpdateHTTPServers(ctx, upstream, servers)
 
-	slog.Warn("Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError)
+	if updateError != nil {
+		slog.Warn("Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError)
+	}
 
 	return added, updated, deleted, createPlusAPIError(updateError)
 }
diff --git a/test/integration/grpc_management_plane_api_test.go b/test/integration/grpc_management_plane_api_test.go
index b2936c19e..f7e1dc00a 100644
--- a/test/integration/grpc_management_plane_api_test.go
+++ b/test/integration/grpc_management_plane_api_test.go
@@ -190,6 +190,34 @@ func setupLocalEnvironment(tb testing.TB) {
 	}(tb)
 }
 
+func TestGrpc_Reconnection(t *testing.T) {
+	ctx := context.Background()
+	teardownTest := setupConnectionTest(t, false, false)
+	defer teardownTest(t)
+
+	timeout := 15 * time.Second
+
+	originalID := verifyConnection(t, 2)
+
+	stopErr := mockManagementPlaneGrpcContainer.Stop(ctx, &timeout)
+
+	require.NoError(t, stopErr)
+
+	startErr := mockManagementPlaneGrpcContainer.Start(ctx)
+	require.NoError(t, startErr)
+
+	ipAddress, err := mockManagementPlaneGrpcContainer.Host(ctx)
+	require.NoError(t, err)
+	ports, err := mockManagementPlaneGrpcContainer.Ports(ctx)
+	require.NoError(t, err)
+	mockManagementPlaneAPIAddress = net.JoinHostPort(ipAddress, ports["9093/tcp"][0].HostPort)
+
+	time.Sleep(5 * time.Second)
+
+	currentID := verifyConnection(t, 2)
+	assert.Equal(t, originalID, currentID)
+}
+
 // Verify that the agent sends a connection request and an update data plane status request
 func TestGrpc_StartUp(t *testing.T) {
 	teardownTest := setupConnectionTest(t, true, false)