Skip to content

Commit 428f705

Browse files
committed
download and upload files in parallel
1 parent c5193ee commit 428f705

File tree

7 files changed

+84
-51
lines changed

7 files changed

+84
-51
lines changed

internal/config/config.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,12 @@ func registerClientFlags(fs *flag.FlagSet) {
621621
DefMaxFileSize,
622622
"Max file size in bytes.",
623623
)
624+
625+
fs.Int(
626+
ClientGRPCMaxParallelFileOperationsKey,
627+
DefMaxParallelFileOperations,
628+
"Max number of files downloaded or uploaded in parallel",
629+
)
624630
}
625631

626632
func registerCommandFlags(fs *flag.FlagSet) {
@@ -1087,11 +1093,12 @@ func resolveClient() *Client {
10871093
Time: viperInstance.GetDuration(ClientKeepAliveTimeKey),
10881094
PermitWithoutStream: viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey),
10891095
},
1090-
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
1091-
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
1092-
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
1093-
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
1094-
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
1096+
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
1097+
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
1098+
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
1099+
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
1100+
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
1101+
MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey),
10951102
},
10961103
Backoff: &BackOff{
10971104
InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey),

internal/config/config_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1175,11 +1175,12 @@ func createConfig() *Config {
11751175
Time: 10 * time.Second,
11761176
PermitWithoutStream: false,
11771177
},
1178-
MaxMessageSize: 1048575,
1179-
MaxMessageReceiveSize: 1048575,
1180-
MaxMessageSendSize: 1048575,
1181-
MaxFileSize: 485753,
1182-
FileChunkSize: 48575,
1178+
MaxMessageSize: 1048575,
1179+
MaxMessageReceiveSize: 1048575,
1180+
MaxMessageSendSize: 1048575,
1181+
MaxFileSize: 485753,
1182+
FileChunkSize: 48575,
1183+
MaxParallelFileOperations: 10,
11831184
},
11841185
Backoff: &BackOff{
11851186
InitialInterval: 200 * time.Millisecond,

internal/config/defaults.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,12 @@ const (
5858
DefAuxiliaryCommandTLServerNameKey = ""
5959

6060
// Client GRPC Settings
61-
DefMaxMessageSize = 0 // 0 = unset
62-
DefMaxMessageRecieveSize = 4194304 // default 4 MB
63-
DefMaxMessageSendSize = 4194304 // default 4 MB
64-
DefMaxFileSize uint32 = 1048576 // 1MB
65-
DefFileChunkSize uint32 = 524288 // 0.5MB
61+
DefMaxMessageSize = 0 // 0 = unset
62+
DefMaxMessageRecieveSize = 4194304 // default 4 MB
63+
DefMaxMessageSendSize = 4194304 // default 4 MB
64+
DefMaxFileSize uint32 = 1048576 // 1MB
65+
DefFileChunkSize uint32 = 524288 // 0.5MB
66+
DefMaxParallelFileOperations = 5
6667

6768
// Client HTTP Settings
6869
DefHTTPTimeout = 10 * time.Second

internal/config/flags.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ var (
3434
ClientKeepAliveTimeKey = pre(GrpcKeepAlive) + "time"
3535
ClientKeepAliveTimeoutKey = pre(GrpcKeepAlive) + "timeout"
3636

37-
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
38-
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
39-
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
40-
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
41-
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
42-
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
37+
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
38+
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
39+
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
40+
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
41+
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
42+
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
43+
ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations"
4344

4445
ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
4546
ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval"

internal/config/testdata/nginx-agent.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ client:
5050
max_message_send_size: 1048575
5151
max_file_size: 485753
5252
file_chunk_size: 48575
53+
max_parallel_file_operations: 10
5354
backoff:
5455
initial_interval: 200ms
5556
max_interval: 10s

internal/config/types.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,17 @@ type (
8787
Multiplier float64 `yaml:"multiplier" mapstructure:"multiplier"`
8888
}
8989

90+
//nolint:lll // max line limit exceeded
9091
GRPC struct {
9192
KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"`
9293
// if MaxMessageSize is size set then we use that value,
9394
// otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings
94-
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
95-
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
96-
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
97-
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
98-
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
95+
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
96+
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
97+
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
98+
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
99+
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
100+
MaxParallelFileOperations int `yaml:"max_parallel_file_operations" mapstructure:"max_parallel_file_operations"`
99101
}
100102

101103
KeepAlive struct {

internal/file/file_manager_service.go

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"strconv"
1818
"sync"
1919

20+
"golang.org/x/sync/errgroup"
2021
"google.golang.org/grpc"
2122

2223
"github.com/nginx/agent/v3/internal/model"
@@ -288,28 +289,36 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context,
288289
}
289290

290291
func (fms *FileManagerService) ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error {
291-
var updatingFilesError error
292+
uploadFiles := configUploadRequest.GetOverview().GetFiles()
293+
if len(uploadFiles) == 0 {
294+
return nil
295+
}
292296

293-
for _, file := range configUploadRequest.GetOverview().GetFiles() {
294-
err := fms.fileServiceOperator.UpdateFile(
295-
ctx,
296-
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
297-
file,
298-
)
299-
if err != nil {
300-
slog.ErrorContext(
301-
ctx,
302-
"Failed to update file",
303-
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
304-
"file_name", file.GetFileMeta().GetName(),
305-
"error", err,
297+
errGroup, errGroupCtx := errgroup.WithContext(ctx)
298+
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)
299+
300+
for _, file := range uploadFiles {
301+
errGroup.Go(func() error {
302+
err := fms.fileServiceOperator.UpdateFile(
303+
errGroupCtx,
304+
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
305+
file,
306306
)
307+
if err != nil {
308+
slog.ErrorContext(
309+
errGroupCtx,
310+
"Failed to update file",
311+
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
312+
"file_name", file.GetFileMeta().GetName(),
313+
"error", err,
314+
)
315+
}
307316

308-
updatingFilesError = errors.Join(updatingFilesError, err)
309-
}
317+
return err
318+
})
310319
}
311320

312-
return updatingFilesError
321+
return errGroup.Wait()
313322
}
314323

315324
// DetermineFileActions compares two sets of files to determine the file action for each file. Returns a map of files
@@ -570,25 +579,36 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) (actionEr
570579
}
571580

572581
func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Context) (updateError error) {
582+
var downloadFiles []*model.FileCache
573583
for _, fileAction := range fms.fileActions {
574584
if fileAction.Action == model.Add || fileAction.Action == model.Update {
585+
downloadFiles = append(downloadFiles, fileAction)
586+
}
587+
}
588+
589+
if len(downloadFiles) == 0 {
590+
slog.DebugContext(ctx, "No updated files to download")
591+
return nil
592+
}
593+
594+
errGroup, errGroupCtx := errgroup.WithContext(ctx)
595+
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)
596+
597+
for _, fileAction := range downloadFiles {
598+
errGroup.Go(func() error {
575599
tempFilePath := tempFilePath(fileAction.File.GetFileMeta().GetName())
576600

577601
slog.DebugContext(
578-
ctx,
602+
errGroupCtx,
579603
"Downloading file to temp location",
580604
"file", tempFilePath,
581605
)
582606

583-
updateErr := fms.fileUpdate(ctx, fileAction.File, tempFilePath)
584-
if updateErr != nil {
585-
updateError = updateErr
586-
break
587-
}
588-
}
607+
return fms.fileUpdate(errGroupCtx, fileAction.File, tempFilePath)
608+
})
589609
}
590610

591-
return updateError
611+
return errGroup.Wait()
592612
}
593613

594614
func (fms *FileManagerService) moveOrDeleteFiles(ctx context.Context, actionError error) error {

0 commit comments

Comments
 (0)