Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
},
"ghcr.io/devcontainers-extra/features/kind:1": {},
"./rclone": {
"rclone_repository": "https://github.com/rclone/rclone.git",
"rclone_ref": "v1.65.2"
"rclone_repository": "https://github.com/SwissDataScienceCenter/rclone.git",
"rclone_ref": "v1.69.1+renku-1"
}
},
"overrideFeatureInstallOrder": [
Expand Down
16 changes: 6 additions & 10 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
FROM golang:1.23.0-bookworm AS build
ARG RCLONE_VERSION=v1.65.2
ARG RCLONE_ARCH=amd64
ARG RCLONE_OS=linux
ARG RCLONE_IMAGE_REPOSITORY="ghcr.io/swissdatasciencecenter/rclone"
ARG RCLONE_IMAGE_TAG="sha-7975d7a"
FROM ${RCLONE_IMAGE_REPOSITORY}:${RCLONE_IMAGE_TAG} AS rclone

FROM golang:1.23.8-bookworm AS build
COPY go.mod go.sum ./
COPY cmd/ ./cmd/
COPY pkg/ ./pkg/
RUN go build -o /csi-rclone cmd/csi-rclone-plugin/main.go
RUN apt-get update && apt-get install -y unzip && \
curl https://downloads.rclone.org/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-${RCLONE_OS}-${RCLONE_ARCH}.zip -o rclone.zip && \
unzip rclone.zip -d /rclone-unzip && \
chmod a+x /rclone-unzip/*/rclone && \
mv /rclone-unzip/*/rclone /

FROM debian:bookworm-slim
# NOTE: the rclone needs ca-certificates and fuse3 to successfully mount cloud storage
RUN apt-get update && apt-get install -y fuse3 ca-certificates && rm -rf /var/cache/apt/archives /var/lib/apt/lists/*
COPY --from=build /csi-rclone /csi-rclone
COPY --from=build /rclone /usr/bin/
COPY --from=rclone --chmod=755 /rclone /usr/bin/
ENTRYPOINT ["/csi-rclone"]
10 changes: 7 additions & 3 deletions cmd/csi-rclone-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
)

var (
endpoint string
nodeID string
endpoint string
nodeID string
cacheDir string
cacheSize string
)

func init() {
Expand Down Expand Up @@ -45,6 +47,8 @@ func main() {
runNode.MarkPersistentFlagRequired("nodeid")
runNode.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint")
runNode.MarkPersistentFlagRequired("endpoint")
runNode.PersistentFlags().StringVar(&cacheDir, "cachedir", "", "cache dir")
runNode.PersistentFlags().StringVar(&cacheSize, "cachesize", "", "cache size")
runCmd.AddCommand(runNode)
runController := &cobra.Command{
Use: "controller",
Expand Down Expand Up @@ -83,7 +87,7 @@ func handleNode() {
klog.Warningf("There was an error when trying to unmount old volumes: %v", err)
}
d := rclone.NewDriver(nodeID, endpoint)
ns, err := rclone.NewNodeServer(d.CSIDriver)
ns, err := rclone.NewNodeServer(d.CSIDriver, cacheDir, cacheSize)
if err != nil {
panic(err)
}
Expand Down
10 changes: 10 additions & 0 deletions deploy/csi-rclone/templates/csi-nodeplugin-rclone.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ spec:
- node
- --nodeid=$(NODE_ID)
- --endpoint=$(CSI_ENDPOINT)
- --cachedir=$(CACHE_DIR)
- --cachesize=$(CACHE_SIZE)
env:
- name: NODE_ID
valueFrom:
Expand All @@ -76,6 +78,10 @@ spec:
value: {{ .Values.storageClassName | quote}}
- name: LOG_LEVEL
value: {{ .Values.logLevel | default "NOTICE" | quote }}
- name: CACHE_DIR
value: {{ .Values.csiNodepluginRclone.rclone.cache.dir | quote }}
- name: CACHE_SIZE
value: {{ .Values.csiNodepluginRclone.rclone.cache.size | quote }}
image: {{ .Values.csiNodepluginRclone.rclone.image.repository }}:{{ .Values.csiNodepluginRclone.rclone.image.tag | default .Chart.AppVersion }}
imagePullPolicy: {{ .Values.csiNodepluginRclone.rclone.imagePullPolicy }}
resources:
Expand All @@ -100,6 +106,8 @@ spec:
- mountPath: /var/lib/kubelet/pods
mountPropagation: Bidirectional
name: pods-mount-dir
- mountPath: /var/lib/rclone
name: cache-dir
{{- with .Values.csiNodepluginRclone.nodeSelector }}
nodeSelector:
{{ toYaml . | nindent 8 }}
Expand All @@ -125,3 +133,5 @@ spec:
path: {{ .Values.kubeletDir }}/plugins_registry
type: DirectoryOrCreate
name: registration-dir
- name: cache-dir
emptyDir:
6 changes: 6 additions & 0 deletions deploy/csi-rclone/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ csiNodepluginRclone:
# requests:
# cpu: 100m
# memory: 128M
# Options to configure rclone's caching with VFS
cache:
# The location of the cache directory
dir: /var/lib/rclone/cache
# The size allocated for the cache directory
size: 1G
serviceAccount:
annotations: {}
nodeSelector: {}
Expand Down
4 changes: 2 additions & 2 deletions pkg/rclone/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewDriver(nodeID, endpoint string) *Driver {
return d
}

func NewNodeServer(csiDriver *csicommon.CSIDriver) (*nodeServer, error) {
func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize string) (*nodeServer, error) {
kubeClient, err := kube.GetK8sClient()
if err != nil {
return nil, err
Expand All @@ -74,7 +74,7 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver) (*nodeServer, error) {
if err != nil {
return nil, fmt.Errorf("Cannot get a free TCP port to run rclone")
}
rcloneOps := NewRclone(kubeClient, rclonePort)
rcloneOps := NewRclone(kubeClient, rclonePort, cacheDir, cacheSize)

return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver),
Expand Down
113 changes: 94 additions & 19 deletions pkg/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"os"
os_exec "os/exec"
"syscall"
"time"

"strings"

Expand Down Expand Up @@ -43,6 +42,8 @@ type Rclone struct {
kubeClient *kubernetes.Clientset
daemonCmd *os_exec.Cmd
port int
cacheDir string
cacheSize string
}

type RcloneVolume struct {
Expand All @@ -57,15 +58,62 @@ type MountRequest struct {
MountOpt MountOpt `json:"mountOpt"`
}

// VfsOpt is options for creating the vfs
//
// Note that the `Daemon` option has been removed as it is not accepted for rc calls.
type VfsOpt struct {
CacheMode string `json:"cacheMode"`
DirCacheTime time.Duration `json:"dirCacheTime"`
ReadOnly bool `json:"readOnly"`
NoSeek bool `json:",omitempty"` // don't allow seeking if set
NoChecksum bool `json:",omitempty"` // don't check checksums if set
ReadOnly bool `json:",omitempty"` // if set VFS is read only
NoModTime bool `json:",omitempty"` // don't read mod times for files
DirCacheTime string `json:",omitempty"` // how long to consider directory listing cache valid
Refresh bool `json:",omitempty"` // refreshes the directory listing recursively on start
PollInterval string `json:",omitempty"`
Umask int `json:",omitempty"`
UID uint32 `json:",omitempty"`
GID uint32 `json:",omitempty"`
DirPerms os.FileMode `json:",omitempty"`
FilePerms os.FileMode `json:",omitempty"`
ChunkSize string `json:",omitempty"` // if > 0 read files in chunks
ChunkSizeLimit string `json:",omitempty"` // if > ChunkSize double the chunk size after each chunk until reached
CacheMode string `json:",omitempty"`
CacheMaxAge string `json:",omitempty"`
CacheMaxSize string `json:",omitempty"`
CacheMinFreeSpace string `json:",omitempty"`
CachePollInterval string `json:",omitempty"`
CaseInsensitive bool `json:",omitempty"`
WriteWait string `json:",omitempty"` // time to wait for in-sequence write
ReadWait string `json:",omitempty"` // time to wait for in-sequence read
WriteBack string `json:",omitempty"` // time to wait before writing back dirty files
ReadAhead string `json:",omitempty"` // bytes to read ahead in cache mode "full"
UsedIsSize bool `json:",omitempty"` // if true, use the `rclone size` algorithm for Used size
FastFingerprint bool `json:",omitempty"` // if set use fast fingerprints
DiskSpaceTotalSize string `json:",omitempty"`
}

// Options for creating the mount
//
// Note that options not supported on Linux have been removed.
type MountOpt struct {
AllowNonEmpty bool `json:"allowNonEmpty"`
AllowOther bool `json:"allowOther"`
DebugFUSE bool `json:",omitempty"`
AllowNonEmpty bool `json:",omitempty"`
AllowRoot bool `json:",omitempty"`
AllowOther bool `json:",omitempty"`
DefaultPermissions bool `json:",omitempty"`
WritebackCache bool `json:",omitempty"`
DaemonWait string `json:",omitempty"` // time to wait for ready mount from daemon, maximum on Linux or constant on macOS/BSD
MaxReadAhead string `json:",omitempty"`
ExtraOptions []string `json:",omitempty"`
ExtraFlags []string `json:",omitempty"`
AttrTimeout string `json:",omitempty"` // how long the kernel caches attribute for
DeviceName string `json:",omitempty"`
VolumeName string `json:",omitempty"`
NoAppleDouble bool `json:",omitempty"`
NoAppleXattr bool `json:",omitempty"`
AsyncRead bool `json:",omitempty"`
CaseInsensitive string `json:",omitempty"`
}

type ConfigCreateRequest struct {
Name string `json:"name"`
Parameters map[string]string `json:"parameters"`
Expand Down Expand Up @@ -105,7 +153,7 @@ func (r *Rclone) Mount(ctx context.Context, rcloneVolume *RcloneVolume, targetPa
Parameters: params,
Opt: map[string]interface{}{"obscure": true},
}
klog.Infof("executing create config command args=%v, targetpath=%s", configName, targetPath)
klog.Infof("executing create config command name=%s, storageType=%s", configName, configOpts.StorageType)
postBody, err := json.Marshal(configOpts)
if err != nil {
return fmt.Errorf("mounting failed: couldn't create request body: %s", err)
Expand All @@ -121,31 +169,53 @@ func (r *Rclone) Mount(ctx context.Context, rcloneVolume *RcloneVolume, targetPa
}
klog.Infof("created config: %s", configName)

// VFS Mount parameters
vfsOpt := VfsOpt{
CacheMode: "writes",
DirCacheTime: "60s",
}
vfsOptStr := parameters["vfsOpt"]
if vfsOptStr != "" {
err = json.Unmarshal([]byte(vfsOptStr), &vfsOpt)
if err != nil {
return fmt.Errorf("could not parse vfsOpt: %w", err)
}
}
// The `ReadOnly` option is specified in the PVC
vfsOpt.ReadOnly = readOnly
// DiskSpaceTotalSize is not a global rclone option
vfsOpt.DiskSpaceTotalSize = r.cacheSize
// Mount parameters
mountOpt := MountOpt{
AllowNonEmpty: true,
AllowOther: true,
}
mountOptStr := parameters["mountOpt"]
if mountOptStr != "" {
err = json.Unmarshal([]byte(mountOptStr), &mountOpt)
if err != nil {
return fmt.Errorf("could not parse mountOpt: %w", err)
}
}

remoteWithPath := fmt.Sprintf("%s:%s", configName, rcloneVolume.RemotePath)
mountArgs := MountRequest{
Fs: remoteWithPath,
MountPoint: targetPath,
VfsOpt: VfsOpt{
CacheMode: "writes",
DirCacheTime: 60 * time.Second,
ReadOnly: readOnly,
},
MountOpt: MountOpt{
AllowNonEmpty: true,
AllowOther: true,
},
VfsOpt: vfsOpt,
MountOpt: mountOpt,
}

// create target, os.Mkdirall is noop if it exists
err = os.MkdirAll(targetPath, 0750)
if err != nil {
return err
}
klog.Infof("executing mount command args=%v, targetpath=%s", mountArgs, targetPath)
postBody, err = json.Marshal(mountArgs)
if err != nil {
return fmt.Errorf("mounting failed: couldn't create request body: %s", err)
}
klog.Infof("executing mount command args=%s", string(postBody))
requestBody = bytes.NewBuffer(postBody)
resp, err = http.Post(fmt.Sprintf("http://localhost:%d/mount/mount", r.port), "application/json", requestBody)
if err != nil {
Expand Down Expand Up @@ -282,11 +352,13 @@ func (r Rclone) GetVolumeById(ctx context.Context, volumeId string) (*RcloneVolu
return nil, ErrVolumeNotFound
}

func NewRclone(kubeClient *kubernetes.Clientset, port int) Operations {
func NewRclone(kubeClient *kubernetes.Clientset, port int, cacheDir string, cacheSize string) Operations {
rclone := &Rclone{
execute: exec.New(),
kubeClient: kubeClient,
port: port,
cacheDir: cacheDir,
cacheSize: cacheSize,
}
return rclone
}
Expand Down Expand Up @@ -348,13 +420,16 @@ func (r *Rclone) start_daemon() error {
rclone_args = append(rclone_args, "--cache-info-age=72h")
rclone_args = append(rclone_args, "--cache-chunk-clean-interval=15m")
rclone_args = append(rclone_args, "--rc-no-auth")
if r.cacheDir != "" {
rclone_args = append(rclone_args, fmt.Sprintf("--cache-dir=%s", r.cacheDir))
}
loglevel := os.Getenv("LOG_LEVEL")
if len(loglevel) == 0 {
loglevel = "NOTICE"
}
rclone_args = append(rclone_args, fmt.Sprintf("--log-level=%s", loglevel))
rclone_args = append(rclone_args, fmt.Sprintf("--config=%s", f.Name()))
klog.Infof("running rclone remote control daemon cmd=%s, args=%s, ", rclone_cmd, rclone_args)
klog.Infof("running rclone remote control daemon cmd=%s, args=%s", rclone_cmd, rclone_args)

env := os.Environ()
cmd := os_exec.Command(rclone_cmd, rclone_args...)
Expand Down
2 changes: 1 addition & 1 deletion test/sanity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var _ = Describe("Sanity CSI checks", Ordered, func() {
os.Setenv("DRIVER_NAME", "csi-rclone")
driver = rclone.NewDriver("hostname", endpoint)
cs := rclone.NewControllerServer(driver.CSIDriver)
ns, err := rclone.NewNodeServer(driver.CSIDriver)
ns, err := rclone.NewNodeServer(driver.CSIDriver, "", "")
Expect(err).ShouldNot(HaveOccurred())
driver.WithControllerServer(cs).WithNodeServer(ns)
go func() {
Expand Down