Skip to content

Commit 87e6c56

Browse files
committed
handle command and file service client updates after grpc reset
1 parent 0ebaf6b commit 87e6c56

File tree

3 files changed

+39
-3
lines changed

3 files changed

+39
-3
lines changed

internal/command/command_plugin.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type (
3131
UpdateDataPlaneStatus(ctx context.Context, resource *mpi.Resource) error
3232
UpdateDataPlaneHealth(ctx context.Context, instanceHealths []*mpi.InstanceHealth) error
3333
SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error
34+
UpdateClient(client mpi.CommandServiceClient)
35+
Resource() *mpi.Resource
3436
Subscribe(ctx context.Context)
3537
IsConnected() bool
3638
CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error)
@@ -177,8 +179,19 @@ func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.
177179
func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Message) {
178180
slog.DebugContext(ctx, "Command plugin received connection reset")
179181
if newConnection, ok := msg.Data.(*grpc.GrpcConnection); ok {
180-
_ = cp.conn.Close(ctx)
182+
if !cp.commandService.IsConnected() {
183+
slog.DebugContext(ctx, "Command plugin: service is not connected")
184+
return
185+
}
186+
err := cp.conn.Close(ctx)
187+
if err != nil {
188+
slog.ErrorContext(ctx, "Unable to close connection", "error", err)
189+
}
181190
cp.conn = newConnection
191+
cp.commandService.UpdateClient(cp.conn.CommandServiceClient())
192+
//go cp.monitorSubscribeChannel(ctx)
193+
//cp.createConnection(ctx, cp.commandService.Resource())
194+
slog.DebugContext(ctx, "Command plugin: client reset successfully")
182195
}
183196
}
184197

internal/command/command_service.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type (
4141
subscribeClient mpi.CommandService_SubscribeClient
4242
agentConfig *config.Config
4343
isConnected *atomic.Bool
44+
clientUpdated *atomic.Bool
4445
subscribeChannel chan *mpi.ManagementPlaneRequest
4546
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
4647
resource *mpi.Resource
@@ -58,10 +59,14 @@ func NewCommandService(
5859
isConnected := &atomic.Bool{}
5960
isConnected.Store(false)
6061

62+
clientUpdated := &atomic.Bool{}
63+
clientUpdated.Store(false)
64+
6165
commandService := &CommandService{
6266
commandServiceClient: commandServiceClient,
6367
agentConfig: agentConfig,
6468
isConnected: isConnected,
69+
clientUpdated: clientUpdated,
6570
subscribeChannel: subscribeChannel,
6671
configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
6772
resource: &mpi.Resource{},
@@ -210,6 +215,10 @@ func (cs *CommandService) CreateConnection(
210215
slog.InfoContext(ctx, "No Data Plane Instance found")
211216
}
212217

218+
if cs.isConnected.Load() {
219+
return nil, errors.New("command service already connected")
220+
}
221+
213222
request := &mpi.CreateConnectionRequest{
214223
MessageMeta: &mpi.MessageMeta{
215224
MessageId: id.GenerateMessageID(),
@@ -228,7 +237,6 @@ func (cs *CommandService) CreateConnection(
228237
}
229238

230239
slog.DebugContext(ctx, "Sending create connection request", "request", request)
231-
232240
response, err := backoff.RetryWithData(
233241
cs.connectCallback(ctx, request),
234242
backoffHelpers.Context(ctx, commonSettings),
@@ -249,6 +257,17 @@ func (cs *CommandService) CreateConnection(
249257
return response, nil
250258
}
251259

260+
func (cs *CommandService) UpdateClient(client mpi.CommandServiceClient) {
261+
cs.commandServiceClient = client
262+
cs.clientUpdated.Store(true)
263+
}
264+
265+
func (cs *CommandService) Resource() *mpi.Resource {
266+
cs.resourceMutex.Lock()
267+
defer cs.resourceMutex.Unlock()
268+
return cs.resource
269+
}
270+
252271
// Retry callback for sending a data plane response to the Management Plane.
253272
func (cs *CommandService) sendDataPlaneResponseCallback(
254273
ctx context.Context,

internal/file/file_plugin.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,12 @@ func (fp *FilePlugin) Subscriptions() []string {
9797
func (fp *FilePlugin) handleConnectionReset(ctx context.Context, msg *bus.Message) {
9898
slog.DebugContext(ctx, "File plugin received connection reset message")
9999
if newConnection, ok := msg.Data.(*grpc.GrpcConnection); ok {
100-
_ = fp.conn.Close(ctx)
100+
err := fp.conn.Close(ctx)
101+
if err != nil {
102+
slog.ErrorContext(ctx, "Error closing file plugin", "error", err)
103+
}
101104
fp.conn = newConnection
105+
fp.fileManagerService = NewFileManagerService(fp.conn.FileServiceClient(), fp.config)
102106
}
103107
}
104108

0 commit comments

Comments
 (0)