Skip to content

Commit 92fff50

Browse files
authored
Send create connection after disconnect from management plane (#967)
1 parent 0f0c0e2 commit 92fff50

File tree

4 files changed

+76
-23
lines changed

4 files changed

+76
-23
lines changed

internal/command/command_service.go

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ package command
88
import (
99
"context"
1010
"errors"
11+
"fmt"
1112
"log/slog"
1213
"sync"
1314
"sync/atomic"
1415

16+
"google.golang.org/grpc/codes"
17+
"google.golang.org/grpc/status"
18+
1519
"github.com/cenkalti/backoff/v4"
1620

1721
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
@@ -40,11 +44,11 @@ type (
4044
subscribeCancel context.CancelFunc
4145
subscribeChannel chan *mpi.ManagementPlaneRequest
4246
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
43-
instances []*mpi.Instance
47+
resource *mpi.Resource
4448
subscribeMutex sync.Mutex
4549
subscribeClientMutex sync.Mutex
4650
configApplyRequestQueueMutex sync.Mutex
47-
instancesMutex sync.Mutex
51+
resourceMutex sync.Mutex
4852
}
4953
)
5054

@@ -63,7 +67,7 @@ func NewCommandService(
6367
isConnected: isConnected,
6468
subscribeChannel: subscribeChannel,
6569
configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
66-
instances: []*mpi.Instance{},
70+
resource: &mpi.Resource{},
6771
}
6872

6973
var subscribeCtx context.Context
@@ -130,9 +134,9 @@ func (cs *CommandService) UpdateDataPlaneStatus(
130134
}
131135
slog.DebugContext(ctx, "UpdateDataPlaneStatus response", "response", response)
132136

133-
cs.instancesMutex.Lock()
134-
defer cs.instancesMutex.Unlock()
135-
cs.instances = resource.GetInstances()
137+
cs.resourceMutex.Lock()
138+
defer cs.resourceMutex.Unlock()
139+
cs.resource = resource
136140

137141
return err
138142
}
@@ -259,9 +263,9 @@ func (cs *CommandService) CreateConnection(
259263

260264
cs.isConnected.Store(true)
261265

262-
cs.instancesMutex.Lock()
263-
defer cs.instancesMutex.Unlock()
264-
cs.instances = resource.GetInstances()
266+
cs.resourceMutex.Lock()
267+
defer cs.resourceMutex.Unlock()
268+
cs.resource = resource
265269

266270
return response, nil
267271
}
@@ -405,14 +409,15 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
405409
var err error
406410
cs.subscribeClient, err = cs.commandServiceClient.Subscribe(ctx)
407411
if err != nil {
408-
slog.ErrorContext(ctx, "Failed to create subscribe client", "error", err)
412+
subscribeErr := cs.handleSubscribeError(ctx, err, "create subscribe client")
409413
cs.subscribeClientMutex.Unlock()
410414

411-
return err
415+
return subscribeErr
412416
}
413417

414418
if cs.subscribeClient == nil {
415419
cs.subscribeClientMutex.Unlock()
420+
416421
return errors.New("subscribe service client not initialized yet")
417422
}
418423
}
@@ -421,10 +426,9 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
421426

422427
request, recvError := cs.subscribeClient.Recv()
423428
if recvError != nil {
424-
slog.ErrorContext(ctx, "Failed to receive message from subscribe stream", "error", recvError)
425429
cs.subscribeClient = nil
426430

427-
return recvError
431+
return cs.handleSubscribeError(ctx, recvError, "receive message from subscribe stream")
428432
}
429433

430434
if cs.isValidRequest(ctx, request) {
@@ -440,6 +444,25 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
440444
}
441445
}
442446

447+
func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, errorMsg string) error {
448+
codeError, ok := status.FromError(err)
449+
450+
if ok && codeError.Code() == codes.Unavailable {
451+
slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s, rpc unavailable. "+
452+
"Trying create connection rpc", errorMsg), "error", err)
453+
_, connectionErr := cs.CreateConnection(ctx, cs.resource)
454+
if connectionErr != nil {
455+
slog.ErrorContext(ctx, "Unable to create connection", "error", err)
456+
}
457+
458+
return nil
459+
}
460+
461+
slog.ErrorContext(ctx, fmt.Sprintf("Failed to %s", errorMsg), "error", err)
462+
463+
return err
464+
}
465+
443466
func (cs *CommandService) queueConfigApplyRequests(ctx context.Context, request *mpi.ManagementPlaneRequest) {
444467
cs.configApplyRequestQueueMutex.Lock()
445468
defer cs.configApplyRequestQueueMutex.Unlock()
@@ -484,13 +507,13 @@ func (cs *CommandService) checkIfInstanceExists(
484507
) bool {
485508
instanceFound := false
486509

487-
cs.instancesMutex.Lock()
488-
for _, instance := range cs.instances {
510+
cs.resourceMutex.Lock()
511+
for _, instance := range cs.resource.GetInstances() {
489512
if instance.GetInstanceMeta().GetInstanceId() == requestInstanceID {
490513
instanceFound = true
491514
}
492515
}
493-
cs.instancesMutex.Unlock()
516+
cs.resourceMutex.Unlock()
494517

495518
if !instanceFound {
496519
slog.WarnContext(

internal/command/command_service_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
115115
)
116116

117117
nginxInstance := protos.GetNginxOssInstance([]string{})
118-
commandService.instancesMutex.Lock()
119-
commandService.instances = append(commandService.instances, nginxInstance)
120-
commandService.instancesMutex.Unlock()
118+
commandService.resourceMutex.Lock()
119+
commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
120+
commandService.resourceMutex.Unlock()
121121

122122
defer commandService.CancelSubscription(ctx)
123123

@@ -414,9 +414,9 @@ func TestCommandService_isValidRequest(t *testing.T) {
414414

415415
nginxInstance := protos.GetNginxOssInstance([]string{})
416416

417-
commandService.instancesMutex.Lock()
418-
commandService.instances = append(commandService.instances, nginxInstance)
419-
commandService.instancesMutex.Unlock()
417+
commandService.resourceMutex.Lock()
418+
commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
419+
commandService.resourceMutex.Unlock()
420420

421421
testCases := []struct {
422422
req *mpi.ManagementPlaneRequest

internal/resource/resource_service.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,9 @@ func (r *ResourceService) UpdateHTTPUpstreamServers(ctx context.Context, instanc
267267

268268
added, updated, deleted, updateError := plusClient.UpdateHTTPServers(ctx, upstream, servers)
269269

270-
slog.Warn("Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError)
270+
if updateError != nil {
271+
slog.Warn("Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError)
272+
}
271273

272274
return added, updated, deleted, createPlusAPIError(updateError)
273275
}

test/integration/grpc_management_plane_api_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,34 @@ func setupLocalEnvironment(tb testing.TB) {
190190
}(tb)
191191
}
192192

193+
func TestGrpc_Reconnection(t *testing.T) {
194+
ctx := context.Background()
195+
teardownTest := setupConnectionTest(t, false, false)
196+
defer teardownTest(t)
197+
198+
timeout := 15 * time.Second
199+
200+
originalID := verifyConnection(t, 2)
201+
202+
stopErr := mockManagementPlaneGrpcContainer.Stop(ctx, &timeout)
203+
204+
require.NoError(t, stopErr)
205+
206+
startErr := mockManagementPlaneGrpcContainer.Start(ctx)
207+
require.NoError(t, startErr)
208+
209+
ipAddress, err := mockManagementPlaneGrpcContainer.Host(ctx)
210+
require.NoError(t, err)
211+
ports, err := mockManagementPlaneGrpcContainer.Ports(ctx)
212+
require.NoError(t, err)
213+
mockManagementPlaneAPIAddress = net.JoinHostPort(ipAddress, ports["9093/tcp"][0].HostPort)
214+
215+
time.Sleep(5 * time.Second)
216+
217+
currentID := verifyConnection(t, 2)
218+
assert.Equal(t, originalID, currentID)
219+
}
220+
193221
// Verify that the agent sends a connection request and an update data plane status request
194222
func TestGrpc_StartUp(t *testing.T) {
195223
teardownTest := setupConnectionTest(t, true, false)

0 commit comments

Comments
 (0)