Skip to content

Commit 54cd0d5

Browse files
committed
retry request if no response is sent
1 parent 61f1fc8 commit 54cd0d5

File tree

12 files changed

+85
-23
lines changed

12 files changed

+85
-23
lines changed

internal/command/command_service.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,16 @@ func (cs *CommandService) UpdateDataPlaneStatus(
104104
cs.subscribeClientMutex.Unlock()
105105
return nil, errors.New("command service client is not initialized")
106106
}
107-
response, updateError := cs.commandServiceClient.UpdateDataPlaneStatus(ctx, request)
107+
108+
grpcCtx, cancel := context.WithTimeout(ctx, cs.agentConfig.Client.Grpc.ResponseTimeout)
109+
defer cancel()
110+
111+
response, updateError := cs.commandServiceClient.UpdateDataPlaneStatus(grpcCtx, request)
108112
cs.subscribeClientMutex.Unlock()
109113

110114
validatedError := grpc.ValidateGrpcError(updateError)
111115
if validatedError != nil {
112-
slog.ErrorContext(ctx, "Failed to send update data plane status", "error", validatedError)
116+
slog.ErrorContext(grpcCtx, "Failed to send update data plane status", "error", validatedError)
113117

114118
return nil, validatedError
115119
}
@@ -384,13 +388,16 @@ func (cs *CommandService) dataPlaneHealthCallback(
384388
return nil, errors.New("command service client is not initialized")
385389
}
386390

387-
response, updateError := cs.commandServiceClient.UpdateDataPlaneHealth(ctx, request)
391+
grpcCtx, cancel := context.WithTimeout(ctx, cs.agentConfig.Client.Grpc.ResponseTimeout)
392+
defer cancel()
393+
394+
response, updateError := cs.commandServiceClient.UpdateDataPlaneHealth(grpcCtx, request)
388395
cs.subscribeClientMutex.Unlock()
389396

390397
validatedError := grpc.ValidateGrpcError(updateError)
391398

392399
if validatedError != nil {
393-
slog.ErrorContext(ctx, "Failed to send update data plane health", "error", validatedError)
400+
slog.ErrorContext(grpcCtx, "Failed to send update data plane health", "error", validatedError)
394401

395402
return nil, validatedError
396403
}
@@ -558,13 +565,16 @@ func (cs *CommandService) connectCallback(
558565
request *mpi.CreateConnectionRequest,
559566
) func() (*mpi.CreateConnectionResponse, error) {
560567
return func() (*mpi.CreateConnectionResponse, error) {
568+
grpcCtx, cancel := context.WithTimeout(ctx, cs.agentConfig.Client.Grpc.ResponseTimeout)
569+
defer cancel()
570+
561571
cs.subscribeClientMutex.Lock()
562-
response, connectErr := cs.commandServiceClient.CreateConnection(ctx, request)
572+
response, connectErr := cs.commandServiceClient.CreateConnection(grpcCtx, request)
563573
cs.subscribeClientMutex.Unlock()
564574

565575
validatedError := grpc.ValidateGrpcError(connectErr)
566576
if validatedError != nil {
567-
slog.ErrorContext(ctx, "Failed to create connection", "error", validatedError)
577+
slog.ErrorContext(grpcCtx, "Failed to create connection", "error", validatedError)
568578

569579
return nil, validatedError
570580
}

internal/config/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,12 @@ func registerClientFlags(fs *flag.FlagSet) {
621621
DefMaxFileSize,
622622
"Max file size in bytes.",
623623
)
624+
625+
fs.Duration(
626+
ClientGRPCResponseTimeoutKey,
627+
DefResponseTimeout,
628+
"Duration to wait for a response before retrying request",
629+
)
624630
}
625631

626632
func registerCommandFlags(fs *flag.FlagSet) {
@@ -1091,6 +1097,7 @@ func resolveClient() *Client {
10911097
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
10921098
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
10931099
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
1100+
ResponseTimeout: viperInstance.GetDuration(ClientGRPCResponseTimeoutKey),
10941101
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
10951102
},
10961103
Backoff: &BackOff{

internal/config/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,7 @@ func createConfig() *Config {
11801180
MaxMessageSendSize: 1048575,
11811181
MaxFileSize: 485753,
11821182
FileChunkSize: 48575,
1183+
ResponseTimeout: 30 * time.Second,
11831184
},
11841185
Backoff: &BackOff{
11851186
InitialInterval: 200 * time.Millisecond,

internal/config/defaults.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ const (
6363
DefMaxMessageSendSize = 4194304 // default 4 MB
6464
DefMaxFileSize uint32 = 1048576 // 1MB
6565
DefFileChunkSize uint32 = 524288 // 0.5MB
66+
DefResponseTimeout = 10 * time.Second
6667

6768
// Client HTTP Settings
6869
DefHTTPTimeout = 10 * time.Second

internal/config/flags.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ var (
4040
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
4141
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
4242
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
43+
ClientGRPCResponseTimeoutKey = pre(ClientRootKey) + "grpc_response_timeout"
4344

4445
ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
4546
ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval"

internal/config/testdata/nginx-agent.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ client:
4949
max_message_receive_size: 1048575
5050
max_message_send_size: 1048575
5151
max_file_size: 485753
52+
response_timeout: 30s
5253
file_chunk_size: 48575
5354
backoff:
5455
initial_interval: 200ms

internal/config/types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ type (
8888
}
8989

9090
GRPC struct {
91-
KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"`
91+
KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"`
92+
ResponseTimeout time.Duration `yaml:"response_timeout" mapstructure:"response_timeout"`
9293
// if MaxMessageSize is size set then we use that value,
9394
// otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings
9495
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`

internal/file/file_service_operator.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ func (fso *FileServiceOperator) File(
7979
defer backoffCancel()
8080

8181
getFile := func() (*mpi.GetFileResponse, error) {
82-
return fso.fileServiceClient.GetFile(ctx, &mpi.GetFileRequest{
82+
grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.Grpc.ResponseTimeout)
83+
defer cancel()
84+
85+
return fso.fileServiceClient.GetFile(grpcCtx, &mpi.GetFileRequest{
8386
MessageMeta: &mpi.MessageMeta{
8487
MessageId: id.GenerateMessageID(),
8588
CorrelationId: logger.CorrelationID(ctx),
@@ -182,12 +185,15 @@ func (fso *FileServiceOperator) UpdateOverview(
182185
"request", request, "parent_correlation_id", correlationID,
183186
)
184187

185-
response, updateError := fso.fileServiceClient.UpdateOverview(newCtx, request)
188+
grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.Grpc.ResponseTimeout)
189+
defer cancel()
190+
191+
response, updateError := fso.fileServiceClient.UpdateOverview(grpcCtx, request)
186192

187193
validatedError := internalgrpc.ValidateGrpcError(updateError)
188194

189195
if validatedError != nil {
190-
slog.ErrorContext(newCtx, "Failed to send update overview", "error", validatedError)
196+
slog.ErrorContext(grpcCtx, "Failed to send update overview", "error", validatedError)
191197

192198
return nil, validatedError
193199
}
@@ -225,7 +231,10 @@ func (fso *FileServiceOperator) ChunkedFile(
225231
) error {
226232
slog.DebugContext(ctx, "Getting chunked file", "file", file.GetFileMeta().GetName())
227233

228-
stream, err := fso.fileServiceClient.GetFileStream(ctx, &mpi.GetFileRequest{
234+
grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.Grpc.ResponseTimeout)
235+
defer cancel()
236+
237+
stream, err := fso.fileServiceClient.GetFileStream(grpcCtx, &mpi.GetFileRequest{
229238
MessageMeta: &mpi.MessageMeta{
230239
MessageId: id.GenerateMessageID(),
231240
CorrelationId: logger.CorrelationID(ctx),
@@ -371,12 +380,15 @@ func (fso *FileServiceOperator) sendUpdateFileRequest(
371380
return nil, errors.New("CreateConnection rpc has not being called yet")
372381
}
373382

374-
response, updateError := fso.fileServiceClient.UpdateFile(ctx, request)
383+
grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.Grpc.ResponseTimeout)
384+
defer cancel()
385+
386+
response, updateError := fso.fileServiceClient.UpdateFile(grpcCtx, request)
375387

376388
validatedError := internalgrpc.ValidateGrpcError(updateError)
377389

378390
if validatedError != nil {
379-
slog.ErrorContext(ctx, "Failed to send update file", "error", validatedError)
391+
slog.ErrorContext(grpcCtx, "Failed to send update file", "error", validatedError)
380392

381393
return nil, validatedError
382394
}
@@ -406,7 +418,10 @@ func (fso *FileServiceOperator) sendUpdateFileStream(
406418
return errors.New("file chunk size must be greater than zero")
407419
}
408420

409-
updateFileStreamClient, err := fso.fileServiceClient.UpdateFileStream(ctx)
421+
grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.Grpc.ResponseTimeout)
422+
defer cancel()
423+
424+
updateFileStreamClient, err := fso.fileServiceClient.UpdateFileStream(grpcCtx)
410425
if err != nil {
411426
return err
412427
}

test/config/agent/nginx-agent-with-auxiliary-command.conf

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ auxiliary_command:
1919
port: 9095
2020
type: grpc
2121

22-
22+
client:
23+
grpc:
24+
response_timeout: 2s
25+
2326
allowed_directories:
2427
- /etc/nginx
2528
- /usr/local/etc/nginx

test/config/agent/nginx-config-with-grpc-client.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ command:
1313
port: 9092
1414
type: grpc
1515

16+
client:
17+
grpc:
18+
response_timeout: 2s
19+
1620
allowed_directories:
1721
- /etc/nginx
1822
- /usr/local/etc/nginx

0 commit comments

Comments
 (0)