Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/mod v0.29.0
golang.org/x/sync v0.17.0
google.golang.org/protobuf v1.36.9
)

Expand Down Expand Up @@ -336,7 +337,6 @@ require (
golang.org/x/arch v0.20.0 // indirect
golang.org/x/exp v0.0.0-20250808145144-a408d31f581a // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/tools v0.37.0 // indirect
gonum.org/v1/gonum v0.16.0 // indirect
Expand Down
17 changes: 12 additions & 5 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,12 @@ func registerClientFlags(fs *flag.FlagSet) {
DefMaxFileSize,
"Max file size in bytes.",
)

fs.Int(
ClientGRPCMaxParallelFileOperationsKey,
DefMaxParallelFileOperations,
"Max number of files downloaded or uploaded in parallel",
)
}

func registerCommandFlags(fs *flag.FlagSet) {
Expand Down Expand Up @@ -1087,11 +1093,12 @@ func resolveClient() *Client {
Time: viperInstance.GetDuration(ClientKeepAliveTimeKey),
PermitWithoutStream: viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey),
},
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey),
},
Backoff: &BackOff{
InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey),
Expand Down
11 changes: 6 additions & 5 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,11 +1175,12 @@ func createConfig() *Config {
Time: 10 * time.Second,
PermitWithoutStream: false,
},
MaxMessageSize: 1048575,
MaxMessageReceiveSize: 1048575,
MaxMessageSendSize: 1048575,
MaxFileSize: 485753,
FileChunkSize: 48575,
MaxMessageSize: 1048575,
MaxMessageReceiveSize: 1048575,
MaxMessageSendSize: 1048575,
MaxFileSize: 485753,
FileChunkSize: 48575,
MaxParallelFileOperations: 10,
},
Backoff: &BackOff{
InitialInterval: 200 * time.Millisecond,
Expand Down
11 changes: 6 additions & 5 deletions internal/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ const (
DefAuxiliaryCommandTLServerNameKey = ""

// Client GRPC Settings
DefMaxMessageSize = 0 // 0 = unset
DefMaxMessageRecieveSize = 4194304 // default 4 MB
DefMaxMessageSendSize = 4194304 // default 4 MB
DefMaxFileSize uint32 = 1048576 // 1MB
DefFileChunkSize uint32 = 524288 // 0.5MB
DefMaxMessageSize = 0 // 0 = unset
DefMaxMessageRecieveSize = 4194304 // default 4 MB
DefMaxMessageSendSize = 4194304 // default 4 MB
DefMaxFileSize uint32 = 1048576 // 1MB
DefFileChunkSize uint32 = 524288 // 0.5MB
DefMaxParallelFileOperations = 5

// Client HTTP Settings
DefHTTPTimeout = 10 * time.Second
Expand Down
13 changes: 7 additions & 6 deletions internal/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ var (
ClientKeepAliveTimeKey = pre(GrpcKeepAlive) + "time"
ClientKeepAliveTimeoutKey = pre(GrpcKeepAlive) + "timeout"

ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations"

ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval"
Expand Down
1 change: 1 addition & 0 deletions internal/config/testdata/nginx-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ client:
max_message_send_size: 1048575
max_file_size: 485753
file_chunk_size: 48575
max_parallel_file_operations: 10
backoff:
initial_interval: 200ms
max_interval: 10s
Expand Down
12 changes: 7 additions & 5 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,17 @@ type (
Multiplier float64 `yaml:"multiplier" mapstructure:"multiplier"`
}

//nolint:lll // max line limit exceeded
GRPC struct {
KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"`
// if MaxMessageSize is size set then we use that value,
// otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
MaxParallelFileOperations int `yaml:"max_parallel_file_operations" mapstructure:"max_parallel_file_operations"`
}

KeepAlive struct {
Expand Down
70 changes: 45 additions & 25 deletions internal/file/file_manager_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strconv"
"sync"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/nginx/agent/v3/internal/model"
Expand Down Expand Up @@ -288,28 +289,36 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context,
}

func (fms *FileManagerService) ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error {
var updatingFilesError error
uploadFiles := configUploadRequest.GetOverview().GetFiles()
if len(uploadFiles) == 0 {
return nil
}

for _, file := range configUploadRequest.GetOverview().GetFiles() {
err := fms.fileServiceOperator.UpdateFile(
ctx,
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
file,
)
if err != nil {
slog.ErrorContext(
ctx,
"Failed to update file",
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
"file_name", file.GetFileMeta().GetName(),
"error", err,
errGroup, errGroupCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)

for _, file := range uploadFiles {
errGroup.Go(func() error {
err := fms.fileServiceOperator.UpdateFile(
errGroupCtx,
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
file,
)
if err != nil {
slog.ErrorContext(
errGroupCtx,
"Failed to update file",
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
"file_name", file.GetFileMeta().GetName(),
"error", err,
)
}

updatingFilesError = errors.Join(updatingFilesError, err)
}
return err
})
}

return updatingFilesError
return errGroup.Wait()
}

// DetermineFileActions compares two sets of files to determine the file action for each file. Returns a map of files
Expand Down Expand Up @@ -570,25 +579,36 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) (actionEr
}

func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Context) (updateError error) {
var downloadFiles []*model.FileCache
for _, fileAction := range fms.fileActions {
if fileAction.Action == model.Add || fileAction.Action == model.Update {
downloadFiles = append(downloadFiles, fileAction)
}
}

if len(downloadFiles) == 0 {
slog.DebugContext(ctx, "No updated files to download")
return nil
}

errGroup, errGroupCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)

for _, fileAction := range downloadFiles {
errGroup.Go(func() error {
tempFilePath := tempFilePath(fileAction.File.GetFileMeta().GetName())

slog.DebugContext(
ctx,
errGroupCtx,
"Downloading file to temp location",
"file", tempFilePath,
)

updateErr := fms.fileUpdate(ctx, fileAction.File, tempFilePath)
if updateErr != nil {
updateError = updateErr
break
}
}
return fms.fileUpdate(errGroupCtx, fileAction.File, tempFilePath)
})
}

return updateError
return errGroup.Wait()
}

func (fms *FileManagerService) moveOrDeleteFiles(ctx context.Context, actionError error) error {
Expand Down
12 changes: 7 additions & 5 deletions test/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ const (
commonRandomizationFactor = 0.1
commonMultiplier = 0.2

reloadMonitoringPeriod = 400 * time.Millisecond
maxParallelFileOperations = 5
reloadMonitoringPeriod = 400 * time.Millisecond
)

// Produces a populated Agent Config for testing usage.
Expand All @@ -49,10 +50,11 @@ func AgentConfig() *config.Config {
Time: clientTime,
PermitWithoutStream: clientPermitWithoutStream,
},
MaxMessageReceiveSize: 1,
MaxMessageSendSize: 1,
MaxFileSize: 1,
FileChunkSize: 1,
MaxMessageReceiveSize: 1,
MaxMessageSendSize: 1,
MaxFileSize: 1,
FileChunkSize: 1,
MaxParallelFileOperations: maxParallelFileOperations,
},
Backoff: &config.BackOff{
InitialInterval: commonInitialInterval,
Expand Down
Loading