Skip to content

Commit 5783936

Browse files
committed
PR feedback
1 parent a011222 commit 5783936

File tree

5 files changed

+91
-130
lines changed

5 files changed

+91
-130
lines changed

internal/command/command_plugin.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package command
88
import (
99
"context"
1010
"log/slog"
11+
"sync"
1112

1213
"google.golang.org/protobuf/types/known/timestamppb"
1314

@@ -30,18 +31,19 @@ type (
3031
UpdateDataPlaneStatus(ctx context.Context, resource *mpi.Resource) error
3132
UpdateDataPlaneHealth(ctx context.Context, instanceHealths []*mpi.InstanceHealth) error
3233
SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error
33-
CancelSubscription(ctx context.Context)
34+
Subscribe(ctx context.Context)
3435
IsConnected() bool
35-
StartSubscription()
3636
CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error)
3737
}
3838

3939
CommandPlugin struct {
4040
messagePipe bus.MessagePipeInterface
4141
config *config.Config
42+
subscribeCancel context.CancelFunc
4243
conn grpc.GrpcConnectionInterface
4344
commandService commandService
4445
subscribeChannel chan *mpi.ManagementPlaneRequest
46+
subscribeMutex sync.Mutex
4547
}
4648
)
4749

@@ -57,15 +59,22 @@ func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeIn
5759
slog.DebugContext(ctx, "Starting command plugin")
5860

5961
cp.messagePipe = messagePipe
60-
cp.commandService = NewCommandService(ctx, cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)
62+
cp.commandService = NewCommandService(cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)
6163

6264
go cp.monitorSubscribeChannel(ctx)
6365

6466
return nil
6567
}
6668

6769
func (cp *CommandPlugin) Close(ctx context.Context) error {
68-
cp.commandService.CancelSubscription(ctx)
70+
slog.InfoContext(ctx, "Canceling subscribe context")
71+
72+
cp.subscribeMutex.Lock()
73+
if cp.subscribeCancel != nil {
74+
cp.subscribeCancel()
75+
}
76+
cp.subscribeMutex.Unlock()
77+
6978
return cp.conn.Close(ctx)
7079
}
7180

@@ -104,12 +113,20 @@ func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Mes
104113
}
105114

106115
func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Resource) {
116+
var subscribeCtx context.Context
117+
107118
createConnectionResponse, err := cp.commandService.CreateConnection(ctx, resource)
108119
if err != nil {
109120
slog.ErrorContext(ctx, "Unable to create connection", "error", err)
110121
}
122+
111123
if createConnectionResponse != nil {
112-
cp.commandService.StartSubscription()
124+
cp.subscribeMutex.Lock()
125+
subscribeCtx, cp.subscribeCancel = context.WithCancel(ctx)
126+
cp.subscribeMutex.Unlock()
127+
128+
go cp.commandService.Subscribe(subscribeCtx)
129+
113130
cp.messagePipe.Process(ctx, &bus.Message{
114131
Topic: bus.ConnectionCreatedTopic,
115132
Data: createConnectionResponse,

internal/command/command_plugin_test.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,39 @@ func TestCommandPlugin_Init(t *testing.T) {
7070

7171
closeError := commandPlugin.Close(ctx)
7272
require.NoError(t, closeError)
73-
require.Equal(t, 1, fakeCommandService.CancelSubscriptionCallCount())
73+
}
74+
75+
func TestCommandPlugin_createConnection(t *testing.T) {
76+
ctx := context.Background()
77+
commandService := &commandfakes.FakeCommandService{}
78+
commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil)
79+
messagePipe := busfakes.NewFakeMessagePipe()
80+
81+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
82+
err := commandPlugin.Init(ctx, messagePipe)
83+
commandPlugin.commandService = commandService
84+
require.NoError(t, err)
85+
defer commandPlugin.Close(ctx)
86+
87+
commandPlugin.createConnection(ctx, &mpi.Resource{})
88+
89+
assert.Eventually(
90+
t,
91+
func() bool { return commandService.SubscribeCallCount() > 0 },
92+
2*time.Second,
93+
10*time.Millisecond,
94+
)
95+
96+
assert.Eventually(
97+
t,
98+
func() bool { return len(messagePipe.GetMessages()) == 1 },
99+
2*time.Second,
100+
10*time.Millisecond,
101+
)
102+
103+
messages := messagePipe.GetMessages()
104+
assert.Len(t, messages, 1)
105+
assert.Equal(t, bus.ConnectionCreatedTopic, messages[0].Topic)
74106
}
75107

76108
func TestCommandPlugin_Process(t *testing.T) {

internal/command/command_service.go

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,16 @@ type (
4141
subscribeClient mpi.CommandService_SubscribeClient
4242
agentConfig *config.Config
4343
isConnected *atomic.Bool
44-
subscribeCancel context.CancelFunc
4544
subscribeChannel chan *mpi.ManagementPlaneRequest
4645
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
4746
resource *mpi.Resource
48-
subscribeContext context.Context
49-
subscribeMutex sync.Mutex
5047
subscribeClientMutex sync.Mutex
5148
configApplyRequestQueueMutex sync.Mutex
5249
resourceMutex sync.Mutex
5350
}
5451
)
5552

5653
func NewCommandService(
57-
ctx context.Context,
5854
commandServiceClient mpi.CommandServiceClient,
5955
agentConfig *config.Config,
6056
subscribeChannel chan *mpi.ManagementPlaneRequest,
@@ -71,18 +67,9 @@ func NewCommandService(
7167
resource: &mpi.Resource{},
7268
}
7369

74-
commandService.subscribeMutex.Lock()
75-
commandService.subscribeContext, commandService.subscribeCancel = context.WithCancel(ctx)
76-
commandService.subscribeMutex.Unlock()
77-
7870
return commandService
7971
}
8072

81-
func (cs *CommandService) StartSubscription() {
82-
slog.DebugContext(cs.subscribeContext, "Starting subscribe")
83-
go cs.subscribe(cs.subscribeContext)
84-
}
85-
8673
func (cs *CommandService) IsConnected() bool {
8774
return cs.isConnected.Load()
8875
}
@@ -192,17 +179,7 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m
192179
)
193180
}
194181

195-
func (cs *CommandService) CancelSubscription(ctx context.Context) {
196-
slog.InfoContext(ctx, "Canceling subscribe context")
197-
198-
cs.subscribeMutex.Lock()
199-
if cs.subscribeCancel != nil {
200-
cs.subscribeCancel()
201-
}
202-
cs.subscribeMutex.Unlock()
203-
}
204-
205-
func (cs *CommandService) subscribe(ctx context.Context) {
182+
func (cs *CommandService) Subscribe(ctx context.Context) {
206183
commonSettings := &config.BackOff{
207184
InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval,
208185
MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval,
@@ -411,6 +388,7 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
411388
var err error
412389
cs.subscribeClient, err = cs.commandServiceClient.Subscribe(ctx)
413390
if err != nil {
391+
slog.Info("Error: ", "", err)
414392
subscribeErr := cs.handleSubscribeError(ctx, err, "create subscribe client")
415393
cs.subscribeClientMutex.Unlock()
416394

internal/command/command_service_test.go

Lines changed: 4 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -77,53 +77,29 @@ func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, erro
7777
}, nil
7878
}
7979

80-
func TestCommandService_StartSubscribe(t *testing.T) {
81-
ctx := context.Background()
82-
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
83-
84-
commandService := NewCommandService(
85-
ctx,
86-
commandServiceClient,
87-
types.AgentConfig(),
88-
make(chan *mpi.ManagementPlaneRequest),
89-
)
90-
91-
defer commandService.CancelSubscription(ctx)
92-
93-
commandService.StartSubscription()
94-
95-
assert.Eventually(
96-
t,
97-
func() bool { return commandServiceClient.SubscribeCallCount() > 0 },
98-
2*time.Second,
99-
10*time.Millisecond,
100-
)
101-
}
102-
10380
func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
104-
ctx := context.Background()
10581
fakeSubscribeClient := &FakeConfigApplySubscribeClient{}
82+
ctx := context.Background()
83+
subscribeCtx, subscribeCancel := context.WithCancel(ctx)
10684

10785
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
10886
commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil)
10987

11088
subscribeChannel := make(chan *mpi.ManagementPlaneRequest)
11189

11290
commandService := NewCommandService(
113-
ctx,
11491
commandServiceClient,
11592
types.AgentConfig(),
11693
subscribeChannel,
11794
)
118-
commandService.StartSubscription()
95+
go commandService.Subscribe(subscribeCtx)
96+
defer subscribeCancel()
11997

12098
nginxInstance := protos.GetNginxOssInstance([]string{})
12199
commandService.resourceMutex.Lock()
122100
commandService.resource.Instances = append(commandService.resource.Instances, nginxInstance)
123101
commandService.resourceMutex.Unlock()
124102

125-
defer commandService.CancelSubscription(ctx)
126-
127103
var wg sync.WaitGroup
128104

129105
wg.Add(1)
@@ -155,13 +131,10 @@ func TestCommandService_UpdateDataPlaneStatus(t *testing.T) {
155131
commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil)
156132

157133
commandService := NewCommandService(
158-
ctx,
159134
commandServiceClient,
160135
types.AgentConfig(),
161136
make(chan *mpi.ManagementPlaneRequest),
162137
)
163-
defer commandService.CancelSubscription(ctx)
164-
165138
// Fail first time since there are no other instances besides the agent
166139
err := commandService.UpdateDataPlaneStatus(ctx, protos.GetHostResource())
167140
require.Error(t, err)
@@ -194,12 +167,10 @@ func TestCommandService_UpdateDataPlaneStatusSubscribeError(t *testing.T) {
194167
stub.StubLoggerWith(logBuf)
195168

196169
commandService := NewCommandService(
197-
ctx,
198170
commandServiceClient,
199171
types.AgentConfig(),
200172
make(chan *mpi.ManagementPlaneRequest),
201173
)
202-
defer commandService.CancelSubscription(ctx)
203174

204175
commandService.isConnected.Store(true)
205176

@@ -216,7 +187,6 @@ func TestCommandService_CreateConnection(t *testing.T) {
216187
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
217188

218189
commandService := NewCommandService(
219-
ctx,
220190
commandServiceClient,
221191
types.AgentConfig(),
222192
make(chan *mpi.ManagementPlaneRequest),
@@ -233,7 +203,6 @@ func TestCommandService_UpdateDataPlaneHealth(t *testing.T) {
233203
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
234204

235205
commandService := NewCommandService(
236-
ctx,
237206
commandServiceClient,
238207
types.AgentConfig(),
239208
make(chan *mpi.ManagementPlaneRequest),
@@ -264,7 +233,6 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) {
264233
subscribeClient := &FakeSubscribeClient{}
265234

266235
commandService := NewCommandService(
267-
ctx,
268236
commandServiceClient,
269237
types.AgentConfig(),
270238
make(chan *mpi.ManagementPlaneRequest),
@@ -286,14 +254,11 @@ func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) {
286254
subscribeChannel := make(chan *mpi.ManagementPlaneRequest)
287255

288256
commandService := NewCommandService(
289-
ctx,
290257
commandServiceClient,
291258
types.AgentConfig(),
292259
subscribeChannel,
293260
)
294261

295-
defer commandService.CancelSubscription(ctx)
296-
297262
request1 := &mpi.ManagementPlaneRequest{
298263
MessageMeta: &mpi.MessageMeta{
299264
MessageId: "1",
@@ -405,7 +370,6 @@ func TestCommandService_isValidRequest(t *testing.T) {
405370
subscribeClient := &FakeSubscribeClient{}
406371

407372
commandService := NewCommandService(
408-
ctx,
409373
commandServiceClient,
410374
types.AgentConfig(),
411375
make(chan *mpi.ManagementPlaneRequest),

0 commit comments

Comments
 (0)