Skip to content

Commit 2a46ac7

Browse files
committed
fix: address comments and apply fixes (#77)
* Ensure the mutex is not copied, even when the nodeServer is copied by storing a pointer to the mutex, instead of the mutex itself. * Use Mutex instead of RWMutex, as having two readers of the variable at the same time means we are going to write the state at the same time, corrupting the state file on storage. * Mutex / RWMutex are not recursive / re-entrant in Go, so in two cases we do not call `Unlock()` through `defer` as `persistState()` also takes the lock. * As a rule of thumb, Locking a Mutex should be as close as possible to the resource requiring it, to minimize the size of the critical section / the time spent holding the lock. * Remount each volume in a goroutine, with a rate limits of the number of active routine to prevent contention, and keep under control startup times.
1 parent d61d880 commit 2a46ac7

3 files changed

Lines changed: 116 additions & 72 deletions

File tree

.devcontainer/rclone/install.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,6 @@ rm -rf /tmp/rclone
1919
# Fix the $GOPATH folder
2020
chown -R "${USERNAME}:golang" /go
2121
chmod -R g+r+w /go
22+
23+
# Make sure the default folders exists
24+
mkdir -p /var/lib/kubelet/plugins/csi-rclone/

pkg/rclone/driver.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package rclone
22

33
import (
4+
"context"
45
"fmt"
56
"net"
67
"os"
8+
"path/filepath"
79
"sync"
810

911
"github.com/SwissDataScienceCenter/csi-rclone/pkg/kube"
@@ -87,13 +89,22 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st
8789
Interface: mount.New(""),
8890
Exec: utilexec.New(),
8991
},
90-
RcloneOps: rcloneOps,
91-
mountedVolumes: make(map[string]*MountedVolume),
92+
RcloneOps: rcloneOps,
93+
mountedVolumes: make(map[string]MountedVolume),
94+
mutex: &sync.Mutex{},
9295
stateFile: stateFile,
9396
}
9497

98+
// Ensure the folder exists
99+
if err = os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil {
100+
return nil, fmt.Errorf("failed to create state directory: %v", err)
101+
}
102+
95103
// Load persisted state on startup
96-
if err := ns.loadState(); err != nil {
104+
ns.mutex.Lock()
105+
defer ns.mutex.Unlock()
106+
107+
if ns.mountedVolumes, err = readVolumeMap(ns.stateFile); err != nil {
97108
klog.Warningf("Failed to load persisted volume state: %v", err)
98109
}
99110

pkg/rclone/nodeserver.go

Lines changed: 99 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"errors"
1212
"fmt"
1313
"os"
14-
"path/filepath"
14+
"runtime"
1515
"strings"
1616
"sync"
1717
"time"
@@ -42,20 +42,20 @@ type nodeServer struct {
4242
RcloneOps Operations
4343

4444
// Track mounted volumes for automatic remounting
45-
mountedVolumes map[string]*MountedVolume
46-
mutex sync.RWMutex
45+
mountedVolumes map[string]MountedVolume
46+
mutex *sync.Mutex
4747
stateFile string
4848
}
4949

5050
type MountedVolume struct {
51-
VolumeId string
52-
TargetPath string
53-
Remote string
54-
RemotePath string
55-
ConfigData string
56-
ReadOnly bool
57-
Parameters map[string]string
58-
SecretName string
51+
VolumeId string
52+
TargetPath string
53+
Remote string
54+
RemotePath string
55+
ConfigData string
56+
ReadOnly bool
57+
Parameters map[string]string
58+
SecretName string
5959
SecretNamespace string
6060
}
6161

@@ -377,20 +377,20 @@ func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePat
377377
ns.mutex.Lock()
378378
defer ns.mutex.Unlock()
379379

380-
ns.mountedVolumes[volumeId] = &MountedVolume{
381-
VolumeId: volumeId,
382-
TargetPath: targetPath,
383-
Remote: remote,
384-
RemotePath: remotePath,
385-
ConfigData: configData,
386-
ReadOnly: readOnly,
387-
Parameters: parameters,
388-
SecretName: secretName,
380+
ns.mountedVolumes[volumeId] = MountedVolume{
381+
VolumeId: volumeId,
382+
TargetPath: targetPath,
383+
Remote: remote,
384+
RemotePath: remotePath,
385+
ConfigData: configData,
386+
ReadOnly: readOnly,
387+
Parameters: parameters,
388+
SecretName: secretName,
389389
SecretNamespace: secretNamespace,
390390
}
391391
klog.Infof("Tracked mounted volume %s at path %s", volumeId, targetPath)
392392

393-
if err := ns.persistState(); err != nil {
393+
if err := writeVolumeMap(ns.stateFile, ns.mountedVolumes); err != nil {
394394
klog.Errorf("Failed to persist volume state: %v", err)
395395
}
396396
}
@@ -403,15 +403,20 @@ func (ns *nodeServer) removeTrackedVolume(volumeId string) {
403403
delete(ns.mountedVolumes, volumeId)
404404
klog.Infof("Removed tracked volume %s", volumeId)
405405

406-
if err := ns.persistState(); err != nil {
406+
if err := writeVolumeMap(ns.stateFile, ns.mountedVolumes); err != nil {
407407
klog.Errorf("Failed to persist volume state: %v", err)
408408
}
409409
}
410410

411411
// Automatically remount all tracked volumes after daemon restart
412412
func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error {
413-
ns.mutex.RLock()
414-
defer ns.mutex.RUnlock()
413+
type mountResult struct {
414+
volumeID string
415+
err error
416+
}
417+
418+
ns.mutex.Lock()
419+
defer ns.mutex.Unlock()
415420

416421
if len(ns.mountedVolumes) == 0 {
417422
klog.Info("No tracked volumes to remount")
@@ -420,32 +425,67 @@ func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error {
420425

421426
klog.Infof("Remounting %d tracked volumes", len(ns.mountedVolumes))
422427

428+
// Limit the number of active workers to the number of CPU threads (arbitrarily chosen)
429+
limits := make(chan bool, runtime.GOMAXPROCS(0))
430+
defer close(limits)
431+
432+
volumesCount := len(ns.mountedVolumes)
433+
results := make(chan mountResult, volumesCount)
434+
defer close(results)
435+
436+
ctxWithTimeout, cancel := context.WithTimeout(ctx, 60*time.Second)
437+
defer cancel()
438+
423439
for volumeId, mv := range ns.mountedVolumes {
424-
klog.Infof("Remounting volume %s to %s", volumeId, mv.TargetPath)
440+
go func() {
441+
limits <- true // block until there is a free slot in the queue
442+
defer func() {
443+
<-limits // free a slot in the queue when we exit
444+
}()
425445

426-
// Create the mount directory if it doesn't exist
427-
if err := os.MkdirAll(mv.TargetPath, 0750); err != nil {
428-
klog.Errorf("Failed to create mount directory %s: %v", mv.TargetPath, err)
429-
continue
430-
}
446+
ctxWithMountTimeout, cancel := context.WithTimeout(ctxWithTimeout, 30*time.Second)
447+
defer cancel()
431448

432-
// Remount the volume
433-
rcloneVol := &RcloneVolume{
434-
ID: mv.VolumeId,
435-
Remote: mv.Remote,
436-
RemotePath: mv.RemotePath,
437-
}
449+
klog.Infof("Remounting volume %s to %s", volumeId, mv.TargetPath)
438450

439-
err := ns.RcloneOps.Mount(ctx, rcloneVol, mv.TargetPath, mv.ConfigData, mv.ReadOnly, mv.Parameters)
440-
if err != nil {
441-
klog.Errorf("Failed to remount volume %s: %v", volumeId, err)
442-
// Don't return error here - continue with other volumes
443-
} else {
444-
klog.Infof("Successfully remounted volume %s", volumeId)
445-
}
451+
// Create the mount directory if it doesn't exist
452+
var err error
453+
if err = os.MkdirAll(mv.TargetPath, 0750); err != nil {
454+
klog.Errorf("Failed to create mount directory %s: %v", mv.TargetPath, err)
455+
} else {
456+
// Remount the volume
457+
rcloneVol := &RcloneVolume{
458+
ID: mv.VolumeId,
459+
Remote: mv.Remote,
460+
RemotePath: mv.RemotePath,
461+
}
462+
463+
err = ns.RcloneOps.Mount(ctxWithMountTimeout, rcloneVol, mv.TargetPath, mv.ConfigData, mv.ReadOnly, mv.Parameters)
464+
}
465+
466+
results <- mountResult{volumeId, err}
467+
}()
446468
}
447469

448-
return nil
470+
for {
471+
select {
472+
case result := <-results:
473+
volumesCount--
474+
if result.err != nil {
475+
klog.Errorf("Failed to remount volume %s: %v", result.volumeID, result.err)
476+
// Don't return error here, continue with other volumes not to block all users because of a failed mount.
477+
delete(ns.mountedVolumes, result.volumeID)
478+
// Should we keep it on disk? This will be lost on the first new mount which will override the file.
479+
} else {
480+
klog.Infof("Successfully remounted volume %s", result.volumeID)
481+
}
482+
if volumesCount == 0 {
483+
return nil
484+
}
485+
case <-ctxWithTimeout.Done():
486+
return ctxWithTimeout.Err()
487+
}
488+
}
449489
}
450490

451491
func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error {
@@ -463,55 +503,45 @@ func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error {
463503
}
464504

465505
// Persist volume state to disk
466-
func (ns *nodeServer) persistState() error {
467-
ns.mutex.RLock()
468-
defer ns.mutex.RUnlock()
469-
470-
if ns.stateFile == "" {
506+
func writeVolumeMap(filename string, volumes map[string]MountedVolume) error {
507+
if filename == "" {
471508
return nil
472509
}
473510

474-
data, err := json.Marshal(ns.mountedVolumes)
511+
data, err := json.Marshal(volumes)
475512
if err != nil {
476513
return fmt.Errorf("failed to marshal volume state: %v", err)
477514
}
478515

479-
if err := os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil {
480-
return fmt.Errorf("failed to create state directory: %v", err)
481-
}
482-
483-
if err := os.WriteFile(ns.stateFile, data, 0600); err != nil {
516+
if err := os.WriteFile(filename, data, 0600); err != nil {
484517
return fmt.Errorf("failed to write state file: %v", err)
485518
}
486519

487-
klog.Infof("Persisted volume state to %s", ns.stateFile)
520+
klog.Infof("Persisted volume state to %s", filename)
488521
return nil
489522
}
490523

491524
// Load volume state from disk
492-
func (ns *nodeServer) loadState() error {
493-
ns.mutex.Lock()
494-
defer ns.mutex.Unlock()
525+
func readVolumeMap(filename string) (map[string]MountedVolume, error) {
526+
volumes := make(map[string]MountedVolume)
495527

496-
if ns.stateFile == "" {
497-
return nil
528+
if filename == "" {
529+
return volumes, nil
498530
}
499531

500-
data, err := os.ReadFile(ns.stateFile)
532+
data, err := os.ReadFile(filename)
501533
if err != nil {
502534
if os.IsNotExist(err) {
503535
klog.Info("No persisted volume state found, starting fresh")
504-
return nil
536+
return volumes, nil
505537
}
506-
return fmt.Errorf("failed to read state file: %v", err)
538+
return volumes, fmt.Errorf("failed to read state file: %v", err)
507539
}
508540

509-
var volumes map[string]*MountedVolume
510541
if err := json.Unmarshal(data, &volumes); err != nil {
511-
return fmt.Errorf("failed to unmarshal volume state: %v", err)
542+
return nil, fmt.Errorf("failed to unmarshal volume state: %v", err)
512543
}
513544

514-
ns.mountedVolumes = volumes
515-
klog.Infof("Loaded %d tracked volumes from %s", len(ns.mountedVolumes), ns.stateFile)
516-
return nil
545+
klog.Infof("Loaded %d tracked volumes from %s", len(volumes), filename)
546+
return volumes, nil
517547
}

0 commit comments

Comments
 (0)