Skip to content

Commit bc8b1c2

Browse files
committed
Perform atomic writes during a config apply
1 parent 78306bb commit bc8b1c2

File tree

6 files changed

+168
-89
lines changed

6 files changed

+168
-89
lines changed

internal/file/file_manager_service.go

Lines changed: 80 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"fmt"
1414
"log/slog"
1515
"os"
16+
"path/filepath"
1617
"sync"
1718

1819
"google.golang.org/grpc"
@@ -38,11 +39,11 @@ const (
3839

3940
type (
4041
fileOperator interface {
41-
Write(ctx context.Context, fileContent []byte, file *mpi.FileMeta) error
42-
CreateFileDirectories(ctx context.Context, fileMeta *mpi.FileMeta, filePermission os.FileMode) error
42+
Write(ctx context.Context, fileContent []byte, fileName, filePermissions string) error
43+
CreateFileDirectories(ctx context.Context, fileName string) error
4344
WriteChunkedFile(
4445
ctx context.Context,
45-
file *mpi.File,
46+
fileName, filePermissions string,
4647
header *mpi.FileDataChunkHeader,
4748
stream grpc.ServerStreamingClient[mpi.FileDataChunk],
4849
) error
@@ -57,17 +58,18 @@ type (
5758
}
5859

5960
fileServiceOperatorInterface interface {
60-
File(ctx context.Context, file *mpi.File, fileActions map[string]*model.FileCache) error
61+
File(ctx context.Context, file *mpi.File, tempFilePath, expectedHash string) error
6162
UpdateOverview(ctx context.Context, instanceID string, filesToUpdate []*mpi.File, configPath string,
6263
iteration int) error
63-
ChunkedFile(ctx context.Context, file *mpi.File) error
64+
ChunkedFile(ctx context.Context, file *mpi.File, tempFilePath, expectedHash string) error
6465
IsConnected() bool
6566
UpdateFile(
6667
ctx context.Context,
6768
instanceID string,
6869
fileToUpdate *mpi.File,
6970
) error
7071
SetIsConnected(isConnected bool)
72+
ValidateFileHash(filePath, expectedHash string) error
7173
}
7274

7375
fileManagerServiceInterface interface {
@@ -196,15 +198,16 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string)
196198

197199
continue
198200
case model.Delete, model.Update:
199-
content := fms.rollbackFileContents[fileAction.File.GetFileMeta().GetName()]
200-
err := fms.fileOperator.Write(ctx, content, fileAction.File.GetFileMeta())
201+
fileMeta := fileAction.File.GetFileMeta()
202+
content := fms.rollbackFileContents[fileMeta.GetName()]
203+
err := fms.fileOperator.Write(ctx, content, fileMeta.GetName(), fileMeta.GetPermissions())
201204
if err != nil {
202205
return err
203206
}
204207

205208
// currentFilesOnDisk needs to be updated after rollback action is performed
206-
fileAction.File.GetFileMeta().Hash = files.GenerateHash(content)
207-
fms.currentFilesOnDisk[fileAction.File.GetFileMeta().GetName()] = fileAction.File
209+
fileMeta.Hash = files.GenerateHash(content)
210+
fms.currentFilesOnDisk[fileMeta.GetName()] = fileAction.File
208211
case model.Unchanged:
209212
fallthrough
210213
default:
@@ -448,21 +451,29 @@ func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, m
448451
}
449452

450453
func (fms *FileManagerService) executeFileActions(ctx context.Context) error {
454+
tempDir := os.TempDir()
455+
456+
// Download files to temporary location
457+
downloadError := fms.downloadUpdatedFilesToTempLocation(ctx, tempDir)
458+
if downloadError != nil {
459+
return downloadError
460+
}
461+
462+
// Move/Delete files
451463
for _, fileAction := range fms.fileActions {
452464
switch fileAction.Action {
453465
case model.Delete:
454-
slog.DebugContext(ctx, "File action, deleting file", "file", fileAction.File.GetFileMeta().GetName())
466+
slog.DebugContext(ctx, "Deleting file", "file", fileAction.File.GetFileMeta().GetName())
455467
if err := os.Remove(fileAction.File.GetFileMeta().GetName()); err != nil && !os.IsNotExist(err) {
456468
return fmt.Errorf("error deleting file: %s error: %w",
457469
fileAction.File.GetFileMeta().GetName(), err)
458470
}
459471

460472
continue
461473
case model.Add, model.Update:
462-
slog.DebugContext(ctx, "File action, add or update file", "file", fileAction.File.GetFileMeta().GetName())
463-
updateErr := fms.fileUpdate(ctx, fileAction.File)
464-
if updateErr != nil {
465-
return updateErr
474+
err := fms.moveFilesFromTempDirectory(ctx, fileAction, tempDir)
475+
if err != nil {
476+
return err
466477
}
467478
case model.Unchanged:
468479
slog.DebugContext(ctx, "File unchanged")
@@ -472,12 +483,64 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) error {
472483
return nil
473484
}
474485

475-
func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) error {
486+
func (fms *FileManagerService) moveFilesFromTempDirectory(
487+
ctx context.Context, fileAction *model.FileCache, tempDir string,
488+
) error {
489+
fileName := fileAction.File.GetFileMeta().GetName()
490+
slog.DebugContext(ctx, "Updating file", "file", fileName)
491+
tempFilePath := filepath.Join(tempDir, fileName)
492+
493+
// Create parent directories for the target file if they don't exist
494+
if err := os.MkdirAll(filepath.Dir(fileName), dirPerm); err != nil {
495+
return fmt.Errorf("failed to create directories for %s: %w", fileName, err)
496+
}
497+
498+
moveErr := os.Rename(tempFilePath, fileName)
499+
if moveErr != nil {
500+
return fmt.Errorf("failed to rename file: %w", moveErr)
501+
}
502+
503+
if removeError := os.Remove(tempFilePath); removeError != nil && !os.IsNotExist(removeError) {
504+
slog.ErrorContext(
505+
ctx,
506+
"Error deleting temp file",
507+
"file", fileName,
508+
"error", removeError,
509+
)
510+
}
511+
512+
return fms.fileServiceOperator.ValidateFileHash(fileName, fileAction.File.GetFileMeta().GetHash())
513+
}
514+
515+
func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Context, tempDir string) error {
516+
for _, fileAction := range fms.fileActions {
517+
if fileAction.Action == model.Add || fileAction.Action == model.Update {
518+
tempFilePath := filepath.Join(tempDir, fileAction.File.GetFileMeta().GetName())
519+
520+
slog.DebugContext(
521+
ctx,
522+
"Downloading file to temp location",
523+
"file", tempFilePath,
524+
)
525+
526+
updateErr := fms.fileUpdate(ctx, fileAction.File, tempFilePath)
527+
if updateErr != nil {
528+
return updateErr
529+
}
530+
}
531+
}
532+
533+
return nil
534+
}
535+
536+
func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File, tempFilePath string) error {
537+
expectedHash := fms.fileActions[file.GetFileMeta().GetName()].File.GetFileMeta().GetHash()
538+
476539
if file.GetFileMeta().GetSize() <= int64(fms.agentConfig.Client.Grpc.MaxFileSize) {
477-
return fms.fileServiceOperator.File(ctx, file, fms.fileActions)
540+
return fms.fileServiceOperator.File(ctx, file, tempFilePath, expectedHash)
478541
}
479542

480-
return fms.fileServiceOperator.ChunkedFile(ctx, file)
543+
return fms.fileServiceOperator.ChunkedFile(ctx, file, tempFilePath, expectedHash)
481544
}
482545

483546
func (fms *FileManagerService) checkAllowedDirectory(checkFiles []*mpi.File) error {

internal/file/file_manager_service_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,15 @@ func TestFileManagerService_ConfigApply_Add(t *testing.T) {
3333
tempDir := t.TempDir()
3434

3535
filePath := filepath.Join(tempDir, "nginx.conf")
36+
3637
fileContent := []byte("location /test {\n return 200 \"Test location\\n\";\n}")
3738
fileHash := files.GenerateHash(fileContent)
3839
defer helpers.RemoveFileWithErrorCheck(t, filePath)
3940

4041
overview := protos.FileOverview(filePath, fileHash)
4142

4243
manifestDirPath := tempDir
43-
manifestFilePath := manifestDirPath + "/manifest.json"
44+
manifestFilePath := filepath.Join(manifestDirPath, "manifest.json")
4445
helpers.CreateFileWithErrorCheck(t, manifestDirPath, "manifest.json")
4546

4647
fakeFileServiceClient := &v1fakes.FakeFileServiceClient{}
@@ -97,7 +98,7 @@ func TestFileManagerService_ConfigApply_Add_LargeFile(t *testing.T) {
9798
}
9899

99100
manifestDirPath := tempDir
100-
manifestFilePath := manifestDirPath + "/manifest.json"
101+
manifestFilePath := filepath.Join(manifestDirPath, "manifest.json")
101102

102103
fakeFileServiceClient.GetFileStreamReturns(fakeServerStreamingClient, nil)
103104
agentConfig := types.AgentConfig()

internal/file/file_operator.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,34 +39,33 @@ func NewFileOperator(manifestLock *sync.RWMutex) *FileOperator {
3939
}
4040
}
4141

42-
func (fo *FileOperator) Write(ctx context.Context, fileContent []byte, file *mpi.FileMeta) error {
43-
filePermission := files.FileMode(file.GetPermissions())
44-
err := fo.CreateFileDirectories(ctx, file, filePermission)
42+
func (fo *FileOperator) Write(ctx context.Context, fileContent []byte, fileName, filePermissions string) error {
43+
filePermission := files.FileMode(filePermissions)
44+
err := fo.CreateFileDirectories(ctx, fileName)
4545
if err != nil {
4646
return err
4747
}
4848

49-
writeErr := os.WriteFile(file.GetName(), fileContent, filePermission)
49+
writeErr := os.WriteFile(fileName, fileContent, filePermission)
5050
if writeErr != nil {
51-
return fmt.Errorf("error writing to file %s: %w", file.GetName(), writeErr)
51+
return fmt.Errorf("error writing to file %s: %w", fileName, writeErr)
5252
}
53-
slog.DebugContext(ctx, "Content written to file", "file_path", file.GetName())
53+
slog.DebugContext(ctx, "Content written to file", "file_path", fileName)
5454

5555
return nil
5656
}
5757

5858
func (fo *FileOperator) CreateFileDirectories(
5959
ctx context.Context,
60-
fileMeta *mpi.FileMeta,
61-
filePermission os.FileMode,
60+
fileName string,
6261
) error {
63-
if _, err := os.Stat(fileMeta.GetName()); os.IsNotExist(err) {
64-
parentDirectory := path.Dir(fileMeta.GetName())
62+
if _, err := os.Stat(fileName); os.IsNotExist(err) {
63+
parentDirectory := path.Dir(fileName)
6564
slog.DebugContext(
6665
ctx, "File does not exist, creating parent directory",
6766
"directory_path", parentDirectory,
6867
)
69-
err = os.MkdirAll(parentDirectory, filePermission)
68+
err = os.MkdirAll(parentDirectory, dirPerm)
7069
if err != nil {
7170
return fmt.Errorf("error creating directory %s: %w", parentDirectory, err)
7271
}
@@ -77,23 +76,22 @@ func (fo *FileOperator) CreateFileDirectories(
7776

7877
func (fo *FileOperator) WriteChunkedFile(
7978
ctx context.Context,
80-
file *mpi.File,
79+
fileName, filePermissions string,
8180
header *mpi.FileDataChunkHeader,
8281
stream grpc.ServerStreamingClient[mpi.FileDataChunk],
8382
) error {
84-
filePermissions := files.FileMode(file.GetFileMeta().GetPermissions())
85-
createFileDirectoriesError := fo.CreateFileDirectories(ctx, file.GetFileMeta(), filePermissions)
83+
createFileDirectoriesError := fo.CreateFileDirectories(ctx, fileName)
8684
if createFileDirectoriesError != nil {
8785
return createFileDirectoriesError
8886
}
8987

90-
fileToWrite, createError := os.Create(file.GetFileMeta().GetName())
88+
fileToWrite, createError := os.Create(fileName)
9189
defer func() {
9290
closeError := fileToWrite.Close()
9391
if closeError != nil {
9492
slog.WarnContext(
9593
ctx, "Failed to close file",
96-
"file", file.GetFileMeta().GetName(),
94+
"file", fileName,
9795
"error", closeError,
9896
)
9997
}
@@ -102,7 +100,12 @@ func (fo *FileOperator) WriteChunkedFile(
102100
return createError
103101
}
104102

105-
slog.DebugContext(ctx, "Writing chunked file", "file", file.GetFileMeta().GetName())
103+
filePermission := files.FileMode(filePermissions)
104+
if err := os.Chmod(fileName, filePermission); err != nil {
105+
return fmt.Errorf("error setting permissions for %s file: %w", fileName, err)
106+
}
107+
108+
slog.DebugContext(ctx, "Writing chunked file", "file", fileName)
106109
for range header.GetChunks() {
107110
chunk, recvError := stream.Recv()
108111
if recvError != nil {
@@ -111,7 +114,7 @@ func (fo *FileOperator) WriteChunkedFile(
111114

112115
_, chunkWriteError := fileToWrite.Write(chunk.GetContent().GetData())
113116
if chunkWriteError != nil {
114-
return fmt.Errorf("error writing chunk to file %s: %w", file.GetFileMeta().GetName(), chunkWriteError)
117+
return fmt.Errorf("error writing chunk to file %s: %w", fileName, chunkWriteError)
115118
}
116119
}
117120

internal/file/file_operator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestFileOperator_Write(t *testing.T) {
3333

3434
fileMeta := protos.FileMeta(filePath, files.GenerateHash(fileContent))
3535

36-
writeErr := fileOp.Write(ctx, fileContent, fileMeta)
36+
writeErr := fileOp.Write(ctx, fileContent, fileMeta.GetName(), fileMeta.GetPermissions())
3737
require.NoError(t, writeErr)
3838
assert.FileExists(t, filePath)
3939

internal/file/file_service_operator.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@ import (
2424
"github.com/nginx/agent/v3/internal/config"
2525
internalgrpc "github.com/nginx/agent/v3/internal/grpc"
2626
"github.com/nginx/agent/v3/internal/logger"
27-
"github.com/nginx/agent/v3/internal/model"
2827
"github.com/nginx/agent/v3/pkg/files"
2928
"github.com/nginx/agent/v3/pkg/id"
3029
"google.golang.org/grpc"
3130
"google.golang.org/protobuf/types/known/timestamppb"
3231
)
3332

34-
// File service operator handles requests to the grpc file service
35-
33+
// FileServiceOperator handles requests to the grpc file service
3634
type FileServiceOperator struct {
3735
fileServiceClient mpi.FileServiceClient
3836
agentConfig *config.Config
@@ -64,8 +62,10 @@ func (fso *FileServiceOperator) IsConnected() bool {
6462
return fso.isConnected.Load()
6563
}
6664

67-
func (fso *FileServiceOperator) File(ctx context.Context, file *mpi.File,
68-
fileActions map[string]*model.FileCache,
65+
func (fso *FileServiceOperator) File(
66+
ctx context.Context,
67+
file *mpi.File,
68+
tempFilePath, expectedHash string,
6969
) error {
7070
slog.DebugContext(ctx, "Getting file", "file", file.GetFileMeta().GetName())
7171

@@ -92,12 +92,16 @@ func (fso *FileServiceOperator) File(ctx context.Context, file *mpi.File,
9292
return fmt.Errorf("error getting file data for %s: %w", file.GetFileMeta(), getFileErr)
9393
}
9494

95-
if writeErr := fso.fileOperator.Write(ctx, getFileResp.GetContents().GetContents(),
96-
file.GetFileMeta()); writeErr != nil {
95+
if writeErr := fso.fileOperator.Write(
96+
ctx,
97+
getFileResp.GetContents().GetContents(),
98+
tempFilePath,
99+
file.GetFileMeta().GetPermissions(),
100+
); writeErr != nil {
97101
return writeErr
98102
}
99103

100-
return fso.validateFileHash(file.GetFileMeta().GetName(), fileActions)
104+
return fso.ValidateFileHash(tempFilePath, expectedHash)
101105
}
102106

103107
func (fso *FileServiceOperator) UpdateOverview(
@@ -210,7 +214,9 @@ func (fso *FileServiceOperator) UpdateOverview(
210214
return err
211215
}
212216

213-
func (fso *FileServiceOperator) ChunkedFile(ctx context.Context, file *mpi.File) error {
217+
func (fso *FileServiceOperator) ChunkedFile(
218+
ctx context.Context, file *mpi.File, tempFilePath, expectedHash string,
219+
) error {
214220
slog.DebugContext(ctx, "Getting chunked file", "file", file.GetFileMeta().GetName())
215221

216222
stream, err := fso.fileServiceClient.GetFileStream(ctx, &mpi.GetFileRequest{
@@ -235,12 +241,14 @@ func (fso *FileServiceOperator) ChunkedFile(ctx context.Context, file *mpi.File)
235241

236242
header := headerChunk.GetHeader()
237243

238-
writeChunkedFileError := fso.fileOperator.WriteChunkedFile(ctx, file, header, stream)
244+
writeChunkedFileError := fso.fileOperator.WriteChunkedFile(
245+
ctx, tempFilePath, file.GetFileMeta().GetPermissions(), header, stream,
246+
)
239247
if writeChunkedFileError != nil {
240248
return writeChunkedFileError
241249
}
242250

243-
return nil
251+
return fso.ValidateFileHash(tempFilePath, expectedHash)
244252
}
245253

246254
func (fso *FileServiceOperator) UpdateFile(
@@ -262,15 +270,18 @@ func (fso *FileServiceOperator) UpdateFile(
262270
return fso.sendUpdateFileStream(ctx, fileToUpdate, fso.agentConfig.Client.Grpc.FileChunkSize)
263271
}
264272

265-
func (fso *FileServiceOperator) validateFileHash(filePath string, fileActions map[string]*model.FileCache) error {
273+
func (fso *FileServiceOperator) ValidateFileHash(filePath, expectedHash string) error {
266274
content, err := os.ReadFile(filePath)
267275
if err != nil {
268276
return err
269277
}
270278
fileHash := files.GenerateHash(content)
271279

272-
if fileHash != fileActions[filePath].File.GetFileMeta().GetHash() {
273-
return fmt.Errorf("error writing file, file hash does not match for file %s", filePath)
280+
if fileHash != expectedHash {
281+
return fmt.Errorf(
282+
"error writing file, file hash does not match for file %s, expected hash: %s actual hash: %s",
283+
filePath, fileHash, expectedHash,
284+
)
274285
}
275286

276287
return nil

0 commit comments

Comments
 (0)