Skip to content

Commit 85c4d66

Browse files
Add start gcsfuse
1 parent f574402 commit 85c4d66

6 files changed

Lines changed: 150 additions & 25 deletions

File tree

cmd/csi_driver/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"net/http/pprof"
2525
"os"
2626
"os/signal"
27+
"path/filepath"
2728
"syscall"
2829
"time"
2930

@@ -246,6 +247,10 @@ func main() {
246247
},
247248
SharedMountOptions: &driver.SharedMountOptions{
248249
MounterPodImage: *mounterPodImage,
250+
FuseSocketDir: *fuseSocketDir,
251+
EmptyDirBasePath: func(podUID string) string {
252+
return filepath.Join(util.KubeletDir, "pods", podUID, "volumes", "kubernetes.io~empty-dir", util.SidecarContainerTmpVolumeName)
253+
},
249254
},
250255
}
251256

pkg/csi_driver/gcs_fuse_driver.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,12 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24-
"path/filepath"
2524

2625
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/auth"
2726
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/clientset"
2827
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/storage"
2928
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics"
3029
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/profiles"
31-
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/util"
3230
"google.golang.org/grpc/codes"
3331
"google.golang.org/grpc/status"
3432
"k8s.io/apimachinery/pkg/runtime"
@@ -54,6 +52,9 @@ type GoMemLimitOptions struct {
5452

5553
type SharedMountOptions struct {
5654
MounterPodImage string
55+
FuseSocketDir string
56+
// Needed to override the mounter pods emptydir base path, otherwise tests will try to write to var/lib/kubelet which it won't have access to.
57+
EmptyDirBasePath func(podUID string) string
5758
}
5859

5960
type GCSDriverFeatureOptions struct {
@@ -82,7 +83,6 @@ type GCSDriverConfig struct {
8283
FeatureOptions *GCSDriverFeatureOptions
8384
AssumeGoodSidecarVersion bool
8485
UniverseDomain string
85-
EmptyDirBasePath func(podUID string) string
8686
}
8787

8888
type GCSDriver struct {
@@ -112,11 +112,6 @@ func NewGCSDriver(config *GCSDriverConfig, recorder record.EventRecorder) (*GCSD
112112
if !config.RunController && !config.RunNode {
113113
return nil, errors.New("must run at least one controller or node service")
114114
}
115-
if config.EmptyDirBasePath == nil {
116-
config.EmptyDirBasePath = func(podUID string) string {
117-
return filepath.Join(util.KubeletDir, "pods", podUID, "volumes", "kubernetes.io~empty-dir", util.SidecarContainerTmpVolumeName)
118-
}
119-
}
120115

121116
driver := &GCSDriver{
122117
config: config,

pkg/csi_driver/gcs_fuse_driver_test.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func initTestDriver(t *testing.T, fm *mount.FakeMounter, clientset clientset.Int
4747
FeatureGCSFuseProfiles: &FeatureGCSFuseProfiles{},
4848
SharedMountOptions: &SharedMountOptions{
4949
MounterPodImage: testImage,
50+
FuseSocketDir: "/tmp/fuse-sockets",
5051
},
5152
},
5253
}
@@ -65,19 +66,25 @@ func initTestDriver(t *testing.T, fm *mount.FakeMounter, clientset clientset.Int
6566
func initTestDriverWithCustomNodeServer(t *testing.T, fm *mount.FakeMounter, clientSet *clientset.FakeClientset, wiNodeLabelCheck bool) *GCSDriver {
6667
t.Helper()
6768
config := &GCSDriverConfig{
68-
Name: "test-driver",
69-
NodeID: "test-node",
70-
Version: "test-version",
71-
RunController: true,
72-
RunNode: true,
73-
StorageServiceManager: storage.NewFakeServiceManager(),
74-
TokenManager: auth.NewFakeTokenManager(),
75-
Mounter: fm,
76-
NetworkManager: &fakeNetworkManager{},
77-
K8sClients: clientSet,
78-
MetricsManager: metrics.NewFakeMetricsManager(),
79-
WINodeLabelCheck: wiNodeLabelCheck,
80-
FeatureOptions: &GCSDriverFeatureOptions{FeatureGCSFuseProfiles: &FeatureGCSFuseProfiles{}},
69+
Name: "test-driver",
70+
NodeID: "test-node",
71+
Version: "test-version",
72+
RunController: true,
73+
RunNode: true,
74+
StorageServiceManager: storage.NewFakeServiceManager(),
75+
TokenManager: auth.NewFakeTokenManager(),
76+
Mounter: fm,
77+
NetworkManager: &fakeNetworkManager{},
78+
K8sClients: clientSet,
79+
MetricsManager: metrics.NewFakeMetricsManager(),
80+
WINodeLabelCheck: wiNodeLabelCheck,
81+
FeatureOptions: &GCSDriverFeatureOptions{
82+
FeatureGCSFuseProfiles: &FeatureGCSFuseProfiles{},
83+
SharedMountOptions: &SharedMountOptions{
84+
MounterPodImage: testImage,
85+
FuseSocketDir: "/tmp/fuse-sockets",
86+
},
87+
},
8188
AssumeGoodSidecarVersion: true,
8289
}
8390
driver, err := NewGCSDriver(config, nil)

pkg/csi_driver/node.go

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package driver
2020
import (
2121
"fmt"
2222
"os"
23+
"path/filepath"
2324
"strconv"
2425
"strings"
2526
"time"
@@ -32,7 +33,9 @@ import (
3233
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook"
3334
"golang.org/x/net/context"
3435
"golang.org/x/time/rate"
36+
"google.golang.org/grpc"
3537
"google.golang.org/grpc/codes"
38+
"google.golang.org/grpc/credentials/insecure"
3639
"google.golang.org/grpc/status"
3740
corev1 "k8s.io/api/core/v1"
3841
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -727,13 +730,75 @@ func (s *nodeServer) executeNodeStageVolume(ctx context.Context, req *csi.NodeSt
727730
}
728731

729732
// Wait for the mounter pod grpc server to be ready.
730-
if err := waitForMounterServer(ctx, clientset, podNamespace, podName, string(pod.UID), s.driver.config.EmptyDirBasePath); err != nil {
733+
if err := waitForMounterServer(ctx, clientset, podNamespace, podName, string(pod.UID), s.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath); err != nil {
734+
return nil, err
735+
}
736+
737+
podUID := string(pod.UID)
738+
739+
// Send GRPC to mounter pod to start GCSFuse.
740+
if err := s.mountToNode(ctx, podUID, stagingPath, req.GetVolumeId()); err != nil {
731741
return nil, err
732742
}
733743

734-
// TODO(FUECHR) Add start gcsfuse flow.
735744
klog.Infof("Mounter pod %s/%s is running and staging path %s is mounted", podNamespace, podName, stagingPath)
736745

737746
klog.Infof("NodeStageVolume succeeded on staging path %q for volume %q", stagingPath, req.GetVolumeId())
738747
return &csi.NodeStageVolumeResponse{}, nil
739748
}
749+
750+
// mountToNode connects to the mounter server, at which point it initializes the GCSFuse process.
751+
func (s *nodeServer) mountToNode(ctx context.Context, podUID, stagingPath, volumeID string) error {
752+
if s.driver.config.FeatureOptions == nil || s.driver.config.FeatureOptions.SharedMountOptions == nil {
753+
return status.Errorf(codes.Internal, "shared mount options are not fully configured")
754+
}
755+
756+
if s.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath == nil {
757+
return status.Errorf(codes.Internal, "empty dir base path must be provided for shared mount")
758+
}
759+
emptyDirBasePath := s.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath(podUID)
760+
socketFile := filepath.Join(emptyDirBasePath, mounterPodSocketFile)
761+
762+
// Create a symlink to bypass the 108-character limit for Unix domain sockets
763+
// when dialing the connection from the Node Server.
764+
if s.driver.config.FeatureOptions.SharedMountOptions.FuseSocketDir == "" {
765+
return status.Errorf(codes.Internal, "fuse socket dir must be provided for shared mount")
766+
}
767+
symlink := filepath.Join(s.driver.config.FeatureOptions.SharedMountOptions.FuseSocketDir, mounterPodSocketDir, podUID)
768+
if err := os.MkdirAll(filepath.Dir(symlink), 0750); err != nil {
769+
return status.Errorf(codes.Internal, "failed to create dir for symlink %q: %v", symlink, err)
770+
}
771+
772+
if err := os.Remove(symlink); err != nil && !os.IsNotExist(err) {
773+
klog.Errorf("failed to remove stale symlink %q: %v", symlink, err)
774+
}
775+
776+
if err := os.Symlink(socketFile, symlink); err != nil {
777+
return status.Errorf(codes.Internal, "failed to create symlink to %q: %v", socketFile, err)
778+
}
779+
defer os.Remove(symlink)
780+
781+
// Connect to the socket using the short symlink path.
782+
socketPath := fmt.Sprintf("unix:%s", symlink)
783+
conn, err := grpc.NewClient(socketPath, grpc.WithTransportCredentials(insecure.NewCredentials()))
784+
if err != nil {
785+
klog.Errorf("Failed to connect to the server: %v", err)
786+
return status.Errorf(codes.Internal, "failed to connect to the mounter pod grpc server: %v", err)
787+
}
788+
klog.Infof("Connected to MounterServer at %s", socketPath)
789+
defer conn.Close()
790+
791+
/*
792+
TODO(FUECHR): Implement the mounter client and mount request once we have a mounter service defined.
793+
c := mounter.NewMounterClient(conn)
794+
if _, err := c.Mount(ctx, &mounter.MountRequest{
795+
Mountpoint: stagingPath,
796+
VolumeID: volumeID,
797+
}); err != nil {
798+
return status.Errorf(codes.Internal, "failed to mount: %v", err)
799+
}
800+
*/
801+
802+
klog.Infof("Mount succeeded at staging target path %s", stagingPath)
803+
return nil
804+
}

pkg/csi_driver/node_test.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func TestExecuteNodeStageVolume(t *testing.T) {
325325
if !ok {
326326
t.Fatalf("Failed to cast NodeServer to *nodeServer")
327327
}
328-
ns.driver.config.EmptyDirBasePath = func(uid string) string {
328+
ns.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath = func(uid string) string {
329329
return filepath.Join(kubeletDir, "pods", uid, "volumes", "kubernetes.io~empty-dir", util.SidecarContainerTmpVolumeName)
330330
}
331331

@@ -982,7 +982,7 @@ func TestNodeStageVolume(t *testing.T) {
982982
if !ok {
983983
t.Fatalf("Failed to cast NodeServer to *nodeServer")
984984
}
985-
ns.driver.config.EmptyDirBasePath = func(uid string) string {
985+
ns.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath = func(uid string) string {
986986
return filepath.Join(kubeletDir, "pods", uid, "volumes", "kubernetes.io~empty-dir", util.SidecarContainerTmpVolumeName)
987987
}
988988

@@ -1001,6 +1001,58 @@ func TestNodeStageVolume(t *testing.T) {
10011001
}
10021002
}
10031003

1004+
func TestMountToNode(t *testing.T) {
1005+
tests := []struct {
1006+
name string
1007+
wantErr bool
1008+
}{
1009+
{
1010+
name: "successful mount to node - should succeed",
1011+
wantErr: false,
1012+
},
1013+
}
1014+
1015+
for _, tc := range tests {
1016+
t.Run(tc.name, func(t *testing.T) {
1017+
ctx := context.Background()
1018+
podUID := "test-pod-uid"
1019+
volumeID := "test-volume-id"
1020+
stagingPath := filepath.Join(t.TempDir(), "var/lib/kubelet/pods", podUID, "volumes/kubernetes.io~csi", volumeID, "mount")
1021+
1022+
testEnv := initTestNodeServer(t)
1023+
ns, ok := testEnv.ns.(*nodeServer)
1024+
if !ok {
1025+
t.Fatalf("Failed to cast NodeServer to *nodeServer")
1026+
}
1027+
1028+
emptyDirBasePath, err := util.PrepareEmptyDir(stagingPath, true)
1029+
if err != nil {
1030+
t.Fatalf("failed to prepare emptyDir path: %v", err)
1031+
}
1032+
ns.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath = func(uid string) string {
1033+
return emptyDirBasePath
1034+
}
1035+
1036+
// Create a dummy socket file for the client to connect to
1037+
socketFile := filepath.Join(emptyDirBasePath, mounterPodSocketFile)
1038+
if file, err := os.Create(socketFile); err == nil {
1039+
file.Close()
1040+
}
1041+
1042+
err = ns.mountToNode(ctx, podUID, stagingPath, volumeID)
1043+
if (err != nil) != tc.wantErr {
1044+
t.Fatalf("mountToNode() error = %v, wantErr %v", err, tc.wantErr)
1045+
}
1046+
1047+
// Verify that the symlink was removed (because it is deferred inside mountToNode)
1048+
symlink := filepath.Join(ns.driver.config.FeatureOptions.SharedMountOptions.FuseSocketDir, mounterPodSocketDir, podUID)
1049+
if _, err := os.Lstat(symlink); !os.IsNotExist(err) {
1050+
t.Errorf("expected symlink %q to be removed, but it still exists or had another error", symlink)
1051+
}
1052+
})
1053+
}
1054+
}
1055+
10041056
func validateMountPoint(t *testing.T, fm *mount.FakeMounter, e *mount.MountPoint, unexpectedOpts []string) {
10051057
t.Helper()
10061058
if e == nil {

pkg/csi_driver/shared_mount.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ const (
4646
mounterPodMountDir = "mount-dir"
4747
mounterPodSocketFile = "mounter.sock"
4848
mounterPodManagedImageKeyword = "managed"
49+
mounterPodSocketDir = "mount-socket"
4950
)
5051

5152
var (

0 commit comments

Comments
 (0)