Skip to content

Commit 510043d

Browse files
committed
feat: Split stage & publish operations
1 parent 26767ca commit 510043d

3 files changed

Lines changed: 178 additions & 110 deletions

File tree

.devcontainer/devcontainer.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
{
22
"name": "CSI rclone devcontainer",
33
"image": "mcr.microsoft.com/devcontainers/base:bookworm",
4+
"remoteUser": "root",
5+
"containerUser": "root",
46
"features": {
57
"ghcr.io/devcontainers/features/git:1": {},
68
"ghcr.io/devcontainers/features/go:1": {},

pkg/rclone/nodeserver.go

Lines changed: 173 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st
115115
RcloneOps: NewRclone(kubeClient, rclonePort, cacheDir, cacheSize),
116116
mountedVolumes: make(map[string]MountedVolume),
117117
mutex: &sync.Mutex{},
118-
stateFile: "/run/csi-rclone/mounted_volumes.json",
118+
stateFile: "/run/csi-rclone/mounted_volumes.json",
119119
}
120120

121121
// Ensure the folder exists
@@ -134,6 +134,20 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st
134134
return ns, nil
135135
}
136136

137+
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
138+
return &csi.NodeGetCapabilitiesResponse{
139+
Capabilities: []*csi.NodeServiceCapability{
140+
{
141+
Type: &csi.NodeServiceCapability_Rpc{
142+
Rpc: &csi.NodeServiceCapability_RPC{
143+
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
144+
},
145+
},
146+
},
147+
},
148+
}, nil
149+
}
150+
137151
func (ns *NodeServer) Run(ctx context.Context) error {
138152
defer ns.Stop()
139153
return ns.RcloneOps.Run(ctx, func() error {
@@ -216,24 +230,38 @@ type MountedVolume struct {
216230
}
217231

218232
// Mounting Volume (Preparation)
219-
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
220-
return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not implemented")
221-
}
233+
func validateNodeStageVolumeRequest(req *csi.NodeStageVolumeRequest) error {
234+
if req.GetVolumeId() == "" {
235+
return status.Error(codes.InvalidArgument, "empty volume id")
236+
}
222237

223-
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
224-
return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not implemented")
225-
}
238+
if req.GetStagingTargetPath() == "" {
239+
return status.Error(codes.InvalidArgument, "empty staging path")
240+
}
226241

227-
// Mounting Volume (Actual Mounting)
228-
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
229-
if err := validatePublishVolumeRequest(req); err != nil {
230-
return nil, err
242+
capability := req.GetVolumeCapability()
243+
if capability == nil {
244+
return status.Error(codes.InvalidArgument, "no volume capability set")
245+
}
246+
247+
switch capability.GetAccessMode().GetMode() {
248+
case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
249+
return nil
250+
case csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER:
251+
return nil
252+
case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
253+
return nil
254+
default:
255+
return status.Errorf(codes.FailedPrecondition, "Volume access mode not supported %v", capability.GetAccessMode().GetMode())
231256
}
257+
}
232258

233-
targetPath := req.GetTargetPath()
234-
volumeId := req.GetVolumeId()
259+
func isNodeStageReqReadOnly(req *csi.NodeStageVolumeRequest) bool {
260+
return req.GetVolumeCapability().GetAccessMode().GetMode() == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY
261+
}
262+
263+
func getVolumeConfig(ctx context.Context, req *csi.NodeStageVolumeRequest) (*MountedVolume, error) {
235264
volumeContext := req.GetVolumeContext()
236-
readOnly := req.GetReadonly()
237265
secretName, foundSecret := volumeContext["secretName"]
238266
secretNamespace, foundSecretNamespace := volumeContext["secretNamespace"]
239267
// For backwards compatibility - prior to the change in #20 this field held the namespace
@@ -271,43 +299,44 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
271299
remote, remotePath, configData, flags, e := extractFlags(req.GetVolumeContext(), req.GetSecrets(), pvcSecret, savedPvcSecret)
272300
delete(flags, "secretName")
273301
delete(flags, "namespace")
274-
if e != nil {
275-
klog.Warningf("storage parameter error: %s", e)
276-
return nil, e
277-
}
278-
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
279-
if err != nil {
280-
if os.IsNotExist(err) {
281-
if err := os.MkdirAll(targetPath, 0750); err != nil {
282-
return nil, status.Error(codes.Internal, err.Error())
283-
}
284-
notMnt = true
285-
} else {
286-
return nil, status.Error(codes.Internal, err.Error())
287-
}
302+
303+
return &MountedVolume{
304+
VolumeId: req.GetVolumeId(),
305+
TargetPath: req.GetStagingTargetPath(),
306+
Remote: remote,
307+
RemotePath: remotePath,
308+
ConfigData: configData,
309+
ReadOnly: isNodeStageReqReadOnly(req),
310+
Parameters: flags,
311+
SecretName: secretName,
312+
SecretNamespace: secretNamespace,
313+
}, e
314+
}
315+
316+
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
317+
if err := validateNodeStageVolumeRequest(req); err != nil {
318+
return nil, err
288319
}
289320

290-
if !notMnt {
291-
// testing original mount point, make sure the mount link is valid
292-
if _, err := os.ReadDir(targetPath); err == nil {
293-
klog.Infof("already mounted to target %s", targetPath)
294-
return &csi.NodePublishVolumeResponse{}, nil
321+
// Already staged ?
322+
if volume, ok := ns.mountedVolumes[req.GetVolumeId()]; ok {
323+
if volume.TargetPath == req.GetStagingTargetPath() && volume.ReadOnly == isNodeStageReqReadOnly(req) {
324+
return &csi.NodeStageVolumeResponse{}, nil
295325
}
296-
// todo: mount link is invalid, now unmount and remount later (built-in functionality)
297-
klog.Warningf("ReadDir %s failed with %v, unmount this directory", targetPath, err)
326+
return nil, status.Error(codes.AlreadyExists, "Requested Volume capability incompatible with currently staged volume")
327+
}
298328

299-
if err := ns.mounter.Unmount(targetPath); err != nil {
300-
klog.Errorf("Unmount directory %s failed with %v", targetPath, err)
301-
return nil, err
302-
}
329+
volume, err := getVolumeConfig(ctx, req)
330+
if err != nil {
331+
return nil, err
303332
}
304333

305334
rcloneVol := &RcloneVolume{
306-
ID: volumeId,
307-
Remote: remote,
308-
RemotePath: remotePath,
335+
ID: volume.VolumeId,
336+
Remote: volume.Remote,
337+
RemotePath: volume.RemotePath,
309338
}
310-
err = ns.RcloneOps.Mount(ctx, rcloneVol, targetPath, configData, readOnly, flags)
339+
err = ns.RcloneOps.Mount(ctx, rcloneVol, volume.TargetPath, volume.ConfigData, volume.ReadOnly, volume.Parameters)
311340
if err != nil {
312341
if os.IsPermission(err) {
313342
return nil, status.Error(codes.PermissionDenied, err.Error())
@@ -319,12 +348,100 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
319348
}
320349

321350
// Track the mounted volume for automatic remounting
322-
ns.trackMountedVolume(volumeId, targetPath, remote, remotePath, configData, readOnly, flags, secretName, secretNamespace)
351+
ns.trackMountedVolume(volume)
352+
353+
return &csi.NodeStageVolumeResponse{}, nil
354+
}
355+
356+
func validateNodeUnstageVolumeRequest(req *csi.NodeUnstageVolumeRequest) error {
357+
if req.GetVolumeId() == "" {
358+
return status.Error(codes.InvalidArgument, "empty volume id")
359+
}
360+
if req.GetStagingTargetPath() == "" {
361+
return status.Error(codes.InvalidArgument, "empty staging path")
362+
}
363+
364+
return nil
365+
}
366+
367+
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
368+
if err := validateNodeUnstageVolumeRequest(req); err != nil {
369+
return nil, err
370+
}
371+
372+
if volume, ok := ns.mountedVolumes[req.GetVolumeId()]; ok {
373+
if err := ns.RcloneOps.Unmount(ctx, volume.VolumeId, volume.TargetPath); err != nil {
374+
return nil, status.Error(codes.Internal, err.Error())
375+
}
376+
377+
// Remove the volume from tracking
378+
ns.removeTrackedVolume(req.GetVolumeId())
379+
} else {
380+
return nil, status.Error(codes.NotFound, "volume not found")
381+
}
382+
383+
return &csi.NodeUnstageVolumeResponse{}, nil
384+
}
385+
386+
func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
387+
if req.GetVolumeId() == "" {
388+
return status.Error(codes.InvalidArgument, "empty volume id")
389+
}
390+
391+
if req.GetStagingTargetPath() == "" {
392+
return status.Error(codes.InvalidArgument, "empty staging path")
393+
}
394+
395+
if req.GetTargetPath() == "" {
396+
return status.Error(codes.InvalidArgument, "empty target path")
397+
}
398+
399+
capability := req.GetVolumeCapability()
400+
if capability == nil {
401+
return status.Error(codes.InvalidArgument, "no volume capability set")
402+
}
403+
404+
switch capability.GetAccessMode().GetMode() {
405+
case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
406+
return nil
407+
case csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER:
408+
return nil
409+
case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
410+
return nil
411+
default:
412+
return status.Errorf(codes.FailedPrecondition, "Volume access mode not supported %v", capability.GetAccessMode().GetMode())
413+
}
414+
}
415+
416+
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
417+
if err := validateNodePublishVolumeRequest(req); err != nil {
418+
return nil, err
419+
}
420+
421+
volume, ok := ns.mountedVolumes[req.GetVolumeId()]
422+
if !ok {
423+
return nil, status.Error(codes.NotFound, "Volume not found")
424+
}
425+
if volume.ReadOnly && !req.GetReadonly() {
426+
return nil, status.Error(codes.AlreadyExists, "Volume is already published")
427+
}
428+
429+
if mounts, err := ns.mounter.GetMountRefs(req.GetTargetPath()); err == nil && len(mounts) > 0 {
430+
return &csi.NodePublishVolumeResponse{}, nil
431+
}
432+
433+
if err := os.MkdirAll(req.GetTargetPath(), 0755); err != nil {
434+
return nil, status.Error(codes.Internal, err.Error())
435+
}
436+
437+
options := []string{"bind"}
438+
if req.GetReadonly() {
439+
options = append(options, "remount", "ro")
440+
}
441+
if err := ns.mounter.Mount(req.GetStagingTargetPath(), req.GetTargetPath(), "", options); err != nil {
442+
return nil, status.Error(codes.Internal, err.Error())
443+
}
323444

324-
// err = ns.WaitForMountAvailable(targetPath)
325-
// if err != nil {
326-
// return nil, status.Error(codes.Internal, err.Error())
327-
// }
328445
return &csi.NodePublishVolumeResponse{}, nil
329446
}
330447

@@ -356,21 +473,6 @@ func getPVC(ctx context.Context, namespace, name string) (*v1.PersistentVolumeCl
356473
return cs.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, name, metav1.GetOptions{})
357474
}
358475

359-
func validatePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
360-
if req.GetVolumeId() == "" {
361-
return status.Error(codes.InvalidArgument, "empty volume id")
362-
}
363-
364-
if req.GetTargetPath() == "" {
365-
return status.Error(codes.InvalidArgument, "empty target path")
366-
}
367-
368-
if req.GetVolumeCapability() == nil {
369-
return status.Error(codes.InvalidArgument, "no volume capability set")
370-
}
371-
return nil
372-
}
373-
374476
func extractFlags(volumeContext map[string]string, secret map[string]string, pvcSecret *v1.Secret, savedPvcSecret *v1.Secret) (string, string, string, map[string]string, error) {
375477

376478
// Empty argument list
@@ -488,26 +590,14 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
488590
if err := validateUnPublishVolumeRequest(req); err != nil {
489591
return nil, err
490592
}
491-
targetPath := req.GetTargetPath()
492-
if len(targetPath) == 0 {
493-
klog.Warning("no target path provided for NodeUnpublishVolume")
494-
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided")
495-
}
496-
497-
if _, err := ns.RcloneOps.GetVolumeById(ctx, req.GetVolumeId()); err == ErrVolumeNotFound {
498-
klog.Warning("VolumeId not found for NodeUnpublishVolume")
499-
mount.CleanupMountPoint(req.GetTargetPath(), ns.mounter, false)
500-
return &csi.NodeUnpublishVolumeResponse{}, nil
501-
}
502593

503-
if err := ns.RcloneOps.Unmount(ctx, req.GetVolumeId(), targetPath); err != nil {
504-
klog.Warningf("Unmounting volume failed: %s", err)
594+
if err := mount.CleanupMountPoint(req.GetTargetPath(), ns.mounter, true); err != nil {
595+
if mounts, err := ns.mounter.GetMountRefs(req.GetTargetPath()); err == nil && len(mounts) == 0 {
596+
return &csi.NodeUnpublishVolumeResponse{}, nil
597+
}
598+
return nil, status.Error(codes.Internal, err.Error())
505599
}
506600

507-
// Remove the volume from tracking
508-
ns.removeTrackedVolume(req.GetVolumeId())
509-
510-
mount.CleanupMountPoint(req.GetTargetPath(), ns.mounter, false)
511601
return &csi.NodeUnpublishVolumeResponse{}, nil
512602
}
513603

@@ -529,22 +619,12 @@ func (*NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolu
529619
}
530620

531621
// Track mounted volume for automatic remounting
532-
func (ns *NodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePath, configData string, readOnly bool, parameters map[string]string, secretName, secretNamespace string) {
622+
func (ns *NodeServer) trackMountedVolume(volume *MountedVolume) {
533623
ns.mutex.Lock()
534624
defer ns.mutex.Unlock()
535625

536-
ns.mountedVolumes[volumeId] = MountedVolume{
537-
VolumeId: volumeId,
538-
TargetPath: targetPath,
539-
Remote: remote,
540-
RemotePath: remotePath,
541-
ConfigData: configData,
542-
ReadOnly: readOnly,
543-
Parameters: parameters,
544-
SecretName: secretName,
545-
SecretNamespace: secretNamespace,
546-
}
547-
klog.Infof("Tracked mounted volume %s at path %s", volumeId, targetPath)
626+
ns.mountedVolumes[volume.VolumeId] = *volume
627+
klog.Infof("Tracked mounted volume %s at path %s", volume.VolumeId, volume.TargetPath)
548628

549629
if err := writeVolumeMap(ns.stateFile, ns.mountedVolumes); err != nil {
550630
klog.Errorf("Failed to persist volume state: %v", err)
@@ -645,20 +725,6 @@ func (ns *NodeServer) remountTrackedVolumes(ctx context.Context) error {
645725
}
646726
}
647727

648-
func (ns *NodeServer) WaitForMountAvailable(mountpoint string) error {
649-
for {
650-
select {
651-
case <-time.After(100 * time.Millisecond):
652-
notMnt, _ := ns.mounter.IsLikelyNotMountPoint(mountpoint)
653-
if !notMnt {
654-
return nil
655-
}
656-
case <-time.After(3 * time.Second):
657-
return errors.New("wait for mount available timeout")
658-
}
659-
}
660-
}
661-
662728
// Persist volume state to disk
663729
func writeVolumeMap(filename string, volumes map[string]MountedVolume) error {
664730
if filename == "" {

test/sanity_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,8 @@ provider=AWS`},
189189
"remotePath": "giab/",
190190
"secretKey": "cw_0x689RpI-jtRR7oE8h_eQsKImvJapLeSbXpwF4e4=",
191191
"configData": `[my-s3]
192-
type=<sensitive>
193-
provider=AWS`},
192+
type=s3
193+
provider=AWS`}, // The type has to be set to something valid or rclone fails to find the proper backend plugin.
194194
Type: "Opaque",
195195
}, metav1.CreateOptions{})
196196
className := "csi-rclone-secret-annotation"
@@ -227,7 +227,7 @@ provider=AWS`},
227227
})
228228
})
229229

230-
Context("Serets from annotations with decryption", Ordered, func() {
230+
Context("Secrets from annotations with decryption", Ordered, func() {
231231
var cfg *sanity.TestConfig = &sanity.TestConfig{}
232232
var testCtx *sanity.TestContext = &sanity.TestContext{}
233233

0 commit comments

Comments
 (0)