Skip to content

Commit d69e59e

Browse files
authored
Enable CSI ephemeral volumes support. (#97)
Enabled CSI ephemeral volumes to work with hpp Signed-off-by: Alexander Wels <[email protected]>
1 parent 2524b6c commit d69e59e

File tree

6 files changed

+155
-22
lines changed

6 files changed

+155
-22
lines changed

hack/test-driver.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,6 @@ DriverInfo:
1919
capacity: true
2020
TopologyKeys:
2121
- topology.hostpath.csi/node
22+
InlineVolumes:
23+
- shared: true
24+

pkg/hostpath/controllerserver.go

+2-17
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ package hostpath
1818

1919
import (
2020
"fmt"
21-
"io/ioutil"
2221
"path/filepath"
23-
"sort"
2422

2523
"github.com/golang/protobuf/ptypes/wrappers"
2624
"golang.org/x/net/context"
@@ -172,7 +170,7 @@ func (hpc *hostPathController) DeleteVolume(ctx context.Context, req *csi.Delete
172170
}
173171
if volumePath != "" {
174172
if err := DeleteVolume(filepath.Dir(volumePath), req.GetVolumeId()); err != nil {
175-
return nil, fmt.Errorf("failed to delete volume %v: %w", req.GetVolumeId(), err)
173+
return nil, fmt.Errorf("failed to delete volume %s: %v", req.GetVolumeId(), err)
176174
}
177175
klog.V(4).Infof("volume %v successfully deleted", req.GetVolumeId())
178176
}
@@ -245,20 +243,7 @@ func (hpc *hostPathController) GetCapacity(ctx context.Context, req *csi.GetCapa
245243
}
246244

247245
func (hpc *hostPathController) getVolumeDirectories() ([]string, error) {
248-
directories := make([]string, 0)
249-
for _, path := range hpc.cfg.StoragePoolDataDir {
250-
files, err := ioutil.ReadDir(path)
251-
if err != nil {
252-
return nil, err
253-
}
254-
for _, file := range files {
255-
if file.IsDir() {
256-
directories = append(directories, filepath.Join(path, file.Name()))
257-
}
258-
}
259-
}
260-
sort.Strings(directories)
261-
return directories, nil
246+
return getVolumeDirectories(hpc.cfg.StoragePoolDataDir)
262247
}
263248

264249
func (hpc *hostPathController) validateListVolumesRequest(req *csi.ListVolumesRequest) error {

pkg/hostpath/hostpath.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type Config struct {
3939
Endpoint string
4040
NodeID string
4141
StoragePoolDataDir map[string]string
42+
DefaultStoragePoolName string
4243
Version string
4344
Mounter mount.Interface
4445
}
@@ -68,13 +69,16 @@ func NewHostPathDriver(cfg *Config, dataDir string) (*hostPath, error) {
6869
if cfg.Mounter == nil {
6970
cfg.Mounter = mount.New("")
7071
}
71-
cfg.StoragePoolDataDir = make(map[string]string, 0)
72+
cfg.StoragePoolDataDir = make(map[string]string)
7273

7374
storagePools := make([]StoragePoolInfo, 0)
7475
if err := json.Unmarshal([]byte(dataDir), &storagePools); err != nil {
7576
return nil, errors.New("unable to parse storage pool info")
7677
}
7778
for _, storagePool := range storagePools {
79+
if len(cfg.DefaultStoragePoolName) == 0 {
80+
cfg.DefaultStoragePoolName = storagePool.Name
81+
}
7882
cfg.StoragePoolDataDir[storagePool.Name] = storagePool.Path
7983
}
8084

pkg/hostpath/nodeserver.go

+67-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ import (
2929
"k8s.io/utils/mount"
3030
)
3131

32-
const TopologyKeyNode = "topology.hostpath.csi/node"
32+
const (
33+
TopologyKeyNode = "topology.hostpath.csi/node"
34+
ephemeralContextKey = "csi.storage.k8s.io/ephemeral"
35+
)
3336

3437
type hostPathNode struct {
3538
cfg *Config
@@ -135,7 +138,15 @@ func (hpn *hostPathNode) mountVolume(targetPath string, req *csi.NodePublishVolu
135138
storagePoolName := getStoragePoolNameFromMap(req.GetVolumeContext())
136139

137140
mounter := hpn.cfg.Mounter
138-
path := filepath.Join(hpn.cfg.StoragePoolDataDir[storagePoolName], volumeId)
141+
path := ""
142+
if isEphemeralVolumeRequest(req) {
143+
path = hpn.getEphemeralVolumePath(storagePoolName, volumeId)
144+
if err := CreateVolume(filepath.Dir(path), volumeId); err != nil {
145+
return fmt.Errorf("failed to create ephemeral volume %v: %w", volumeId, err)
146+
}
147+
} else {
148+
path = filepath.Join(hpn.cfg.StoragePoolDataDir[storagePoolName], volumeId)
149+
}
139150

140151
if err := mounter.Mount(path, targetPath, fsType, options); err != nil {
141152
var errList strings.Builder
@@ -145,11 +156,33 @@ func (hpn *hostPathNode) mountVolume(targetPath string, req *csi.NodePublishVolu
145156
errList.WriteString(err.Error())
146157
}
147158
errList.WriteString(fmt.Sprintf("%v", fileInfo.Mode()))
159+
if isEphemeralVolumeRequest(req) {
160+
if rmErr := os.RemoveAll(path); rmErr != nil && !os.IsNotExist(rmErr) {
161+
errList.WriteString(fmt.Sprintf(" :%s", rmErr.Error()))
162+
}
163+
}
164+
148165
return fmt.Errorf("failed to mount device: %s at %s: %s", path, targetPath, errList.String())
149166
}
150167
return nil
151168
}
152169

170+
func isEphemeralVolumeRequest(req *csi.NodePublishVolumeRequest) bool {
171+
return req.GetVolumeContext()[ephemeralContextKey] == "true"
172+
}
173+
174+
func isEphemeralVolumeId(volumeId string) bool {
175+
return strings.HasPrefix(volumeId, "csi")
176+
}
177+
178+
func (hpn *hostPathNode) getEphemeralVolumePath(storagePoolName, volumeId string) string {
179+
storagePoolPath := hpn.cfg.StoragePoolDataDir[storagePoolName]
180+
if len(storagePoolPath) == 0 {
181+
storagePoolPath = hpn.cfg.StoragePoolDataDir[hpn.cfg.DefaultStoragePoolName]
182+
}
183+
return filepath.Join(storagePoolPath, volumeId)
184+
}
185+
153186
func (hpn *hostPathNode) validateNodeUnpublishRequest(req *csi.NodeUnpublishVolumeRequest) error {
154187
// Check arguments
155188
if len(req.GetVolumeId()) == 0 {
@@ -162,6 +195,9 @@ func (hpn *hostPathNode) validateNodeUnpublishRequest(req *csi.NodeUnpublishVolu
162195
}
163196

164197
func (hpn *hostPathNode) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
198+
if req != nil {
199+
klog.V(3).Infof("Node Unpublish Request: %+v", *req)
200+
}
165201
if err := hpn.validateNodeUnpublishRequest(req); err != nil {
166202
return nil, err
167203
}
@@ -188,10 +224,38 @@ func (hpn *hostPathNode) NodeUnpublishVolume(ctx context.Context, req *csi.NodeU
188224
return nil, fmt.Errorf("remove target path: %w", err)
189225
}
190226
klog.V(4).Infof("hostpath: volume %s has been unpublished.", targetPath)
191-
227+
if isEphemeralVolumeId(req.GetVolumeId()) {
228+
if err := hpn.removeEphemeralPath(req.GetVolumeId()); err != nil {
229+
return nil, fmt.Errorf("failed to delete ephemeral volume: %s, %v", req.GetVolumeId(), err)
230+
}
231+
}
192232
return &csi.NodeUnpublishVolumeResponse{}, nil
193233
}
194234

235+
func (hpn *hostPathNode) removeEphemeralPath(volumeId string) error {
236+
volumeDirs, err := hpn.getVolumeDirectories()
237+
if err != nil {
238+
return err
239+
}
240+
volumePath := ""
241+
for _, volumeDir := range volumeDirs {
242+
if filepath.Base(volumeDir) == volumeId {
243+
volumePath = volumeDir
244+
}
245+
}
246+
if volumePath != "" {
247+
if err := DeleteVolume(filepath.Dir(volumePath), volumeId); err != nil {
248+
return fmt.Errorf("failed to delete ephemeral volume %s: %v", volumeId, err)
249+
}
250+
klog.V(4).Infof("ephemeral volume %v successfully deleted", volumeId)
251+
}
252+
return nil
253+
}
254+
255+
func (hpn *hostPathNode) getVolumeDirectories() ([]string, error) {
256+
return getVolumeDirectories(hpn.cfg.StoragePoolDataDir)
257+
}
258+
195259
func (hpn *hostPathNode) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
196260
return nil, status.Error(codes.Unimplemented, "NodeStageVolume is not supported")
197261
}

pkg/hostpath/nodeserver_test.go

+58
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,64 @@ func Test_NodePublishVolumeValidMountPoint(t *testing.T) {
387387
Expect(mountPoint.Opts).To(ContainElement("bind"))
388388
}
389389

390+
func Test_NodePublishVolumeEphemeral(t *testing.T) {
391+
RegisterTestingT(t)
392+
tempDir, err := ioutil.TempDir(os.TempDir(), "")
393+
Expect(err).ToNot(HaveOccurred())
394+
defer os.RemoveAll(tempDir)
395+
nodeServer := createNodeServer(testNode)
396+
fakeMounter := mount.NewFakeMounter([]mount.MountPoint{})
397+
nodeServer.cfg.Mounter = fakeMounter
398+
nodeServer.cfg.DefaultStoragePoolName = "test"
399+
nodeServer.cfg.StoragePoolDataDir = make(map[string]string)
400+
nodeServer.cfg.StoragePoolDataDir["test"] = tempDir
401+
_, err = nodeServer.NodePublishVolume(context.TODO(), &csi.NodePublishVolumeRequest{
402+
VolumeId: "csi-abcd",
403+
VolumeContext: map[string]string{
404+
ephemeralContextKey: "true",
405+
},
406+
TargetPath: filepath.Join(tempDir, validVolId),
407+
VolumeCapability: &csi.VolumeCapability{
408+
AccessType: &csi.VolumeCapability_Mount{
409+
Mount: &csi.VolumeCapability_MountVolume{
410+
FsType: "ext4",
411+
},
412+
},
413+
},
414+
})
415+
Expect(err).ToNot(HaveOccurred())
416+
info, err := os.Stat(filepath.Join(tempDir, "csi-abcd"))
417+
Expect(err).ToNot(HaveOccurred())
418+
Expect(info.IsDir()).To(BeTrue())
419+
Expect(len(fakeMounter.GetLog())).To(Equal(1))
420+
mountAction := fakeMounter.GetLog()[0]
421+
Expect(mountAction.Action).To(Equal(mount.FakeActionMount))
422+
Expect(mountAction.Target).To(Equal(filepath.Join(tempDir, validVolId)))
423+
Expect(mountAction.Source).To(Equal(filepath.Join(tempDir, "csi-abcd")))
424+
Expect(mountAction.FSType).To(Equal("ext4"))
425+
Expect(len(fakeMounter.MountPoints)).To(Equal(1))
426+
mountPoint := fakeMounter.MountPoints[0]
427+
Expect(mountPoint.Opts).To(ContainElement("bind"))
428+
t.Log("Unpublish ephemeral")
429+
_, err = nodeServer.NodeUnpublishVolume(context.TODO(), &csi.NodeUnpublishVolumeRequest{
430+
VolumeId: "csi-abcd",
431+
TargetPath: filepath.Join(tempDir, validVolId),
432+
})
433+
Expect(err).ToNot(HaveOccurred())
434+
_, err = os.Stat(filepath.Join(tempDir, "csi-abcd"))
435+
Expect(err).To(HaveOccurred())
436+
Expect(os.IsNotExist(err)).To(BeTrue())
437+
t.Log("Unpublish ephemeral again")
438+
_, err = nodeServer.NodeUnpublishVolume(context.TODO(), &csi.NodeUnpublishVolumeRequest{
439+
VolumeId: "csi-abcd",
440+
TargetPath: filepath.Join(tempDir, validVolId),
441+
})
442+
Expect(err).ToNot(HaveOccurred())
443+
_, err = os.Stat(filepath.Join(tempDir, "csi-abcd"))
444+
Expect(err).To(HaveOccurred())
445+
Expect(os.IsNotExist(err)).To(BeTrue())
446+
}
447+
390448
func Test_NodePublishVolumeValidMountPointReadOnly(t *testing.T) {
391449
RegisterTestingT(t)
392450
tempDir, err := ioutil.TempDir(os.TempDir(), "")

pkg/hostpath/utils.go

+20-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ limitations under the License.
1616
package hostpath
1717

1818
import (
19+
"io/ioutil"
1920
"os"
2021
"path/filepath"
22+
"sort"
2123

2224
"k8s.io/klog/v2"
2325
)
@@ -105,4 +107,21 @@ func getStoragePoolNameFromMap(params map[string]string) string {
105107
return params[storagePoolName]
106108
}
107109
return legacyStoragePoolName
108-
}
110+
}
111+
112+
func getVolumeDirectories(storagePoolDataDirs map[string]string) ([]string, error) {
113+
directories := make([]string, 0)
114+
for _, path := range storagePoolDataDirs {
115+
files, err := ioutil.ReadDir(path)
116+
if err != nil {
117+
return nil, err
118+
}
119+
for _, file := range files {
120+
if file.IsDir() {
121+
directories = append(directories, filepath.Join(path, file.Name()))
122+
}
123+
}
124+
}
125+
sort.Strings(directories)
126+
return directories, nil
127+
}

0 commit comments

Comments
 (0)