Skip to content

Commit fad2204

Browse files
authored
Move mount, unmount and isMountPoint functionalities to github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint/mounter (#444)
This is part of the effort of consolidating Mountpoint related functionalities in `github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint` package. --- By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --------- Signed-off-by: Burak Varlı <burakvar@amazon.co.uk>
1 parent 8677df5 commit fad2204

10 files changed

Lines changed: 208 additions & 100 deletions

File tree

pkg/driver/driver.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,19 @@ import (
2323
"os"
2424
"time"
2525

26+
"github.com/container-storage-interface/spec/lib/go/csi"
27+
"google.golang.org/grpc"
28+
"k8s.io/client-go/kubernetes"
29+
"k8s.io/client-go/rest"
30+
"k8s.io/klog/v2"
31+
2632
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node"
2733
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/credentialprovider"
2834
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/mounter"
2935
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/version"
36+
mpmounter "github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint/mounter"
3037
"github.com/awslabs/aws-s3-csi-driver/pkg/podmounter/mppod/watcher"
3138
"github.com/awslabs/aws-s3-csi-driver/pkg/util"
32-
"github.com/container-storage-interface/spec/lib/go/csi"
33-
"google.golang.org/grpc"
34-
"k8s.io/client-go/kubernetes"
35-
"k8s.io/client-go/rest"
36-
"k8s.io/klog/v2"
37-
"k8s.io/mount-utils"
3839
)
3940

4041
const (
@@ -90,20 +91,21 @@ func NewDriver(endpoint string, mpVersion string, nodeID string) (*Driver, error
9091
stopCh := make(chan struct{})
9192

9293
var mounterImpl mounter.Mounter
94+
mpMounter := mpmounter.New()
9395
if util.UsePodMounter() {
9496
podWatcher := watcher.New(clientset, mountpointPodNamespace, podWatcherResyncPeriod)
9597
err = podWatcher.Start(stopCh)
9698
if err != nil {
9799
klog.Fatalf("Failed to start Pod watcher: %v\n", err)
98100
}
99101

100-
mounterImpl, err = mounter.NewPodMounter(podWatcher, credProvider, mount.New(""), nil, kubernetesVersion)
102+
mounterImpl, err = mounter.NewPodMounter(podWatcher, credProvider, mpMounter, nil, kubernetesVersion)
101103
if err != nil {
102104
klog.Fatalln(err)
103105
}
104106
klog.Infoln("Using pod mounter")
105107
} else {
106-
mounterImpl, err = mounter.NewSystemdMounter(credProvider, mpVersion, kubernetesVersion)
108+
mounterImpl, err = mounter.NewSystemdMounter(credProvider, mpMounter, mpVersion, kubernetesVersion)
107109
if err != nil {
108110
klog.Fatalln(err)
109111
}

pkg/driver/node/mounter/mounter.go

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,13 @@ package mounter
33

44
import (
55
"context"
6-
"fmt"
76
"os"
87

9-
"k8s.io/klog/v2"
10-
"k8s.io/mount-utils"
11-
128
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/credentialprovider"
139
"github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint"
1410
"github.com/awslabs/aws-s3-csi-driver/pkg/system"
1511
)
1612

17-
// https://github.com/awslabs/mountpoint-s3/blob/9ed8b6243f4511e2013b2f4303a9197c3ddd4071/mountpoint-s3/src/cli.rs#L421
18-
const mountpointDeviceName = "mountpoint-s3"
19-
2013
type ServiceRunner interface {
2114
StartService(ctx context.Context, config *system.ExecConfig) (string, error)
2215
RunOneshot(ctx context.Context, config *system.ExecConfig) (string, error)
@@ -39,29 +32,3 @@ func MountS3Path() string {
3932
}
4033
return mountS3Path
4134
}
42-
43-
// isMountPoint returns whether given `target` is a `mount-s3` mount.
44-
// We implement additional check on top of `mounter.IsMountPoint` because we need
45-
// to verify not only that the target is a mount point but also that it is specifically a mount-s3 mount point.
46-
// This is achieved by calling the `mounter.List()` method to enumerate all mount points.
47-
func isMountPoint(mounter mount.Interface, target string) (bool, error) {
48-
if _, err := os.Stat(target); os.IsNotExist(err) {
49-
return false, err
50-
}
51-
52-
mountPoints, err := mounter.List()
53-
if err != nil {
54-
return false, fmt.Errorf("Failed to list mounts: %w", err)
55-
}
56-
for _, mp := range mountPoints {
57-
if mp.Path == target {
58-
if mp.Device != mountpointDeviceName {
59-
klog.V(4).Infof("IsMountPoint: %s is not a `mount-s3` mount. Expected device type to be %s but got %s, skipping unmount", target, mountpointDeviceName, mp.Device)
60-
continue
61-
}
62-
63-
return true, nil
64-
}
65-
}
66-
return false, nil
67-
}

pkg/driver/node/mounter/pod_mounter.go

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,17 @@ import (
77
"io/fs"
88
"os"
99
"path/filepath"
10-
"syscall"
1110
"time"
1211

1312
corev1 "k8s.io/api/core/v1"
1413
"k8s.io/apimachinery/pkg/util/wait"
1514
"k8s.io/klog/v2"
16-
"k8s.io/mount-utils"
1715

1816
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/credentialprovider"
1917
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/envprovider"
2018
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/targetpath"
2119
"github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint"
20+
mpmounter "github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint/mounter"
2221
"github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint/mountoptions"
2322
"github.com/awslabs/aws-s3-csi-driver/pkg/podmounter/mppod"
2423
"github.com/awslabs/aws-s3-csi-driver/pkg/podmounter/mppod/watcher"
@@ -33,21 +32,21 @@ const targetDirPerm = fs.FileMode(0755)
3332

3433
// mountSyscall is the function that performs `mount` operation for given `target` with given Mountpoint `args`.
3534
// It returns mounted FUSE file descriptor as a result.
36-
// This is mainly exposed for testing, in production platform-native function (`mountSyscallDefault`) will be used.
35+
// This is mainly exposed for testing, in production platform-native function (`mpmounter.Mount`) will be used.
3736
type mountSyscall func(target string, args mountpoint.Args) (fd int, err error)
3837

3938
// A PodMounter is a [Mounter] that mounts Mountpoint on pre-created Kubernetes Pod running in the same node.
4039
type PodMounter struct {
4140
podWatcher *watcher.Watcher
42-
mount mount.Interface
41+
mount *mpmounter.Mounter
4342
kubeletPath string
4443
mountSyscall mountSyscall
4544
kubernetesVersion string
4645
credProvider *credentialprovider.Provider
4746
}
4847

4948
// NewPodMounter creates a new [PodMounter] with given Kubernetes client.
50-
func NewPodMounter(podWatcher *watcher.Watcher, credProvider *credentialprovider.Provider, mount mount.Interface, mountSyscall mountSyscall, kubernetesVersion string) (*PodMounter, error) {
49+
func NewPodMounter(podWatcher *watcher.Watcher, credProvider *credentialprovider.Provider, mount *mpmounter.Mounter, mountSyscall mountSyscall, kubernetesVersion string) (*PodMounter, error) {
5150
return &PodMounter{
5251
podWatcher: podWatcher,
5352
credProvider: credProvider,
@@ -77,14 +76,12 @@ func (pm *PodMounter) Mount(ctx context.Context, bucketName string, target strin
7776

7877
podID := credentialCtx.PodID
7978

80-
err = pm.verifyOrSetupMountTarget(target)
81-
if err != nil {
82-
return fmt.Errorf("Failed to verify target path can be used as a mount point %q: %w", target, err)
83-
}
84-
8579
isMountPoint, err := pm.IsMountPoint(target)
8680
if err != nil {
87-
return fmt.Errorf("Could not check if %q is already a mount point: %w", target, err)
81+
err = pm.verifyOrSetupMountTarget(target, err)
82+
if err != nil {
83+
return fmt.Errorf("Failed to verify target path can be used as a mount point %q: %w", target, err)
84+
}
8885
}
8986

9087
// TODO: If `target` is a `systemd`-mounted Mountpoint, this would return an error,
@@ -232,8 +229,7 @@ func (pm *PodMounter) Unmount(ctx context.Context, target string, credentialCtx
232229

233230
// IsMountPoint returns whether given `target` is a `mount-s3` mount.
234231
func (pm *PodMounter) IsMountPoint(target string) (bool, error) {
235-
// TODO: Can we just use regular `IsMountPoint` check from `mounter` with containerization?
236-
return isMountPoint(pm.mount, target)
232+
return pm.mount.CheckMountPoint(target)
237233
}
238234

239235
// waitForMountpointPod waints until Mountpoint Pod for given `podID` and `volumeName` is in `Running` state.
@@ -300,7 +296,7 @@ func (pm *PodMounter) waitForMount(parentCtx context.Context, target, podName, p
300296

301297
// closeFUSEDevFD closes given FUSE file descriptor.
302298
func (pm *PodMounter) closeFUSEDevFD(fd int) {
303-
err := syscall.Close(fd)
299+
err := mpmounter.CloseFD(fd)
304300
if err != nil {
305301
klog.V(4).Infof("Mount: Failed to close /dev/fuse file descriptor %d: %v\n", fd, err)
306302
}
@@ -309,20 +305,15 @@ func (pm *PodMounter) closeFUSEDevFD(fd int) {
309305
// verifyOrSetupMountTarget checks target path for existence and corrupted mount error.
310306
// If the target dir does not exists it tries to create it.
311307
// If the target dir is corrupted (decided with `mount.IsCorruptedMnt`) it tries to unmount it to have a clean mount.
312-
func (pm *PodMounter) verifyOrSetupMountTarget(target string) error {
313-
err := verifyMountPointStatx(target)
314-
if err == nil {
315-
return nil
316-
}
317-
308+
func (pm *PodMounter) verifyOrSetupMountTarget(target string, err error) error {
318309
if errors.Is(err, fs.ErrNotExist) {
319310
klog.V(5).Infof("Target path does not exists %s, trying to create", target)
320311
if err := os.MkdirAll(target, targetDirPerm); err != nil {
321312
return fmt.Errorf("Failed to create target directory: %w", err)
322313
}
323314

324315
return nil
325-
} else if mount.IsCorruptedMnt(err) {
316+
} else if pm.mount.IsMountPointCorrupted(err) {
326317
klog.V(4).Infof("Target path %q is a corrupted mount. Trying to unmount", target)
327318
if unmountErr := pm.unmountTarget(target); unmountErr != nil {
328319
klog.V(4).Infof("Failed to unmount target path %q: %v, original failure of stat: %v", target, unmountErr, err)
@@ -332,6 +323,7 @@ func (pm *PodMounter) verifyOrSetupMountTarget(target string) error {
332323
return nil
333324
}
334325

326+
// Some other error that we cannot recover from, just propagate it.
335327
return err
336328
}
337329

@@ -358,13 +350,17 @@ func (pm *PodMounter) podPath(pod *corev1.Pod) string {
358350
return filepath.Join(pm.kubeletPath, "pods", string(pod.UID))
359351
}
360352

361-
// mountSyscallWithDefault delegates to `mountSyscall` if set, or fallbacks to platform-native `mountSyscallDefault`.
353+
// mountSyscallWithDefault delegates to `mountSyscall` if set, or fallbacks to platform-native `mpmounter.Mount`.
362354
func (pm *PodMounter) mountSyscallWithDefault(target string, args mountpoint.Args) (int, error) {
363355
if pm.mountSyscall != nil {
364356
return pm.mountSyscall(target, args)
365357
}
366358

367-
return pm.mountSyscallDefault(target, args)
359+
opts := mpmounter.MountOptions{
360+
ReadOnly: args.Has(mountpoint.ArgReadOnly),
361+
AllowOther: args.Has(mountpoint.ArgAllowOther) || args.Has(mountpoint.ArgAllowRoot),
362+
}
363+
return pm.mount.Mount(target, opts)
368364
}
369365

370366
// unmountTarget calls `unmount` syscall on `target`.

pkg/driver/node/mounter/pod_mounter_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/mounter"
2525
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/mounter/mountertest"
2626
"github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint"
27+
mpmounter "github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint/mounter"
2728
"github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint/mountoptions"
2829
"github.com/awslabs/aws-s3-csi-driver/pkg/podmounter/mppod"
2930
"github.com/awslabs/aws-s3-csi-driver/pkg/podmounter/mppod/watcher"
@@ -119,7 +120,7 @@ func setup(t *testing.T) *testCtx {
119120
err = podWatcher.Start(stopCh)
120121
assert.NoError(t, err)
121122

122-
podMounter, err := mounter.NewPodMounter(podWatcher, credProvider, mount, mountSyscall, testK8sVersion)
123+
podMounter, err := mounter.NewPodMounter(podWatcher, credProvider, mpmounter.NewWithMount(mount), mountSyscall, testK8sVersion)
123124
assert.NoError(t, err)
124125

125126
testCtx.podMounter = podMounter

pkg/driver/node/mounter/systemd_mounter.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,29 @@ import (
1313
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/credentialprovider"
1414
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/envprovider"
1515
"github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint"
16+
mpmounter "github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint/mounter"
1617
"github.com/awslabs/aws-s3-csi-driver/pkg/system"
1718
)
1819

1920
type SystemdMounter struct {
2021
Runner ServiceRunner
2122
Mounter mount.Interface
23+
MpMounter *mpmounter.Mounter
2224
MpVersion string
2325
MountS3Path string
2426
kubernetesVersion string
2527
credProvider *credentialprovider.Provider
2628
}
2729

28-
func NewSystemdMounter(credProvider *credentialprovider.Provider, mpVersion string, kubernetesVersion string) (*SystemdMounter, error) {
30+
func NewSystemdMounter(credProvider *credentialprovider.Provider, mpMounter *mpmounter.Mounter, mpVersion string, kubernetesVersion string) (*SystemdMounter, error) {
2931
runner, err := system.StartOsSystemdSupervisor()
3032
if err != nil {
3133
return nil, fmt.Errorf("failed to start systemd supervisor: %w", err)
3234
}
3335
return &SystemdMounter{
3436
Runner: runner,
3537
Mounter: mount.New(""),
38+
MpMounter: mpMounter,
3639
MpVersion: mpVersion,
3740
MountS3Path: MountS3Path(),
3841
kubernetesVersion: kubernetesVersion,
@@ -42,7 +45,7 @@ func NewSystemdMounter(credProvider *credentialprovider.Provider, mpVersion stri
4245

4346
// IsMountPoint returns whether given `target` is a `mount-s3` mount.
4447
func (m *SystemdMounter) IsMountPoint(target string) (bool, error) {
45-
return isMountPoint(m.Mounter, target)
48+
return m.MpMounter.CheckMountPoint(target)
4649
}
4750

4851
// Mount mounts the given bucket at the target path using provided credentials.
@@ -69,11 +72,11 @@ func (m *SystemdMounter) Mount(ctx context.Context, bucketName string, target st
6972

7073
cleanupDir := false
7174

75+
isMountPoint, err := m.IsMountPoint(target)
7276
// check if the target path exists and is a directory
73-
mountpointErr := verifyMountPointStatx(target)
74-
if mountpointErr != nil {
77+
if err != nil {
7578
// does not exist, create the directory
76-
if os.IsNotExist(mountpointErr) {
79+
if os.IsNotExist(err) {
7780
if err := os.MkdirAll(target, 0755); err != nil {
7881
return fmt.Errorf("Failed to create target directory: %w", err)
7982
}
@@ -85,25 +88,21 @@ func (m *SystemdMounter) Mount(ctx context.Context, bucketName string, target st
8588
}
8689
}
8790
}()
88-
}
89-
// Corrupted mount, try unmounting
90-
if mount.IsCorruptedMnt(mountpointErr) {
91+
// Corrupted mount, try unmounting
92+
} else if m.MpMounter.IsMountPointCorrupted(err) {
9193
klog.V(4).Infof("Mount: Target path %q is a corrupted mount. Trying to unmount.", target)
9294
if mntErr := m.Unmount(ctx, target, credentialprovider.CleanupContext{
9395
WritePath: credentialCtx.WritePath,
9496
PodID: credentialCtx.PodID,
9597
VolumeID: credentialCtx.VolumeID,
9698
}); mntErr != nil {
97-
return fmt.Errorf("Unable to unmount the target %q : %v, %v", target, mountpointErr, mntErr)
99+
return fmt.Errorf("Unable to unmount the target %q : %v, %v", target, err, mntErr)
98100
}
101+
} else {
102+
return fmt.Errorf("Could not check if %q is a mount point: %v", target, err)
99103
}
100104
}
101105

102-
isMountPoint, err := m.IsMountPoint(target)
103-
if err != nil {
104-
return fmt.Errorf("Could not check if %q is a mount point: %v, %v", target, mountpointErr, err)
105-
}
106-
107106
credEnv, authenticationSource, err := m.credProvider.Provide(ctx, credentialCtx)
108107
if err != nil {
109108
klog.V(4).Infof("NodePublishVolume: Failed to provide credentials for %s: %v", target, err)

pkg/driver/node/mounter/systemd_mounter_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,18 @@ import (
88
"strings"
99
"testing"
1010

11+
"slices"
12+
13+
"github.com/golang/mock/gomock"
14+
"k8s.io/mount-utils"
15+
1116
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/credentialprovider"
1217
"github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/mounter"
1318
mock_driver "github.com/awslabs/aws-s3-csi-driver/pkg/driver/node/mounter/mocks"
1419
"github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint"
20+
mpmounter "github.com/awslabs/aws-s3-csi-driver/pkg/mountpoint/mounter"
1521
"github.com/awslabs/aws-s3-csi-driver/pkg/system"
1622
"github.com/awslabs/aws-s3-csi-driver/pkg/util/testutil/assert"
17-
"github.com/golang/mock/gomock"
18-
"k8s.io/mount-utils"
19-
"slices"
2023
)
2124

2225
type mounterTestEnv struct {
@@ -40,6 +43,7 @@ func initMounterTestEnv(t *testing.T) *mounterTestEnv {
4043
mounter: &mounter.SystemdMounter{
4144
Runner: mockRunner,
4245
Mounter: mount.NewFakeMounter(nil),
46+
MpMounter: mpmounter.NewWithMount(mount.NewFakeMounter(nil)),
4347
MpVersion: mountpointVersion,
4448
MountS3Path: mounter.MountS3Path(),
4549
},
@@ -235,7 +239,7 @@ func TestIsMountPoint(t *testing.T) {
235239

236240
for name, test := range tests {
237241
t.Run(name, func(t *testing.T) {
238-
mounter := &mounter.SystemdMounter{Mounter: mount.NewFakeMounter(test.procMountsContent)}
242+
mounter := &mounter.SystemdMounter{MpMounter: mpmounter.NewWithMount(mount.NewFakeMounter(test.procMountsContent))}
239243
isMountPoint, err := mounter.IsMountPoint(test.target)
240244
assert.Equals(t, test.isMountPoint, isMountPoint)
241245
assert.Equals(t, test.expectErr, err != nil)

0 commit comments

Comments
 (0)