Skip to content

Commit d06517c

Browse files
committed
Add file chunk size config parameter. Add unit tests.
1 parent 8f758b2 commit d06517c

File tree

12 files changed

+411
-56
lines changed

12 files changed

+411
-56
lines changed

internal/config/config.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,13 +287,19 @@ func registerClientFlags(fs *flag.FlagSet) {
287287
fs.Int(
288288
ClientGRPCMaxMessageReceiveSizeKey,
289289
DefMaxMessageRecieveSize,
290-
"Updates the client grpc setting MaxRecvMsgSize with the specific value in MB.",
290+
"Updates the client grpc setting MaxRecvMsgSize with the specific value in bytes.",
291291
)
292292

293293
fs.Int(
294294
ClientGRPCMaxMessageSendSizeKey,
295295
DefMaxMessageSendSize,
296-
"Updates the client grpc setting MaxSendMsgSize with the specific value in MB.",
296+
"Updates the client grpc setting MaxSendMsgSize with the specific value in bytes.",
297+
)
298+
299+
fs.Uint32(
300+
ClientGRPCFileChunkSizeKey,
301+
DefFileChunkSize,
302+
"File chunk size in bytes.",
297303
)
298304
}
299305

@@ -611,6 +617,7 @@ func resolveClient() *Client {
611617
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
612618
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
613619
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
620+
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
614621
},
615622
Backoff: &BackOff{
616623
InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey),

internal/config/config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func checkDefaultsClientValues(t *testing.T, viperInstance *viper.Viper) {
100100
assert.Equal(t, DefMaxMessageSize, viperInstance.GetInt(ClientGRPCMaxMessageSizeKey))
101101
assert.Equal(t, DefMaxMessageRecieveSize, viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey))
102102
assert.Equal(t, DefMaxMessageSendSize, viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey))
103+
assert.Equal(t, DefFileChunkSize, viperInstance.GetUint32(ClientGRPCFileChunkSizeKey))
103104
assert.Equal(t, make(map[string]string), viperInstance.GetStringMapString(LabelsRootKey))
104105
}
105106

@@ -746,6 +747,7 @@ func createConfig() *Config {
746747
MaxMessageSize: 1048575,
747748
MaxMessageReceiveSize: 1048575,
748749
MaxMessageSendSize: 1048575,
750+
FileChunkSize: 48575,
749751
},
750752
Backoff: &BackOff{
751753
InitialInterval: 200 * time.Millisecond,

internal/config/defaults.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ const (
2727
DefCommandTLServerNameKey = ""
2828

2929
// Client GRPC Settings
30-
DefMaxMessageSize = 0 // 0 = unset
31-
DefMaxMessageRecieveSize = 4194304 // default 4 MB
32-
DefMaxMessageSendSize = 4194304 // default 4 MB
30+
DefMaxMessageSize = 0 // 0 = unset
31+
DefMaxMessageRecieveSize = 4194304 // default 4 MB
32+
DefMaxMessageSendSize = 4194304 // default 4 MB
33+
DefFileChunkSize uint32 = 2097152 // 2MB
3334

3435
// Client HTTP Settings
3536
DefHTTPTimeout = 10 * time.Second

internal/config/flags.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ var (
3636
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
3737
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
3838
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
39+
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
3940

4041
ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
4142
ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval"

internal/config/testdata/nginx-agent.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ client:
3535
max_message_size: 1048575
3636
max_message_receive_size: 1048575
3737
max_message_send_size: 1048575
38+
file_chunk_size: 48575
3839
backoff:
3940
initial_interval: 200ms
4041
max_interval: 10s

internal/config/types.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,11 @@ type (
8080
}
8181

8282
GRPC struct {
83-
KeepAlive *KeepAlive `yaml:"-" mapstructure:"target"`
84-
// if MaxMessageSize is size set then we use that value,
85-
// otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings
86-
MaxMessageSize int `yaml:"-" mapstructure:"max_message_size"`
87-
MaxMessageReceiveSize int `yaml:"-" mapstructure:"max_message_receive_size"`
88-
MaxMessageSendSize int `yaml:"-" mapstructure:"max_message_send_size"`
83+
KeepAlive *KeepAlive `yaml:"-" mapstructure:"target"`
84+
MaxMessageSize int `yaml:"-" mapstructure:"max_message_size"`
85+
MaxMessageReceiveSize int `yaml:"-" mapstructure:"max_message_receive_size"`
86+
MaxMessageSendSize int `yaml:"-" mapstructure:"max_message_send_size"`
87+
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
8988
}
9089

9190
KeepAlive struct {

internal/file/file_manager_service.go

Lines changed: 82 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,7 @@ func (fms *FileManagerService) UpdateFile(
241241
return fms.sendUpdateFileRequest(ctx, fileToUpdate)
242242
}
243243

244-
// TODO: Add config parameter for chunk size
245-
var chunkSize uint32 = 2097152 // 2MB
246-
return fms.sendUpdateFileStream(ctx, fileToUpdate, chunkSize)
244+
return fms.sendUpdateFileStream(ctx, fileToUpdate, fms.agentConfig.Client.Grpc.FileChunkSize)
247245
}
248246

249247
func (fms *FileManagerService) sendUpdateFileRequest(
@@ -314,6 +312,10 @@ func (fms *FileManagerService) sendUpdateFileStream(
314312
fileToUpdate *mpi.File,
315313
chunkSize uint32,
316314
) error {
315+
if chunkSize == 0 {
316+
return fmt.Errorf("file chunk size must be greater than zero")
317+
}
318+
317319
updateFileStreamClient, err := fms.fileServiceClient.UpdateFileStream(ctx)
318320
if err != nil {
319321
return err
@@ -441,10 +443,12 @@ func (fms *FileManagerService) readChunk(
441443
return mpi.FileDataChunk_Content{}, fmt.Errorf("failed to read chunk: %w", err)
442444
}
443445

446+
slog.DebugContext(ctx, "No more data to read from file")
447+
444448
return mpi.FileDataChunk_Content{}, nil
445449
}
446450

447-
slog.DebugContext(ctx, "Read chunk", "chunkID", chunkID, "chunk_size", len(buf))
451+
slog.DebugContext(ctx, "Read file chunk", "chunk_id", chunkID, "chunk_size", len(buf))
448452

449453
chunk := mpi.FileDataChunk_Content{
450454
Content: &mpi.FileDataChunkContent{
@@ -628,38 +632,51 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) error {
628632
}
629633

630634
func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) error {
635+
slog.DebugContext(ctx, "Updating file", "file", file.GetFileMeta().GetName())
631636
if file.GetFileMeta().GetSize() <= int64(fms.agentConfig.Client.Grpc.MaxMessageReceiveSize) {
632-
backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime)
633-
defer backoffCancel()
634-
635-
getFile := func() (*mpi.GetFileResponse, error) {
636-
return fms.fileServiceClient.GetFile(ctx, &mpi.GetFileRequest{
637-
MessageMeta: &mpi.MessageMeta{
638-
MessageId: id.GenerateMessageID(),
639-
CorrelationId: logger.GetCorrelationID(ctx),
640-
Timestamp: timestamppb.Now(),
641-
},
642-
FileMeta: file.GetFileMeta(),
643-
})
644-
}
637+
return fms.getFile(ctx, file)
638+
}
645639

646-
getFileResp, getFileErr := backoff.RetryWithData(
647-
getFile,
648-
backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff),
649-
)
640+
return fms.getChunkedFile(ctx, file)
641+
}
650642

651-
if getFileErr != nil {
652-
return fmt.Errorf("error getting file data for %s: %w", file.GetFileMeta(), getFileErr)
653-
}
643+
func (fms *FileManagerService) getFile(ctx context.Context, file *mpi.File) error {
644+
slog.DebugContext(ctx, "Getting file", "file", file.GetFileMeta().GetName())
654645

655-
if writeErr := fms.fileOperator.Write(ctx, getFileResp.GetContents().GetContents(),
656-
file.GetFileMeta()); writeErr != nil {
657-
return writeErr
658-
}
646+
backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime)
647+
defer backoffCancel()
648+
649+
getFile := func() (*mpi.GetFileResponse, error) {
650+
return fms.fileServiceClient.GetFile(ctx, &mpi.GetFileRequest{
651+
MessageMeta: &mpi.MessageMeta{
652+
MessageId: id.GenerateMessageID(),
653+
CorrelationId: logger.GetCorrelationID(ctx),
654+
Timestamp: timestamppb.Now(),
655+
},
656+
FileMeta: file.GetFileMeta(),
657+
})
658+
}
659+
660+
getFileResp, getFileErr := backoff.RetryWithData(
661+
getFile,
662+
backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff),
663+
)
664+
665+
if getFileErr != nil {
666+
return fmt.Errorf("error getting file data for %s: %w", file.GetFileMeta(), getFileErr)
667+
}
659668

660-
return fms.validateFileHash(file.GetFileMeta().GetName())
669+
if writeErr := fms.fileOperator.Write(ctx, getFileResp.GetContents().GetContents(),
670+
file.GetFileMeta()); writeErr != nil {
671+
return writeErr
661672
}
662673

674+
return fms.validateFileHash(file.GetFileMeta().GetName())
675+
}
676+
677+
func (fms *FileManagerService) getChunkedFile(ctx context.Context, file *mpi.File) error {
678+
slog.DebugContext(ctx, "Getting chunked file", "file", file.GetFileMeta().GetName())
679+
663680
stream, err := fms.fileServiceClient.GetFileStream(ctx, &mpi.GetFileRequest{
664681
MessageMeta: &mpi.MessageMeta{
665682
MessageId: id.GenerateMessageID(),
@@ -673,28 +690,52 @@ func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) e
673690
}
674691

675692
// Get header chunk first
676-
chunk := &mpi.FileDataChunk{}
677-
recvError := stream.RecvMsg(chunk)
678-
if recvError != nil {
679-
return recvError
693+
headerChunk, recvHeaderChunkError := stream.Recv()
694+
if recvHeaderChunkError != nil {
695+
return recvHeaderChunkError
696+
}
697+
698+
slog.DebugContext(ctx, "File header chunk received", "header_chunk", headerChunk)
699+
700+
header := headerChunk.GetHeader()
701+
702+
writeChunkedFileError := fms.writeChunkedFile(ctx, file, header, stream)
703+
if writeChunkedFileError != nil {
704+
return writeChunkedFileError
680705
}
681706

682-
header := chunk.GetHeader()
707+
return nil
708+
}
683709

684-
filePermissions := files.FileMode(header.GetFileMeta().GetPermissions())
685-
createFileDirectoriesError := fms.fileOperator.CreateFileDirectories(ctx, header.GetFileMeta(), filePermissions)
710+
func (fms *FileManagerService) writeChunkedFile(
711+
ctx context.Context,
712+
file *mpi.File,
713+
header *mpi.FileDataChunkHeader,
714+
stream grpc2.ServerStreamingClient[mpi.FileDataChunk],
715+
) error {
716+
filePermissions := files.FileMode(file.GetFileMeta().GetPermissions())
717+
createFileDirectoriesError := fms.fileOperator.CreateFileDirectories(ctx, file.GetFileMeta(), filePermissions)
686718
if createFileDirectoriesError != nil {
687719
return createFileDirectoriesError
688720
}
689721

690-
fileToWrite, openError := os.OpenFile(header.GetFileMeta().GetName(), os.O_WRONLY, filePermissions)
691-
if openError != nil {
692-
return openError
722+
fileToWrite, createError := os.Create(file.GetFileMeta().GetName())
723+
defer func() {
724+
closeError := fileToWrite.Close()
725+
if closeError != nil {
726+
slog.WarnContext(
727+
ctx, "Failed to close file",
728+
"file", file.GetFileMeta().GetName(),
729+
"error", closeError,
730+
)
731+
}
732+
}()
733+
if createError != nil {
734+
return createError
693735
}
694736

695737
for i := uint32(0); i < header.GetChunks(); i++ {
696-
chunk := &mpi.FileDataChunk{}
697-
recvError := stream.RecvMsg(chunk)
738+
chunk, recvError := stream.Recv()
698739
if recvError != nil {
699740
return recvError
700741
}

0 commit comments

Comments
 (0)