Skip to content

Commit 2f4ce40

Browse files
authored
Start subscribe after successful create connection request (#969)
1 parent 8eea4b8 commit 2f4ce40

File tree

5 files changed

+101
-104
lines changed

5 files changed

+101
-104
lines changed

internal/command/command_plugin.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package command
88
import (
99
"context"
1010
"log/slog"
11+
"sync"
1112

1213
"google.golang.org/protobuf/types/known/timestamppb"
1314

@@ -30,17 +31,19 @@ type (
3031
UpdateDataPlaneStatus(ctx context.Context, resource *mpi.Resource) error
3132
UpdateDataPlaneHealth(ctx context.Context, instanceHealths []*mpi.InstanceHealth) error
3233
SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error
33-
CancelSubscription(ctx context.Context)
34+
Subscribe(ctx context.Context)
3435
IsConnected() bool
3536
CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error)
3637
}
3738

3839
CommandPlugin struct {
3940
messagePipe bus.MessagePipeInterface
4041
config *config.Config
42+
subscribeCancel context.CancelFunc
4143
conn grpc.GrpcConnectionInterface
4244
commandService commandService
4345
subscribeChannel chan *mpi.ManagementPlaneRequest
46+
subscribeMutex sync.Mutex
4447
}
4548
)
4649

@@ -56,15 +59,22 @@ func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeIn
5659
slog.DebugContext(ctx, "Starting command plugin")
5760

5861
cp.messagePipe = messagePipe
59-
cp.commandService = NewCommandService(ctx, cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)
62+
cp.commandService = NewCommandService(cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)
6063

6164
go cp.monitorSubscribeChannel(ctx)
6265

6366
return nil
6467
}
6568

6669
func (cp *CommandPlugin) Close(ctx context.Context) error {
67-
cp.commandService.CancelSubscription(ctx)
70+
slog.InfoContext(ctx, "Canceling subscribe context")
71+
72+
cp.subscribeMutex.Lock()
73+
if cp.subscribeCancel != nil {
74+
cp.subscribeCancel()
75+
}
76+
cp.subscribeMutex.Unlock()
77+
6878
return cp.conn.Close(ctx)
6979
}
7080

@@ -103,11 +113,20 @@ func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Mes
103113
}
104114

105115
func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Resource) {
116+
var subscribeCtx context.Context
117+
106118
createConnectionResponse, err := cp.commandService.CreateConnection(ctx, resource)
107119
if err != nil {
108120
slog.ErrorContext(ctx, "Unable to create connection", "error", err)
109121
}
122+
110123
if createConnectionResponse != nil {
124+
cp.subscribeMutex.Lock()
125+
subscribeCtx, cp.subscribeCancel = context.WithCancel(ctx)
126+
cp.subscribeMutex.Unlock()
127+
128+
go cp.commandService.Subscribe(subscribeCtx)
129+
111130
cp.messagePipe.Process(ctx, &bus.Message{
112131
Topic: bus.ConnectionCreatedTopic,
113132
Data: createConnectionResponse,

internal/command/command_plugin_test.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,39 @@ func TestCommandPlugin_Init(t *testing.T) {
7070

7171
closeError := commandPlugin.Close(ctx)
7272
require.NoError(t, closeError)
73-
require.Equal(t, 1, fakeCommandService.CancelSubscriptionCallCount())
73+
}
74+
75+
func TestCommandPlugin_createConnection(t *testing.T) {
76+
ctx := context.Background()
77+
commandService := &commandfakes.FakeCommandService{}
78+
commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil)
79+
messagePipe := busfakes.NewFakeMessagePipe()
80+
81+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
82+
err := commandPlugin.Init(ctx, messagePipe)
83+
commandPlugin.commandService = commandService
84+
require.NoError(t, err)
85+
defer commandPlugin.Close(ctx)
86+
87+
commandPlugin.createConnection(ctx, &mpi.Resource{})
88+
89+
assert.Eventually(
90+
t,
91+
func() bool { return commandService.SubscribeCallCount() > 0 },
92+
2*time.Second,
93+
10*time.Millisecond,
94+
)
95+
96+
assert.Eventually(
97+
t,
98+
func() bool { return len(messagePipe.GetMessages()) == 1 },
99+
2*time.Second,
100+
10*time.Millisecond,
101+
)
102+
103+
messages := messagePipe.GetMessages()
104+
assert.Len(t, messages, 1)
105+
assert.Equal(t, bus.ConnectionCreatedTopic, messages[0].Topic)
74106
}
75107

76108
func TestCommandPlugin_Process(t *testing.T) {
@@ -307,12 +339,12 @@ func TestCommandPlugin_FeatureDisabled(t *testing.T) {
307339

308340
func TestMonitorSubscribeChannel(t *testing.T) {
309341
ctx, cncl := context.WithCancel(context.Background())
310-
defer cncl()
311342

312343
logBuf := &bytes.Buffer{}
313344
stub.StubLoggerWith(logBuf)
314345

315346
cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
347+
cp.subscribeCancel = cncl
316348

317349
message := protos.CreateManagementPlaneRequest()
318350

@@ -327,7 +359,7 @@ func TestMonitorSubscribeChannel(t *testing.T) {
327359
// Give some time to process the message
328360
time.Sleep(100 * time.Millisecond)
329361

330-
cncl()
362+
cp.Close(ctx)
331363

332364
time.Sleep(100 * time.Millisecond)
333365

internal/command/command_service.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,16 @@ type (
4141
subscribeClient mpi.CommandService_SubscribeClient
4242
agentConfig *config.Config
4343
isConnected *atomic.Bool
44-
subscribeCancel context.CancelFunc
4544
subscribeChannel chan *mpi.ManagementPlaneRequest
4645
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
4746
resource *mpi.Resource
48-
subscribeMutex sync.Mutex
4947
subscribeClientMutex sync.Mutex
5048
configApplyRequestQueueMutex sync.Mutex
5149
resourceMutex sync.Mutex
5250
}
5351
)
5452

5553
func NewCommandService(
56-
ctx context.Context,
5754
commandServiceClient mpi.CommandServiceClient,
5855
agentConfig *config.Config,
5956
subscribeChannel chan *mpi.ManagementPlaneRequest,
@@ -70,14 +67,6 @@ func NewCommandService(
7067
resource: &mpi.Resource{},
7168
}
7269

73-
var subscribeCtx context.Context
74-
75-
commandService.subscribeMutex.Lock()
76-
subscribeCtx, commandService.subscribeCancel = context.WithCancel(ctx)
77-
commandService.subscribeMutex.Unlock()
78-
79-
go commandService.subscribe(subscribeCtx)
80-
8170
return commandService
8271
}
8372

@@ -190,17 +179,7 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m
190179
)
191180
}
192181

193-
func (cs *CommandService) CancelSubscription(ctx context.Context) {
194-
slog.InfoContext(ctx, "Canceling subscribe context")
195-
196-
cs.subscribeMutex.Lock()
197-
if cs.subscribeCancel != nil {
198-
cs.subscribeCancel()
199-
}
200-
cs.subscribeMutex.Unlock()
201-
}
202-
203-
func (cs *CommandService) subscribe(ctx context.Context) {
182+
func (cs *CommandService) Subscribe(ctx context.Context) {
204183
commonSettings := &config.BackOff{
205184
InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval,
206185
MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval,

internal/command/command_service_test.go

Lines changed: 4 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -77,50 +77,29 @@ func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, erro
7777
}, nil
7878
}
7979

80-
func TestCommandService_NewCommandService(t *testing.T) {
81-
ctx := context.Background()
82-
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
83-
84-
commandService := NewCommandService(
85-
ctx,
86-
commandServiceClient,
87-
types.AgentConfig(),
88-
make(chan *mpi.ManagementPlaneRequest),
89-
)
90-
91-
defer commandService.CancelSubscription(ctx)
92-
93-
assert.Eventually(
94-
t,
95-
func() bool { return commandServiceClient.SubscribeCallCount() > 0 },
96-
2*time.Second,
97-
10*time.Millisecond,
98-
)
99-
}
100-
10180
func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
102-
ctx := context.Background()
10381
fakeSubscribeClient := &FakeConfigApplySubscribeClient{}
82+
ctx := context.Background()
83+
subscribeCtx, subscribeCancel := context.WithCancel(ctx)
10484

10585
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
10686
commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil)
10787

10888
subscribeChannel := make(chan *mpi.ManagementPlaneRequest)
10989

11090
commandService := NewCommandService(
111-
ctx,
11291
commandServiceClient,
11392
types.AgentConfig(),
11493
subscribeChannel,
11594
)
95+
go commandService.Subscribe(subscribeCtx)
96+
defer subscribeCancel()
11697

11798
nginxInstance := protos.GetNginxOssInstance([]string{})
11899
commandService.resourceMutex.Lock()
119100
commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
120101
commandService.resourceMutex.Unlock()
121102

122-
defer commandService.CancelSubscription(ctx)
123-
124103
var wg sync.WaitGroup
125104

126105
wg.Add(1)
@@ -152,13 +131,10 @@ func TestCommandService_UpdateDataPlaneStatus(t *testing.T) {
152131
commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil)
153132

154133
commandService := NewCommandService(
155-
ctx,
156134
commandServiceClient,
157135
types.AgentConfig(),
158136
make(chan *mpi.ManagementPlaneRequest),
159137
)
160-
defer commandService.CancelSubscription(ctx)
161-
162138
// Fail first time since there are no other instances besides the agent
163139
err := commandService.UpdateDataPlaneStatus(ctx, protos.GetHostResource())
164140
require.Error(t, err)
@@ -191,12 +167,10 @@ func TestCommandService_UpdateDataPlaneStatusSubscribeError(t *testing.T) {
191167
stub.StubLoggerWith(logBuf)
192168

193169
commandService := NewCommandService(
194-
ctx,
195170
commandServiceClient,
196171
types.AgentConfig(),
197172
make(chan *mpi.ManagementPlaneRequest),
198173
)
199-
defer commandService.CancelSubscription(ctx)
200174

201175
commandService.isConnected.Store(true)
202176

@@ -213,7 +187,6 @@ func TestCommandService_CreateConnection(t *testing.T) {
213187
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
214188

215189
commandService := NewCommandService(
216-
ctx,
217190
commandServiceClient,
218191
types.AgentConfig(),
219192
make(chan *mpi.ManagementPlaneRequest),
@@ -230,7 +203,6 @@ func TestCommandService_UpdateDataPlaneHealth(t *testing.T) {
230203
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
231204

232205
commandService := NewCommandService(
233-
ctx,
234206
commandServiceClient,
235207
types.AgentConfig(),
236208
make(chan *mpi.ManagementPlaneRequest),
@@ -261,7 +233,6 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) {
261233
subscribeClient := &FakeSubscribeClient{}
262234

263235
commandService := NewCommandService(
264-
ctx,
265236
commandServiceClient,
266237
types.AgentConfig(),
267238
make(chan *mpi.ManagementPlaneRequest),
@@ -283,14 +254,11 @@ func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) {
283254
subscribeChannel := make(chan *mpi.ManagementPlaneRequest)
284255

285256
commandService := NewCommandService(
286-
ctx,
287257
commandServiceClient,
288258
types.AgentConfig(),
289259
subscribeChannel,
290260
)
291261

292-
defer commandService.CancelSubscription(ctx)
293-
294262
request1 := &mpi.ManagementPlaneRequest{
295263
MessageMeta: &mpi.MessageMeta{
296264
MessageId: "1",
@@ -402,7 +370,6 @@ func TestCommandService_isValidRequest(t *testing.T) {
402370
subscribeClient := &FakeSubscribeClient{}
403371

404372
commandService := NewCommandService(
405-
ctx,
406373
commandServiceClient,
407374
types.AgentConfig(),
408375
make(chan *mpi.ManagementPlaneRequest),

0 commit comments

Comments
 (0)