Skip to content

Commit 3b9c2c9

Browse files
Add start gcsfuse
1 parent 517f728 commit 3b9c2c9

6 files changed

Lines changed: 144 additions & 21 deletions

File tree

cmd/csi_driver/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ func main() {
246246
},
247247
SharedMountOptions: &driver.SharedMountOptions{
248248
MounterPodImage: *mounterPodImage,
249+
FuseSocketDir: *fuseSocketDir,
249250
},
250251
}
251252

pkg/csi_driver/gcs_fuse_driver.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ type GoMemLimitOptions struct {
5353
}
5454

5555
type SharedMountOptions struct {
56-
MounterPodImage string
56+
MounterPodImage string
57+
FuseSocketDir string
58+
EmptyDirBasePath func(podUID string) string
5759
}
5860

5961
type GCSDriverFeatureOptions struct {
@@ -82,7 +84,6 @@ type GCSDriverConfig struct {
8284
FeatureOptions *GCSDriverFeatureOptions
8385
AssumeGoodSidecarVersion bool
8486
UniverseDomain string
85-
EmptyDirBasePath func(podUID string) string
8687
}
8788

8889
type GCSDriver struct {
@@ -112,8 +113,14 @@ func NewGCSDriver(config *GCSDriverConfig, recorder record.EventRecorder) (*GCSD
112113
if !config.RunController && !config.RunNode {
113114
return nil, errors.New("must run at least one controller or node service")
114115
}
115-
if config.EmptyDirBasePath == nil {
116-
config.EmptyDirBasePath = func(podUID string) string {
116+
if config.FeatureOptions == nil {
117+
config.FeatureOptions = &GCSDriverFeatureOptions{}
118+
}
119+
if config.FeatureOptions.SharedMountOptions == nil {
120+
config.FeatureOptions.SharedMountOptions = &SharedMountOptions{}
121+
}
122+
if config.FeatureOptions.SharedMountOptions.EmptyDirBasePath == nil {
123+
config.FeatureOptions.SharedMountOptions.EmptyDirBasePath = func(podUID string) string {
117124
return filepath.Join(util.KubeletDir, "pods", podUID, "volumes", "kubernetes.io~empty-dir", util.SidecarContainerTmpVolumeName)
118125
}
119126
}

pkg/csi_driver/gcs_fuse_driver_test.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func initTestDriver(t *testing.T, fm *mount.FakeMounter, clientset clientset.Int
4747
FeatureGCSFuseProfiles: &FeatureGCSFuseProfiles{},
4848
SharedMountOptions: &SharedMountOptions{
4949
MounterPodImage: testImage,
50+
FuseSocketDir: "/tmp/fuse-sockets",
5051
},
5152
},
5253
}
@@ -65,19 +66,25 @@ func initTestDriver(t *testing.T, fm *mount.FakeMounter, clientset clientset.Int
6566
func initTestDriverWithCustomNodeServer(t *testing.T, fm *mount.FakeMounter, clientSet *clientset.FakeClientset, wiNodeLabelCheck bool) *GCSDriver {
6667
t.Helper()
6768
config := &GCSDriverConfig{
68-
Name: "test-driver",
69-
NodeID: "test-node",
70-
Version: "test-version",
71-
RunController: true,
72-
RunNode: true,
73-
StorageServiceManager: storage.NewFakeServiceManager(),
74-
TokenManager: auth.NewFakeTokenManager(),
75-
Mounter: fm,
76-
NetworkManager: &fakeNetworkManager{},
77-
K8sClients: clientSet,
78-
MetricsManager: metrics.NewFakeMetricsManager(),
79-
WINodeLabelCheck: wiNodeLabelCheck,
80-
FeatureOptions: &GCSDriverFeatureOptions{FeatureGCSFuseProfiles: &FeatureGCSFuseProfiles{}},
69+
Name: "test-driver",
70+
NodeID: "test-node",
71+
Version: "test-version",
72+
RunController: true,
73+
RunNode: true,
74+
StorageServiceManager: storage.NewFakeServiceManager(),
75+
TokenManager: auth.NewFakeTokenManager(),
76+
Mounter: fm,
77+
NetworkManager: &fakeNetworkManager{},
78+
K8sClients: clientSet,
79+
MetricsManager: metrics.NewFakeMetricsManager(),
80+
WINodeLabelCheck: wiNodeLabelCheck,
81+
FeatureOptions: &GCSDriverFeatureOptions{
82+
FeatureGCSFuseProfiles: &FeatureGCSFuseProfiles{},
83+
SharedMountOptions: &SharedMountOptions{
84+
MounterPodImage: testImage,
85+
FuseSocketDir: "/tmp/fuse-sockets",
86+
},
87+
},
8188
AssumeGoodSidecarVersion: true,
8289
}
8390
driver, err := NewGCSDriver(config, nil)

pkg/csi_driver/node.go

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package driver
2020
import (
2121
"fmt"
2222
"os"
23+
"path/filepath"
2324
"strconv"
2425
"strings"
2526
"time"
@@ -32,7 +33,9 @@ import (
3233
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook"
3334
"golang.org/x/net/context"
3435
"golang.org/x/time/rate"
36+
"google.golang.org/grpc"
3537
"google.golang.org/grpc/codes"
38+
"google.golang.org/grpc/credentials/insecure"
3639
"google.golang.org/grpc/status"
3740
corev1 "k8s.io/api/core/v1"
3841
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -727,13 +730,68 @@ func (s *nodeServer) executeNodeStageVolume(ctx context.Context, req *csi.NodeSt
727730
}
728731

729732
// Wait for the mounter pod grpc server to be ready.
730-
if err := waitForMounterServer(ctx, clientset, podNamespace, podName, string(pod.UID), s.driver.config.EmptyDirBasePath); err != nil {
733+
if err := waitForMounterServer(ctx, clientset, podNamespace, podName, string(pod.UID), s.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath); err != nil {
734+
return nil, err
735+
}
736+
737+
podUID := string(pod.UID)
738+
739+
// Send GRPC to mounter pod to start GCSFuse.
740+
if err := s.mountToNode(ctx, podUID, stagingPath, req.GetVolumeId()); err != nil {
731741
return nil, err
732742
}
733743

734-
// TODO(FUECHR) Add start gcsfuse flow.
735744
klog.Infof("Mounter pod %s/%s is running and staging path %s is mounted", podNamespace, podName, stagingPath)
736745

737746
klog.Infof("NodeStageVolume succeeded on staging path %q for volume %q", stagingPath, req.GetVolumeId())
738747
return &csi.NodeStageVolumeResponse{}, nil
739748
}
749+
750+
// mountToNode connects to the mounter server, at which point it initializes the GCSFuse process.
751+
func (s *nodeServer) mountToNode(ctx context.Context, podUID, stagingPath, volumeID string) error {
752+
emptyDirBasePath := s.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath(podUID)
753+
socketFile := filepath.Join(emptyDirBasePath, mounterPodSocketFile)
754+
755+
// Create a symlink to bypass the 108-character limit for Unix domain sockets
756+
// when dialing the connection from the Node Server.
757+
if s.driver.config.FeatureOptions == nil || s.driver.config.FeatureOptions.SharedMountOptions == nil || s.driver.config.FeatureOptions.SharedMountOptions.FuseSocketDir == "" {
758+
return status.Errorf(codes.Internal, "fuse socket directory is not configured")
759+
}
760+
symlink := filepath.Join(s.driver.config.FeatureOptions.SharedMountOptions.FuseSocketDir, mounterPodSocketDir, podUID)
761+
if err := os.MkdirAll(filepath.Dir(symlink), 0750); err != nil {
762+
return status.Errorf(codes.Internal, "failed to create dir for symlink %q: %w", symlink, err)
763+
}
764+
765+
if err := os.Remove(symlink); err != nil && !os.IsNotExist(err) {
766+
klog.Errorf("failed to remove stale symlink %q: %v", symlink, err)
767+
}
768+
769+
if err := os.Symlink(socketFile, symlink); err != nil {
770+
return status.Errorf(codes.Internal, "failed to create symlink to %q: %w", socketFile, err)
771+
}
772+
defer os.Remove(symlink)
773+
774+
// Connect to the socket using the short symlink path.
775+
socketPath := fmt.Sprintf("unix:%s", symlink)
776+
conn, err := grpc.NewClient(socketPath, grpc.WithTransportCredentials(insecure.NewCredentials()))
777+
if err != nil {
778+
klog.Errorf("Failed to connect to the server: %v", err)
779+
return status.Errorf(codes.Internal, "failed to connect to the mounter pod grpc server: %v", err)
780+
}
781+
klog.Infof("Connected to MounterServer at %s", socketPath)
782+
defer conn.Close()
783+
784+
/*
785+
TODO(FUECHR): Implement the mounter client and mount request once we have a mounter service defined.
786+
c := mounter.NewMounterClient(conn)
787+
if _, err := c.Mount(ctx, &mounter.MountRequest{
788+
Mountpoint: stagingPath,
789+
VolumeID: volumeID,
790+
}); err != nil {
791+
return status.Errorf(codes.Internal, "failed to mount: %v", err)
792+
}
793+
*/
794+
795+
klog.Infof("Mount succeeded at staging target path %s", stagingPath)
796+
return nil
797+
}

pkg/csi_driver/node_test.go

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func TestExecuteNodeStageVolume(t *testing.T) {
325325
if !ok {
326326
t.Fatalf("Failed to cast NodeServer to *nodeServer")
327327
}
328-
ns.driver.config.EmptyDirBasePath = func(uid string) string {
328+
ns.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath = func(uid string) string {
329329
return filepath.Join(kubeletDir, "pods", uid, "volumes", "kubernetes.io~empty-dir", util.SidecarContainerTmpVolumeName)
330330
}
331331

@@ -982,7 +982,7 @@ func TestNodeStageVolume(t *testing.T) {
982982
if !ok {
983983
t.Fatalf("Failed to cast NodeServer to *nodeServer")
984984
}
985-
ns.driver.config.EmptyDirBasePath = func(uid string) string {
985+
ns.driver.config.FeatureOptions.SharedMountOptions.EmptyDirBasePath = func(uid string) string {
986986
return filepath.Join(kubeletDir, "pods", uid, "volumes", "kubernetes.io~empty-dir", util.SidecarContainerTmpVolumeName)
987987
}
988988

@@ -1001,6 +1001,55 @@ func TestNodeStageVolume(t *testing.T) {
10011001
}
10021002
}
10031003

1004+
func TestMountToNode(t *testing.T) {
1005+
tests := []struct {
1006+
name string
1007+
wantErr bool
1008+
}{
1009+
{
1010+
name: "successful mount to node - should succeed",
1011+
wantErr: false,
1012+
},
1013+
}
1014+
1015+
for _, tc := range tests {
1016+
t.Run(tc.name, func(t *testing.T) {
1017+
ctx := context.Background()
1018+
podUID := "test-pod-uid"
1019+
volumeID := "test-volume-id"
1020+
stagingPath := filepath.Join(t.TempDir(), "var/lib/kubelet/pods", podUID, "volumes/kubernetes.io~csi", volumeID, "mount")
1021+
1022+
testEnv := initTestNodeServer(t)
1023+
ns, ok := testEnv.ns.(*nodeServer)
1024+
if !ok {
1025+
t.Fatalf("Failed to cast NodeServer to *nodeServer")
1026+
}
1027+
1028+
emptyDirBasePath, err := util.PrepareEmptyDir(stagingPath, true)
1029+
if err != nil {
1030+
t.Fatalf("failed to prepare emptyDir path: %v", err)
1031+
}
1032+
1033+
// Create a dummy socket file for the client to connect to
1034+
socketFile := filepath.Join(emptyDirBasePath, mounterPodSocketFile)
1035+
if file, err := os.Create(socketFile); err == nil {
1036+
file.Close()
1037+
}
1038+
1039+
err = ns.mountToNode(ctx, podUID, stagingPath, volumeID)
1040+
if (err != nil) != tc.wantErr {
1041+
t.Fatalf("mountToNode() error = %v, wantErr %v", err, tc.wantErr)
1042+
}
1043+
1044+
// Verify that the symlink was removed (because it is deferred inside mountToNode)
1045+
symlink := filepath.Join(ns.driver.config.FeatureOptions.SharedMountOptions.FuseSocketDir, mounterPodSocketDir, podUID)
1046+
if _, err := os.Lstat(symlink); !os.IsNotExist(err) {
1047+
t.Errorf("expected symlink %q to be removed, but it still exists or had another error", symlink)
1048+
}
1049+
})
1050+
}
1051+
}
1052+
10041053
func validateMountPoint(t *testing.T, fm *mount.FakeMounter, e *mount.MountPoint, unexpectedOpts []string) {
10051054
t.Helper()
10061055
if e == nil {

pkg/csi_driver/shared_mount.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ const (
4646
mounterPodMountDir = "mount-dir"
4747
mounterPodSocketFile = "mounter.sock"
4848
mounterPodManagedImageKeyword = "managed"
49+
mounterPodSocketDir = "mount-socket"
4950
)
5051

5152
var (

0 commit comments

Comments
 (0)