Skip to content

Commit c541aa4

Browse files
committed
send create connection
1 parent 73551f5 commit c541aa4

File tree

8 files changed

+80
-23
lines changed

8 files changed

+80
-23
lines changed

internal/command/command_plugin.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type (
3131
UpdateDataPlaneStatus(ctx context.Context, resource *mpi.Resource) error
3232
UpdateDataPlaneHealth(ctx context.Context, instanceHealths []*mpi.InstanceHealth) error
3333
SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error
34-
UpdateClient(client mpi.CommandServiceClient)
34+
UpdateClient(ctx context.Context, client mpi.CommandServiceClient) error
3535
Subscribe(ctx context.Context)
3636
IsConnected() bool
3737
CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error)
@@ -68,7 +68,7 @@ func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeIn
6868
}
6969

7070
func (cp *CommandPlugin) Close(ctx context.Context) error {
71-
slog.InfoContext(ctx, "Canceling subscribe context")
71+
slog.InfoContext(ctx, "Closing command plugin")
7272

7373
cp.subscribeMutex.Lock()
7474
if cp.subscribeCancel != nil {
@@ -178,12 +178,16 @@ func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.
178178
func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Message) {
179179
slog.DebugContext(ctx, "Command plugin received connection reset")
180180
if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
181-
err := cp.conn.Close(ctx)
182-
if err != nil {
183-
slog.ErrorContext(ctx, "Command plugin: unable to close connection", "error", err)
181+
connectionErr := cp.conn.Close(ctx)
182+
if connectionErr != nil {
183+
slog.ErrorContext(ctx, "Command plugin: unable to close connection", "error", connectionErr)
184184
}
185185
cp.conn = newConnection
186-
cp.commandService.UpdateClient(cp.conn.CommandServiceClient())
186+
err := cp.commandService.UpdateClient(ctx, cp.conn.CommandServiceClient())
187+
if err != nil {
188+
slog.ErrorContext(ctx, "Failed to reset connection", "error", err)
189+
return
190+
}
187191
slog.DebugContext(ctx, "Command service client reset successfully")
188192
}
189193
}

internal/command/command_service.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,19 @@ func (cs *CommandService) CreateConnection(
254254
return response, nil
255255
}
256256

257-
func (cs *CommandService) UpdateClient(client mpi.CommandServiceClient) {
257+
func (cs *CommandService) UpdateClient(ctx context.Context, client mpi.CommandServiceClient) error {
258258
cs.subscribeClientMutex.Lock()
259-
defer cs.subscribeClientMutex.Unlock()
260259
cs.commandServiceClient = client
260+
cs.subscribeClientMutex.Unlock()
261+
262+
cs.isConnected.Store(false)
263+
resp, err := cs.CreateConnection(ctx, cs.resource)
264+
if err != nil {
265+
return err
266+
}
267+
slog.InfoContext(ctx, "Successfully sent create connection request", "response", resp)
268+
269+
return nil
261270
}
262271

263272
// Retry callback for sending a data plane response to the Management Plane.
@@ -551,6 +560,11 @@ func (cs *CommandService) connectCallback(
551560

552561
validatedError := grpc.ValidateGrpcError(connectErr)
553562
if validatedError != nil {
563+
codeError, ok := status.FromError(validatedError)
564+
if ok && codeError.Message() == "grpc: the client connection is closing" {
565+
return nil, backoff.Permanent(fmt.Errorf("failed to create connection: %w, "+
566+
"stopping retry", validatedError))
567+
}
554568
slog.ErrorContext(ctx, "Failed to create connection", "error", validatedError)
555569

556570
return nil, validatedError

internal/command/command_service_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,15 @@ func TestCommandService_CreateConnection(t *testing.T) {
200200

201201
func TestCommandService_UpdateClient(t *testing.T) {
202202
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
203+
ctx := context.Background()
203204

204205
commandService := NewCommandService(
205206
commandServiceClient,
206207
types.AgentConfig(),
207208
make(chan *mpi.ManagementPlaneRequest),
208209
)
209-
commandService.UpdateClient(commandServiceClient)
210+
err := commandService.UpdateClient(ctx, commandServiceClient)
211+
require.NoError(t, err)
210212
assert.NotNil(t, commandService.commandServiceClient)
211213
}
212214

internal/command/commandfakes/fake_command_service.go

Lines changed: 47 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/file/file_plugin.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func (fp *FilePlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInter
5151
}
5252

5353
func (fp *FilePlugin) Close(ctx context.Context) error {
54+
slog.InfoContext(ctx, "Closing file plugin")
5455
return fp.conn.Close(ctx)
5556
}
5657

internal/grpc/grpc.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,11 @@ func (gc *GrpcConnection) FileServiceClient() mpi.FileServiceClient {
115115
}
116116

117117
func (gc *GrpcConnection) Close(ctx context.Context) error {
118-
slog.InfoContext(ctx, "Closing grpc connection")
119-
120118
gc.mutex.Lock()
121119
defer gc.mutex.Unlock()
122120

123121
if gc.conn != nil {
122+
slog.InfoContext(ctx, "Closing grpc connection")
124123
err := gc.conn.Close()
125124
gc.conn = nil
126125
if err != nil {

internal/resource/resource_plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (r *Resource) Init(ctx context.Context, messagePipe bus.MessagePipeInterfac
6060
}
6161

6262
func (*Resource) Close(ctx context.Context) error {
63-
slog.DebugContext(ctx, "Closing resource plugin")
63+
slog.InfoContext(ctx, "Closing resource plugin")
6464
return nil
6565
}
6666

internal/watcher/watcher_plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (w *Watcher) Init(ctx context.Context, messagePipe bus.MessagePipeInterface
112112
// nolint: unparam
113113
// error is always nil
114114
func (w *Watcher) Close(ctx context.Context) error {
115-
slog.DebugContext(ctx, "Closing watcher plugin")
115+
slog.InfoContext(ctx, "Closing watcher plugin")
116116

117117
w.cancel()
118118

0 commit comments

Comments
 (0)