Skip to content

Commit fea65b7

Browse files
committed
Support local disk cache for ebs
1 parent b7cf453 commit fea65b7

5 files changed

Lines changed: 399 additions & 2 deletions

File tree

deploy/charts/alibaba-cloud-csi-driver/templates/plugin.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ spec:
208208
- mountPath: /host/sys/fs/cgroup
209209
name: cgroup
210210
mountPropagation: "HostToContainer"
211+
- mountPath: /var/alibaba-cloud-csi
212+
name: var-csi
213+
mountPropagation: "HostToContainer"
211214
- mountPath: /etc/csi-plugin/config
212215
name: csi-plugin-cm
213216
{{- if $nodePool.csi.local.enabled }}
@@ -377,6 +380,10 @@ spec:
377380
hostPath:
378381
path: /sys/fs/cgroup
379382
type: Directory
383+
- name: var-csi
384+
hostPath:
385+
path: /var/alibaba-cloud-csi
386+
type: DirectoryOrCreate
380387
- name: host-dev
381388
hostPath:
382389
path: /dev

pkg/disk/controllerserver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ type diskVolumeArgs struct {
8181
ProvisionedIops int64
8282
BurstingEnabled bool
8383
RequestGB int64
84+
DataCache dataCache
8485
}
8586

8687
var delVolumeSnap sync.Map

pkg/disk/data_cache.go

Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
//go:build linux
2+
3+
package disk
4+
5+
import (
6+
"bytes"
7+
"errors"
8+
"fmt"
9+
"os"
10+
"path/filepath"
11+
"structs"
12+
"syscall"
13+
"unsafe"
14+
15+
"golang.org/x/sys/unix"
16+
"k8s.io/apimachinery/pkg/api/resource"
17+
"k8s.io/klog/v2"
18+
)
19+
20+
type DataCacheMode string
21+
22+
const (
23+
DataCacheWritethrough DataCacheMode = "writethrough"
24+
DataCacheWriteback DataCacheMode = "writeback"
25+
)
26+
27+
type dataCache struct {
28+
Size resource.Quantity
29+
Mode DataCacheMode
30+
}
31+
32+
const (
33+
DataCacheModeKey = "dataCacheMode"
34+
DataCacheSizeKey = "dataCacheSize"
35+
)
36+
37+
func getDataCacheOpts(opts map[string]string, d *dataCache) error {
38+
if s := opts[DataCacheSizeKey]; s != "" {
39+
size, err := resource.ParseQuantity(s)
40+
if err != nil {
41+
return fmt.Errorf("invalid %s: %w", DataCacheSizeKey, err)
42+
}
43+
d.Size = size
44+
}
45+
46+
switch m := DataCacheMode(opts[DataCacheModeKey]); m {
47+
case "", DataCacheWriteback, DataCacheWritethrough:
48+
d.Mode = m
49+
default:
50+
return fmt.Errorf("unrecognized %s: %s", DataCacheModeKey, m)
51+
}
52+
53+
if d.Mode != "" || !d.Size.IsZero() {
54+
if d.Size.IsZero() {
55+
return fmt.Errorf("must specify non-zero %s for dataCache", DataCacheSizeKey)
56+
}
57+
if d.Mode == "" {
58+
d.Mode = DataCacheWritethrough
59+
}
60+
}
61+
return nil
62+
}
63+
64+
func loggedClose(logger klog.Logger, fd int) {
65+
if err := unix.Close(fd); err != nil {
66+
logger.Error(err, "failed to close fd", "fd", fd)
67+
}
68+
}
69+
70+
func fallocate(path string, size int64) (int, error) {
71+
fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR|unix.O_CLOEXEC, 0600)
72+
if err != nil {
73+
return 0, fmt.Errorf("failed to open %q: %w", path, err)
74+
}
75+
76+
err = unix.Fallocate(fd, 0, 0, size)
77+
if err != nil {
78+
unix.Close(fd)
79+
return 0, fmt.Errorf("failed to allocate space for %q: %w", path, err)
80+
}
81+
return fd, nil
82+
}
83+
84+
func loopGetFree() (string, error) {
85+
loopCtrl, err := unix.Open("/dev/loop-control", unix.O_RDWR|unix.O_CLOEXEC, 0)
86+
if err != nil {
87+
return "", fmt.Errorf("failed to open loop control device: %w", err)
88+
}
89+
90+
slot, err := unix.IoctlRetInt(loopCtrl, unix.LOOP_CTL_GET_FREE)
91+
errClose := unix.Close(loopCtrl)
92+
if err != nil {
93+
return "", errors.Join(fmt.Errorf("failed to get loop device slot: %w", err), errClose)
94+
}
95+
96+
return fmt.Sprintf("/dev/loop%d", slot), errClose
97+
}
98+
99+
func allocCacheFile(logger klog.Logger, path string, size int64) (string, int, error) {
100+
fd, err := fallocate(path, size)
101+
if err != nil {
102+
return "", 0, err
103+
}
104+
defer loggedClose(logger, fd)
105+
106+
loopPath, err := loopGetFree()
107+
if err != nil {
108+
return "", 0, err
109+
}
110+
111+
loop, err := unix.Open(loopPath, unix.O_RDWR|unix.O_CLOEXEC, 0)
112+
if err != nil {
113+
return "", 0, fmt.Errorf("failed to open loop device %s: %w", loopPath, err)
114+
}
115+
116+
// Close loop fd on error; skip close only on success.
117+
success := false
118+
defer func() {
119+
if !success {
120+
loggedClose(logger, loop)
121+
}
122+
}()
123+
124+
conf := unix.LoopConfig{
125+
Fd: uint32(fd),
126+
Size: 4 << 10,
127+
Info: unix.LoopInfo64{
128+
Flags: unix.LO_FLAGS_DIRECT_IO | unix.LO_FLAGS_AUTOCLEAR,
129+
},
130+
}
131+
copy(conf.Info.File_name[:], path)
132+
err = unix.IoctlLoopConfigure(loop, &conf) // Since Linux kernel 5.8
133+
if err != nil {
134+
return "", 0, fmt.Errorf("failed to configure loop device %s: %w", loopPath, err)
135+
}
136+
137+
success = true
138+
return loopPath, loop, nil
139+
}
140+
141+
func dmIoctl(fd int, action uintptr, volumeID string, flags uint32) syscall.Errno {
142+
dm := unix.DmIoctl{
143+
Version: [3]uint32{4, 0, 0},
144+
Data_size: unix.SizeofDmIoctl,
145+
Data_start: unix.SizeofDmIoctl,
146+
Flags: flags,
147+
}
148+
copy(dm.Name[:], volumeID)
149+
_, _, err := unix.Syscall(unix.SYS_IOCTL, uintptr(fd), action, uintptr(unsafe.Pointer(&dm)))
150+
return err
151+
}
152+
153+
type dmiT struct {
154+
structs.HostLayout
155+
unix.DmIoctl
156+
unix.DmTargetSpec
157+
Args [3744]byte // pad to make dmiT exactly 4096 bytes
158+
}
159+
160+
// Compile-time assertions: dmiT must be exactly 4096 bytes for dm-ioctl.
161+
var _ [4096 - unsafe.Sizeof(dmiT{})]byte
162+
var _ [unsafe.Sizeof(dmiT{}) - 4096]byte
163+
164+
func updateTable(dmCtrl int, volumeID string, size uint64, args string) error {
165+
dmi := dmiT{
166+
DmIoctl: unix.DmIoctl{
167+
Version: [3]uint32{4, 0, 0},
168+
Data_size: uint32(unsafe.Sizeof(dmiT{})),
169+
Data_start: unix.SizeofDmIoctl,
170+
Target_count: 1,
171+
},
172+
DmTargetSpec: unix.DmTargetSpec{
173+
Sector_start: 0,
174+
Length: size,
175+
Target_type: [16]byte{'c', 'a', 'c', 'h', 'e'},
176+
},
177+
}
178+
copy(dmi.Name[:], volumeID)
179+
copy(dmi.Args[:], args)
180+
_, _, errno := unix.Syscall(unix.SYS_IOCTL, uintptr(dmCtrl), unix.DM_TABLE_LOAD, uintptr(unsafe.Pointer(&dmi)))
181+
if errno != 0 {
182+
return fmt.Errorf("failed to load device-mapper table: %w", errno)
183+
}
184+
185+
errno = dmIoctl(dmCtrl, unix.DM_DEV_SUSPEND, volumeID, unix.DM_NOFLUSH_FLAG|unix.DM_SKIP_LOCKFS_FLAG)
186+
if errno != 0 {
187+
return fmt.Errorf("failed to resume device-mapper device: %w", errno)
188+
}
189+
return nil
190+
}
191+
192+
// setupDmCache creates a dm-cache device. size is cloud disk size in 512-byte sectors.
193+
func setupDmCache(logger klog.Logger, args string, size uint64, volumeID string) error {
194+
if len(args) > len(dmiT{}.Args) {
195+
return fmt.Errorf("args too long")
196+
}
197+
198+
dmCtrl, err := unix.Open("/dev/mapper/control", unix.O_RDWR|unix.O_CLOEXEC, 0)
199+
if err != nil {
200+
return fmt.Errorf("failed to open /dev/mapper/control: %w", err)
201+
}
202+
defer loggedClose(logger, dmCtrl)
203+
204+
errno := dmIoctl(dmCtrl, unix.DM_DEV_CREATE, volumeID, 0)
205+
if errno != 0 {
206+
return fmt.Errorf("failed to create device-mapper device: %w", errno)
207+
}
208+
209+
err = updateTable(dmCtrl, volumeID, size, args)
210+
if err != nil {
211+
errno := dmIoctl(dmCtrl, unix.DM_DEV_REMOVE, volumeID, 0)
212+
if errno != 0 {
213+
return fmt.Errorf("%w, cleanup also failed: %v, need manual cleanup", err, errno)
214+
}
215+
return err
216+
}
217+
logger.V(2).Info("setup dm-cache", "args", args, "size", size)
218+
return nil
219+
}
220+
221+
// resizeDmCache resizes an existing dm-cache device. size is in 512-byte sectors.
222+
func resizeDmCache(logger klog.Logger, size uint64, volumeID string) error {
223+
dmCtrl, err := unix.Open("/dev/mapper/control", unix.O_RDWR|unix.O_CLOEXEC, 0)
224+
if err != nil {
225+
return fmt.Errorf("failed to open /dev/mapper/control: %w", err)
226+
}
227+
defer loggedClose(logger, dmCtrl)
228+
229+
// Get current active table for args
230+
dmi := dmiT{
231+
DmIoctl: unix.DmIoctl{
232+
Version: [3]uint32{4, 0, 0},
233+
Data_size: uint32(unsafe.Sizeof(dmiT{})),
234+
Data_start: unix.SizeofDmIoctl,
235+
Flags: unix.DM_STATUS_TABLE_FLAG,
236+
},
237+
}
238+
copy(dmi.Name[:], volumeID)
239+
_, _, errno := unix.Syscall(unix.SYS_IOCTL, uintptr(dmCtrl), unix.DM_TABLE_STATUS, uintptr(unsafe.Pointer(&dmi)))
240+
if errno != 0 {
241+
return fmt.Errorf("failed to get current table: %w", errno)
242+
}
243+
244+
if dmi.Flags&unix.DM_ACTIVE_PRESENT_FLAG == 0 {
245+
return fmt.Errorf("device-mapper device is not active")
246+
}
247+
if dmi.Target_count != 1 {
248+
return fmt.Errorf("device-mapper device has %d targets", dmi.Target_count)
249+
}
250+
var args string
251+
nullIdx := bytes.IndexByte(dmi.Args[:], 0)
252+
if nullIdx == -1 {
253+
args = string(dmi.Args[:])
254+
} else {
255+
args = string(dmi.Args[:nullIdx])
256+
}
257+
258+
logger.V(2).Info("resize dm-cache", "args", args, "size", size, "oldSize", dmi.Length)
259+
return updateTable(dmCtrl, volumeID, size, args)
260+
}
261+
262+
const DataCachePath = "/var/alibaba-cloud-csi/data-cache"
263+
264+
func cacheFilePath(volumeID string) (meta, data string) {
265+
meta = filepath.Join(DataCachePath, volumeID+".meta")
266+
data = filepath.Join(DataCachePath, volumeID+".data")
267+
return meta, data
268+
}
269+
270+
func dataCacheDevicePath(volumeID string) string {
271+
return "/dev/mapper/" + volumeID
272+
}
273+
274+
func setupDataCache(logger klog.Logger, d *dataCache, device, volumeID string) (string, error) {
275+
if d.Size.IsZero() {
276+
return device, nil // Not enabled
277+
}
278+
279+
mapperDev := dataCacheDevicePath(volumeID)
280+
var st unix.Stat_t
281+
if err := unix.Stat(mapperDev, &st); err == nil {
282+
return mapperDev, nil // Already setup
283+
} else if err != unix.ENOENT {
284+
return "", fmt.Errorf("failed to stat %s: %w", mapperDev, err)
285+
}
286+
287+
if len(volumeID) > len(unix.DmIoctl{}.Name) {
288+
return "", fmt.Errorf("volume ID %q is too long", volumeID)
289+
}
290+
291+
size := d.Size.Value()
292+
meta, data := cacheFilePath(volumeID)
293+
294+
dataLoop, dataFd, err := allocCacheFile(logger, data, size)
295+
if err != nil {
296+
if errors.Is(err, os.ErrNotExist) {
297+
logger.V(1).Info("data cache path not exist on node, proceed without cache")
298+
return device, nil
299+
}
300+
return "", fmt.Errorf("failed to allocate cache file: %v", err)
301+
}
302+
// Close loop FDs after setupDmCache so loop devices are not auto-removed.
303+
defer loggedClose(logger, dataFd)
304+
305+
metaLoop, metaFd, err := allocCacheFile(logger, meta, 16<<20) // TODO: determine the real size requirement
306+
if err != nil {
307+
return "", fmt.Errorf("failed to allocate meta file: %v", err)
308+
}
309+
defer loggedClose(logger, metaFd)
310+
311+
dSize := getBlockDeviceCapacity(device)
312+
if dSize <= 0 {
313+
return "", fmt.Errorf("failed to get capacity for device %s", device)
314+
}
315+
316+
args := fmt.Sprintf("%s %s %s 512 2 metadata2 %s mq 2 migration_threshold 4096", metaLoop, dataLoop, device, d.Mode)
317+
return mapperDev, setupDmCache(logger, args, uint64(dSize/512), volumeID)
318+
}
319+
320+
func teardownDmCache(logger klog.Logger, volumeID string) error {
321+
dmCtrl, err := unix.Open("/dev/mapper/control", unix.O_RDWR|unix.O_CLOEXEC, 0)
322+
if err != nil {
323+
return fmt.Errorf("failed to open /dev/mapper/control: %w", err)
324+
}
325+
defer loggedClose(logger, dmCtrl)
326+
327+
dm := unix.DmIoctl{
328+
Version: [3]uint32{4, 0, 0},
329+
Data_size: unix.SizeofDmIoctl,
330+
Data_start: unix.SizeofDmIoctl,
331+
}
332+
copy(dm.Name[:], volumeID)
333+
_, _, errno := unix.Syscall(unix.SYS_IOCTL, uintptr(dmCtrl), unix.DM_DEV_REMOVE, uintptr(unsafe.Pointer(&dm)))
334+
if errno != 0 {
335+
if errno == unix.ENXIO {
336+
logger.V(2).Info("dm-cache already removed")
337+
return nil
338+
}
339+
return fmt.Errorf("failed to remove device-mapper device: %w", errno)
340+
}
341+
logger.V(2).Info("teardown dm-cache")
342+
return nil
343+
}
344+
345+
func cleanFile(path string) error {
346+
err := os.RemoveAll(path)
347+
if errors.Is(err, os.ErrNotExist) {
348+
return nil
349+
}
350+
return err
351+
}
352+
353+
func teardownDataCache(logger klog.Logger, volumeID string) error {
354+
err := teardownDmCache(logger, volumeID)
355+
if err != nil {
356+
return err
357+
}
358+
// Note: loop device has LO_FLAGS_AUTOCLEAR set, so it is auto removed after teardownDmCache.
359+
360+
meta, data := cacheFilePath(volumeID)
361+
return errors.Join(cleanFile(meta), cleanFile(data))
362+
}

0 commit comments

Comments
 (0)