Skip to content

Commit 9b1bc5b

Browse files
authored
Add config apply request queue (#949)
1 parent b6e143c commit 9b1bc5b

File tree

2 files changed

+310
-13
lines changed

2 files changed

+310
-13
lines changed

internal/command/command_service.go

Lines changed: 114 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,16 @@ const (
3333

3434
type (
3535
// CommandService holds the command service client, the subscribe stream and its
// cancel function, the agent configuration, the connection flag, the channel on
// which management plane requests are forwarded, and the per-instance queue of
// config apply requests.
CommandService struct {
36-
commandServiceClient mpi.CommandServiceClient
37-
subscribeClient mpi.CommandService_SubscribeClient
38-
agentConfig *config.Config
39-
isConnected *atomic.Bool
40-
subscribeCancel context.CancelFunc
41-
subscribeChannel chan *mpi.ManagementPlaneRequest
42-
subscribeMutex sync.Mutex
43-
subscribeClientMutex sync.Mutex
36+
commandServiceClient mpi.CommandServiceClient
37+
subscribeClient mpi.CommandService_SubscribeClient
38+
agentConfig *config.Config
39+
isConnected *atomic.Bool
40+
subscribeCancel context.CancelFunc
41+
subscribeChannel chan *mpi.ManagementPlaneRequest
42+
// configApplyRequestQueue holds, per instance, the config apply requests that have been
// received and not yet removed by a matching data plane response.
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
43+
subscribeMutex sync.Mutex
44+
// subscribeClientMutex is held while subscribeClient is read or replaced.
subscribeClientMutex sync.Mutex
45+
// configApplyRequestQueueMutex is held while configApplyRequestQueue is read or modified.
configApplyRequestQueueMutex sync.Mutex
4446
}
4547
)
4648

@@ -54,10 +56,11 @@ func NewCommandService(
5456
isConnected.Store(false)
5557

5658
commandService := &CommandService{
57-
commandServiceClient: commandServiceClient,
58-
agentConfig: agentConfig,
59-
isConnected: isConnected,
60-
subscribeChannel: subscribeChannel,
59+
commandServiceClient: commandServiceClient,
60+
agentConfig: agentConfig,
61+
isConnected: isConnected,
62+
subscribeChannel: subscribeChannel,
63+
configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
6164
}
6265

6366
var subscribeCtx context.Context
@@ -165,6 +168,11 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m
165168
backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime)
166169
defer backoffCancel()
167170

171+
err := cs.handleConfigApplyResponse(ctx, response)
172+
if err != nil {
173+
return err
174+
}
175+
168176
return backoff.Retry(
169177
cs.sendDataPlaneResponseCallback(ctx, response),
170178
backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff),
@@ -271,6 +279,81 @@ func (cs *CommandService) sendDataPlaneResponseCallback(
271279
}
272280
}
273281

282+
func (cs *CommandService) handleConfigApplyResponse(
283+
ctx context.Context,
284+
response *mpi.DataPlaneResponse,
285+
) error {
286+
cs.configApplyRequestQueueMutex.Lock()
287+
defer cs.configApplyRequestQueueMutex.Unlock()
288+
289+
isConfigApplyResponse := false
290+
var indexOfConfigApplyRequest int
291+
292+
for index, configApplyRequest := range cs.configApplyRequestQueue[response.GetInstanceId()] {
293+
if configApplyRequest.GetMessageMeta().GetCorrelationId() == response.GetMessageMeta().GetCorrelationId() {
294+
indexOfConfigApplyRequest = index
295+
isConfigApplyResponse = true
296+
297+
break
298+
}
299+
}
300+
301+
if isConfigApplyResponse {
302+
err := cs.sendResponseForQueuedConfigApplyRequests(ctx, response, indexOfConfigApplyRequest)
303+
if err != nil {
304+
return err
305+
}
306+
}
307+
308+
return nil
309+
}
310+
311+
func (cs *CommandService) sendResponseForQueuedConfigApplyRequests(
312+
ctx context.Context,
313+
response *mpi.DataPlaneResponse,
314+
indexOfConfigApplyRequest int,
315+
) error {
316+
instanceID := response.GetInstanceId()
317+
for i := 0; i < indexOfConfigApplyRequest; i++ {
318+
newResponse := response
319+
320+
newResponse.GetMessageMeta().MessageId = proto.GenerateMessageID()
321+
322+
request := cs.configApplyRequestQueue[instanceID][i]
323+
newResponse.GetMessageMeta().CorrelationId = request.GetMessageMeta().GetCorrelationId()
324+
325+
slog.DebugContext(
326+
ctx,
327+
"Sending data plane response for queued config apply request",
328+
"response", newResponse,
329+
)
330+
331+
backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime)
332+
333+
err := backoff.Retry(
334+
cs.sendDataPlaneResponseCallback(ctx, newResponse),
335+
backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff),
336+
)
337+
if err != nil {
338+
slog.ErrorContext(ctx, "Failed to send data plane response", "error", err)
339+
backoffCancel()
340+
341+
return err
342+
}
343+
344+
backoffCancel()
345+
}
346+
347+
cs.configApplyRequestQueue[instanceID] = cs.configApplyRequestQueue[instanceID][indexOfConfigApplyRequest+1:]
348+
slog.DebugContext(ctx, "Removed config apply requests from queue", "queue", cs.configApplyRequestQueue[instanceID])
349+
350+
if len(cs.configApplyRequestQueue[instanceID]) > 0 {
351+
cs.subscribeChannel <- cs.configApplyRequestQueue[instanceID][len(cs.configApplyRequestQueue[instanceID])-1]
352+
}
353+
354+
return nil
355+
}
356+
274357
// Retry callback for sending a data plane health status to the Management Plane.
275358
func (cs *CommandService) dataPlaneHealthCallback(
276359
ctx context.Context,
@@ -333,7 +416,25 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
333416
return recvError
334417
}
335418

336-
cs.subscribeChannel <- request
419+
switch request.GetRequest().(type) {
420+
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
421+
cs.configApplyRequestQueueMutex.Lock()
422+
defer cs.configApplyRequestQueueMutex.Unlock()
423+
424+
instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
425+
cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request)
426+
if len(cs.configApplyRequestQueue[instanceID]) == 1 {
427+
cs.subscribeChannel <- request
428+
} else {
429+
slog.DebugContext(
430+
ctx,
431+
"Config apply request is already in progress, queuing new config apply request",
432+
"request", request,
433+
)
434+
}
435+
default:
436+
cs.subscribeChannel <- request
437+
}
337438

338439
return nil
339440
}

internal/command/command_service_test.go

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,13 @@ import (
1010
"context"
1111
"errors"
1212
"log/slog"
13+
"sync"
1314
"testing"
1415
"time"
1516

17+
"github.com/google/uuid"
18+
"google.golang.org/protobuf/types/known/timestamppb"
19+
1620
"github.com/nginx/agent/v3/internal/logger"
1721
"github.com/nginx/agent/v3/test/helpers"
1822
"github.com/nginx/agent/v3/test/stub"
@@ -37,9 +41,41 @@ func (*FakeSubscribeClient) Send(*mpi.DataPlaneResponse) error {
3741

3842
// nolint: nilnil
3943
func (*FakeSubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) {
44+
time.Sleep(1 * time.Second)
45+
4046
return nil, nil
4147
}
4248

49+
// FakeConfigApplySubscribeClient is a test double for the subscribe stream. It embeds
// grpc.ClientStream and overrides Send and Recv so that every Recv yields a config
// apply request.
type FakeConfigApplySubscribeClient struct {
50+
grpc.ClientStream
51+
}
52+
53+
func (*FakeConfigApplySubscribeClient) Send(*mpi.DataPlaneResponse) error {
54+
return nil
55+
}
56+
57+
// nolint: nilnil
58+
func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) {
59+
protos.CreateManagementPlaneRequest()
60+
return &mpi.ManagementPlaneRequest{
61+
MessageMeta: &mpi.MessageMeta{
62+
MessageId: "1",
63+
CorrelationId: "123",
64+
Timestamp: timestamppb.Now(),
65+
},
66+
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
67+
ConfigApplyRequest: &mpi.ConfigApplyRequest{
68+
Overview: &mpi.FileOverview{
69+
ConfigVersion: &mpi.ConfigVersion{
70+
InstanceId: "12314",
71+
Version: "4215432",
72+
},
73+
},
74+
},
75+
},
76+
}, nil
77+
}
78+
4379
func TestCommandService_NewCommandService(t *testing.T) {
4480
ctx := context.Background()
4581
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
@@ -61,6 +97,46 @@ func TestCommandService_NewCommandService(t *testing.T) {
6197
)
6298
}
6399

100+
func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
101+
ctx := context.Background()
102+
fakeSubscribeClient := &FakeConfigApplySubscribeClient{}
103+
104+
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
105+
commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil)
106+
107+
subscribeChannel := make(chan *mpi.ManagementPlaneRequest)
108+
109+
commandService := NewCommandService(
110+
ctx,
111+
commandServiceClient,
112+
types.AgentConfig(),
113+
subscribeChannel,
114+
)
115+
116+
defer commandService.CancelSubscription(ctx)
117+
118+
var wg sync.WaitGroup
119+
120+
wg.Add(1)
121+
go func() {
122+
requestFromChannel := <-subscribeChannel
123+
assert.NotNil(t, requestFromChannel)
124+
wg.Done()
125+
}()
126+
127+
assert.Eventually(
128+
t,
129+
func() bool { return commandServiceClient.SubscribeCallCount() > 0 },
130+
2*time.Second,
131+
10*time.Millisecond,
132+
)
133+
134+
commandService.configApplyRequestQueueMutex.Lock()
135+
defer commandService.configApplyRequestQueueMutex.Unlock()
136+
assert.Len(t, commandService.configApplyRequestQueue, 1)
137+
wg.Wait()
138+
}
139+
64140
func TestCommandService_UpdateDataPlaneStatus(t *testing.T) {
65141
ctx := context.Background()
66142

@@ -193,3 +269,123 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) {
193269

194270
require.NoError(t, err)
195271
}
272+
273+
func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) {
274+
ctx := context.Background()
275+
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
276+
subscribeClient := &FakeSubscribeClient{}
277+
subscribeChannel := make(chan *mpi.ManagementPlaneRequest)
278+
279+
commandService := NewCommandService(
280+
ctx,
281+
commandServiceClient,
282+
types.AgentConfig(),
283+
subscribeChannel,
284+
)
285+
286+
defer commandService.CancelSubscription(ctx)
287+
288+
request1 := &mpi.ManagementPlaneRequest{
289+
MessageMeta: &mpi.MessageMeta{
290+
MessageId: "1",
291+
CorrelationId: "123",
292+
Timestamp: timestamppb.Now(),
293+
},
294+
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
295+
ConfigApplyRequest: &mpi.ConfigApplyRequest{
296+
Overview: &mpi.FileOverview{
297+
Files: []*mpi.File{},
298+
ConfigVersion: &mpi.ConfigVersion{
299+
InstanceId: "12314",
300+
Version: "4215432",
301+
},
302+
},
303+
},
304+
},
305+
}
306+
307+
request2 := &mpi.ManagementPlaneRequest{
308+
MessageMeta: &mpi.MessageMeta{
309+
MessageId: "2",
310+
CorrelationId: "1232",
311+
Timestamp: timestamppb.Now(),
312+
},
313+
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
314+
ConfigApplyRequest: &mpi.ConfigApplyRequest{
315+
Overview: &mpi.FileOverview{
316+
Files: []*mpi.File{},
317+
ConfigVersion: &mpi.ConfigVersion{
318+
InstanceId: "12314",
319+
Version: "4215432",
320+
},
321+
},
322+
},
323+
},
324+
}
325+
326+
request3 := &mpi.ManagementPlaneRequest{
327+
MessageMeta: &mpi.MessageMeta{
328+
MessageId: "3",
329+
CorrelationId: "1233",
330+
Timestamp: timestamppb.Now(),
331+
},
332+
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
333+
ConfigApplyRequest: &mpi.ConfigApplyRequest{
334+
Overview: &mpi.FileOverview{
335+
Files: []*mpi.File{},
336+
ConfigVersion: &mpi.ConfigVersion{
337+
InstanceId: "12314",
338+
Version: "4215432",
339+
},
340+
},
341+
},
342+
},
343+
}
344+
345+
commandService.configApplyRequestQueueMutex.Lock()
346+
commandService.configApplyRequestQueue = map[string][]*mpi.ManagementPlaneRequest{
347+
"12314": {
348+
request1,
349+
request2,
350+
request3,
351+
},
352+
}
353+
commandService.configApplyRequestQueueMutex.Unlock()
354+
355+
commandService.subscribeClientMutex.Lock()
356+
commandService.subscribeClient = subscribeClient
357+
commandService.subscribeClientMutex.Unlock()
358+
359+
var wg sync.WaitGroup
360+
361+
wg.Add(1)
362+
go func() {
363+
requestFromChannel := <-subscribeChannel
364+
assert.Equal(t, request3, requestFromChannel)
365+
wg.Done()
366+
}()
367+
368+
err := commandService.SendDataPlaneResponse(
369+
ctx,
370+
&mpi.DataPlaneResponse{
371+
MessageMeta: &mpi.MessageMeta{
372+
MessageId: uuid.NewString(),
373+
CorrelationId: "1232",
374+
Timestamp: timestamppb.Now(),
375+
},
376+
CommandResponse: &mpi.CommandResponse{
377+
Status: mpi.CommandResponse_COMMAND_STATUS_OK,
378+
Message: "Success",
379+
},
380+
InstanceId: "12314",
381+
},
382+
)
383+
384+
require.NoError(t, err)
385+
386+
commandService.configApplyRequestQueueMutex.Lock()
387+
defer commandService.configApplyRequestQueueMutex.Unlock()
388+
assert.Len(t, commandService.configApplyRequestQueue, 1)
389+
assert.Equal(t, request3, commandService.configApplyRequestQueue["12314"][0])
390+
wg.Wait()
391+
}

0 commit comments

Comments
 (0)