Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions concurrency/dir/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ type Options struct {
Target string
}

// Dir atomically writes files to a given directory.
// Dir atomically (best-effort on Windows) writes files to a given directory.
type Dir struct {
log logger.Logger

base string
target string
targetDir string

// prev holds a path we should delete on the *next* successful Write.
// On Unix: the previously active versioned dir.
// On Windows: the last backup directory (target renamed aside).
prev *string
}

Expand All @@ -50,41 +53,45 @@ func New(opts Options) *Dir {
func (d *Dir) Write(files map[string][]byte) error {
newDir := filepath.Join(d.base, fmt.Sprintf("%d-%s", time.Now().UTC().UnixNano(), d.targetDir))

// Ensure base exists
if err := os.MkdirAll(d.base, 0o700); err != nil {
return err
}

// Create the new versioned directory
if err := os.MkdirAll(newDir, 0o700); err != nil {
return err
}

// Write all files into the new versioned directory
for file, b := range files {
path := filepath.Join(newDir, file)
// Ensure parent directories exist for nested files
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
return err
}
if err := os.WriteFile(path, b, 0o600); err != nil {
return err
}
d.log.Infof("Written file %s", file)
}

if err := os.Symlink(newDir, d.target+".new"); err != nil {
return err
}

d.log.Infof("Syslink %s to %s.new", newDir, d.target)

if err := os.Rename(d.target+".new", d.target); err != nil {
// Platform-specific switch into place. It returns what we should delete on the NEXT run.
nextPrev, err := d.switchTo(newDir)
if err != nil {
return err
}

d.log.Infof("Atomic write to %s", d.target)

// Best-effort cleanup from the *previous* run
if d.prev != nil {
if err := os.RemoveAll(*d.prev); err != nil {
return err
}
}

d.prev = &newDir
// Set what to delete on the *next* run.
d.prev = nextPrev

return nil
}
46 changes: 46 additions & 0 deletions concurrency/dir/switch_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//go:build !windows && !plan9

/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dir

import (
"os"
)

func (d *Dir) switchTo(newDir string) (*string, error) {
// Create a symlink and atomically rename it into place.
tmpLink := d.target + ".new"

// Remove any stale temp link
_ = os.Remove(tmpLink)

if err := os.Symlink(newDir, tmpLink); err != nil {
return nil, err
}
d.log.Debugf("Symlink %s -> %s", tmpLink, newDir)

// Atomically replace the target symlink (or create it if missing)
// On POSIX, rename on the same filesystem is atomic.
if err := os.Rename(tmpLink, d.target); err != nil {
// Clean up temp link if rename fails
_ = os.Remove(tmpLink)
return nil, err
}

d.log.Debugf("Atomic write to %s", d.target)

// On Unix we keep versioned dirs and delete the *previous* version on next run.
return &newDir, nil
}
61 changes: 61 additions & 0 deletions concurrency/dir/switch_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//go:build windows

/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dir

import (
"fmt"
"os"
"path/filepath"
"time"
)

func (d *Dir) switchTo(newDir string) (*string, error) {
// Windows notes:
// - os.Rename does NOT replace existing directories.
// - Directory symlinks/junctions are unreliable without privileges.
// Strategy:
// 1) If target exists, rename it to a timestamped backup alongside base.
// 2) Rename newDir -> target.
// 3) Return backup path so we delete it on the *next* run (avoids data loss if step 2 fails).
var backup *string

// If target exists, rename it aside
if fi, err := os.Lstat(d.target); err == nil && fi.IsDir() {
bak := filepath.Join(d.base, fmt.Sprintf("backup-%d-%s", time.Now().UTC().UnixNano(), d.targetDir))
// Be defensive: remove any stale leftover
_ = os.RemoveAll(bak)

if err := os.Rename(d.target, bak); err != nil {
return nil, err
}
d.log.Debugf("Renamed existing %s to backup %s", d.target, bak)
backup = &bak
}

// Move the freshly written versioned dir into place as the new target
if err := os.Rename(newDir, d.target); err != nil {
// Try to restore the backup if we created one
if backup != nil {
_ = os.Rename(*backup, d.target)
}
return nil, err
}

d.log.Debugf("Replaced directory at %s (Windows best-effort atomicity)", d.target)

// On Windows we delete the backup on the *next* run (so we don't risk losing data if a crash happens now).
return backup, nil
}
Loading