Skip to content

Commit b7a5c70

Browse files
committed
merging main to resolve conflicts
2 parents d31b2df + b1947ec commit b7a5c70

File tree

9 files changed

+107
-67
lines changed

9 files changed

+107
-67
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ require (
8181
go.uber.org/multierr v1.11.0
8282
go.uber.org/zap v1.27.0
8383
golang.org/x/mod v0.29.0
84+
golang.org/x/sync v0.17.0
8485
google.golang.org/protobuf v1.36.10
8586
)
8687

@@ -352,7 +353,6 @@ require (
352353
golang.org/x/arch v0.20.0 // indirect
353354
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b // indirect
354355
golang.org/x/oauth2 v0.32.0 // indirect
355-
golang.org/x/sync v0.17.0 // indirect
356356
golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 // indirect
357357
golang.org/x/time v0.14.0 // indirect
358358
golang.org/x/tools v0.38.0 // indirect

internal/config/config.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,12 @@ func registerClientFlags(fs *flag.FlagSet) {
653653
DefClientFileDownloadTimeout,
654654
"Timeout value in seconds, to downloading file for config apply.",
655655
)
656+
657+
fs.Int(
658+
ClientGRPCMaxParallelFileOperationsKey,
659+
DefMaxParallelFileOperations,
660+
"Maximum number of file downloads or uploads performed in parallel",
661+
)
656662
}
657663

658664
func registerCommandFlags(fs *flag.FlagSet) {
@@ -1125,11 +1131,12 @@ func resolveClient() *Client {
11251131
Time: viperInstance.GetDuration(ClientKeepAliveTimeKey),
11261132
PermitWithoutStream: viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey),
11271133
},
1128-
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
1129-
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
1130-
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
1131-
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
1132-
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
1134+
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
1135+
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
1136+
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
1137+
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
1138+
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
1139+
MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey),
11331140
},
11341141
Backoff: &BackOff{
11351142
InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey),

internal/config/config_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,11 +1178,12 @@ func createConfig() *Config {
11781178
Time: 10 * time.Second,
11791179
PermitWithoutStream: false,
11801180
},
1181-
MaxMessageSize: 1048575,
1182-
MaxMessageReceiveSize: 1048575,
1183-
MaxMessageSendSize: 1048575,
1184-
MaxFileSize: 485753,
1185-
FileChunkSize: 48575,
1181+
MaxMessageSize: 1048575,
1182+
MaxMessageReceiveSize: 1048575,
1183+
MaxMessageSendSize: 1048575,
1184+
MaxFileSize: 485753,
1185+
FileChunkSize: 48575,
1186+
MaxParallelFileOperations: 10,
11861187
},
11871188
Backoff: &BackOff{
11881189
InitialInterval: 200 * time.Millisecond,

internal/config/defaults.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ const (
6060
DefAuxiliaryCommandTLServerNameKey = ""
6161

6262
// Client GRPC Settings
63-
DefMaxMessageSize = 0 // 0 = unset
64-
DefMaxMessageRecieveSize = 4194304 // default 4 MB
65-
DefMaxMessageSendSize = 4194304 // default 4 MB
66-
DefMaxFileSize uint32 = 1048576 // 1MB
67-
DefFileChunkSize uint32 = 524288 // 0.5MB
63+
DefMaxMessageSize = 0 // 0 = unset
64+
DefMaxMessageRecieveSize = 4194304 // default 4 MB
65+
DefMaxMessageSendSize = 4194304 // default 4 MB
66+
DefMaxFileSize uint32 = 1048576 // 1MB
67+
DefFileChunkSize uint32 = 524288 // 0.5MB
68+
DefMaxParallelFileOperations = 5
6869

6970
// Client HTTP Settings
7071
DefHTTPTimeout = 10 * time.Second

internal/config/flags.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ var (
3535
ClientKeepAliveTimeKey = pre(GrpcKeepAlive) + "time"
3636
ClientKeepAliveTimeoutKey = pre(GrpcKeepAlive) + "timeout"
3737

38-
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
39-
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
40-
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
41-
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
42-
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
43-
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
38+
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
39+
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
40+
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
41+
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
42+
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
43+
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
44+
ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations"
4445

4546
ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
4647
ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval"

internal/config/testdata/nginx-agent.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ client:
5454
max_message_send_size: 1048575
5555
max_file_size: 485753
5656
file_chunk_size: 48575
57+
max_parallel_file_operations: 10
5758
backoff:
5859
initial_interval: 200ms
5960
max_interval: 10s

internal/config/types.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,17 @@ type (
9393
Multiplier float64 `yaml:"multiplier" mapstructure:"multiplier"`
9494
}
9595

96+
//nolint:lll // max line limit exceeded
9697
GRPC struct {
9798
KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"`
9899
// if MaxMessageSize is size set then we use that value,
99100
// otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings
100-
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
101-
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
102-
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
103-
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
104-
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
101+
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
102+
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
103+
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
104+
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
105+
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
106+
MaxParallelFileOperations int `yaml:"max_parallel_file_operations" mapstructure:"max_parallel_file_operations"`
105107
}
106108

107109
KeepAlive struct {

internal/file/file_manager_service.go

Lines changed: 60 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"strings"
2222
"sync"
2323

24+
"golang.org/x/sync/errgroup"
2425
"google.golang.org/grpc"
2526

2627
"github.com/nginx/agent/v3/internal/model"
@@ -300,28 +301,36 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context,
300301
}
301302

302303
func (fms *FileManagerService) ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error {
303-
var updatingFilesError error
304+
uploadFiles := configUploadRequest.GetOverview().GetFiles()
305+
if len(uploadFiles) == 0 {
306+
return nil
307+
}
304308

305-
for _, file := range configUploadRequest.GetOverview().GetFiles() {
306-
err := fms.fileServiceOperator.UpdateFile(
307-
ctx,
308-
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
309-
file,
310-
)
311-
if err != nil {
312-
slog.ErrorContext(
313-
ctx,
314-
"Failed to update file",
315-
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
316-
"file_name", file.GetFileMeta().GetName(),
317-
"error", err,
309+
errGroup, errGroupCtx := errgroup.WithContext(ctx)
310+
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)
311+
312+
for _, file := range uploadFiles {
313+
errGroup.Go(func() error {
314+
err := fms.fileServiceOperator.UpdateFile(
315+
errGroupCtx,
316+
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
317+
file,
318318
)
319+
if err != nil {
320+
slog.ErrorContext(
321+
errGroupCtx,
322+
"Failed to update file",
323+
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
324+
"file_name", file.GetFileMeta().GetName(),
325+
"error", err,
326+
)
327+
}
319328

320-
updatingFilesError = errors.Join(updatingFilesError, err)
321-
}
329+
return err
330+
})
322331
}
323332

324-
return updatingFilesError
333+
return errGroup.Wait()
325334
}
326335

327336
// DetermineFileActions compares two sets of files to determine the file action for each file. Returns a map of files
@@ -585,29 +594,45 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) (actionEr
585594
}
586595

587596
func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Context) (updateError error) {
597+
var downloadFiles []*model.FileCache
588598
for _, fileAction := range fms.fileActions {
589-
tempFilePath := tempFilePath(fileAction.File.GetFileMeta().GetName())
590-
591-
switch fileAction.Action {
592-
case model.ExternalFile:
593-
updateError = fms.handleExternalFileDownload(ctx, fileAction, tempFilePath)
594-
case model.Add, model.Update:
595-
slog.DebugContext(
596-
ctx,
597-
"Downloading file to temp location",
598-
"file", tempFilePath,
599-
)
600-
updateError = fms.fileUpdate(ctx, fileAction.File, tempFilePath)
601-
case model.Delete, model.Unchanged:
602-
continue
599+
if fileAction.Action == model.ExternalFile || fileAction.Action == model.Add ||
600+
fileAction.Action == model.Update {
601+
downloadFiles = append(downloadFiles, fileAction)
603602
}
603+
}
604604

605-
if updateError != nil {
606-
return updateError
607-
}
605+
if len(downloadFiles) == 0 {
606+
slog.DebugContext(ctx, "No updated files to download")
607+
return nil
608608
}
609+
errGroup, errGroupCtx := errgroup.WithContext(ctx)
610+
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)
609611

610-
return nil
612+
for _, fileAction := range downloadFiles {
613+
errGroup.Go(func() error {
614+
tempFilePath := tempFilePath(fileAction.File.GetFileMeta().GetName())
615+
616+
switch fileAction.Action {
617+
case model.ExternalFile:
618+
return fms.handleExternalFileDownload(errGroupCtx, fileAction, tempFilePath)
619+
case model.Add, model.Update:
620+
slog.DebugContext(
621+
errGroupCtx,
622+
"Downloading file to temp location",
623+
"file", tempFilePath,
624+
)
625+
626+
return fms.fileUpdate(errGroupCtx, fileAction.File, tempFilePath)
627+
case model.Delete, model.Unchanged: // had to add for linter
628+
return nil
629+
default:
630+
return nil
631+
}
632+
})
633+
}
634+
635+
return errGroup.Wait()
611636
}
612637

613638
func (fms *FileManagerService) moveOrDeleteFiles(ctx context.Context, actionError error) error {

test/types/config.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ const (
2929
commonRandomizationFactor = 0.1
3030
commonMultiplier = 0.2
3131

32-
reloadMonitoringPeriod = 400 * time.Millisecond
32+
maxParallelFileOperations = 5
33+
reloadMonitoringPeriod = 400 * time.Millisecond
3334
)
3435

3536
// Produces a populated Agent Config for testing usage.
@@ -52,10 +53,11 @@ func AgentConfig() *config.Config {
5253
Time: clientTime,
5354
PermitWithoutStream: clientPermitWithoutStream,
5455
},
55-
MaxMessageReceiveSize: 1,
56-
MaxMessageSendSize: 1,
57-
MaxFileSize: 1,
58-
FileChunkSize: 1,
56+
MaxMessageReceiveSize: 1,
57+
MaxMessageSendSize: 1,
58+
MaxFileSize: 1,
59+
FileChunkSize: 1,
60+
MaxParallelFileOperations: maxParallelFileOperations,
5961
},
6062
Backoff: &config.BackOff{
6163
InitialInterval: commonInitialInterval,

0 commit comments

Comments
 (0)