Commit bad5eb4

disk: replace conf file with xattr
We used to use /etc/kubernetes/volumes/disk/d-*.conf files to record the relationship between disks without a serial number and their device paths. But this had multiple drawbacks:

- leak: we may fail to remove the files we created
- inaccurate: if the disk is detached or its driver is switched, the device disappears without our knowledge; the conf file may then point to a nonexistent device, or even the wrong one

Replace the conf files with xattrs, which we already use to calculate the number of volumes available on a node. The xattrs are attached to the device inode and go away with it, so we no longer need to worry about cleanup or staleness.

Old conf files are migrated to xattr in one go, in the init container.

As a bonus, partition support is now more decoupled and should work in more scenarios. xattrs are always attached to the root block device, never to a partition.
1 parent 87b804e commit bad5eb4
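The mechanism in a nutshell: an extended attribute set on a block device inode lives and dies with that inode. The following is an illustrative sketch, not code from this commit — the device path and disk ID are made up — showing the round trip with golang.org/x/sys/unix:

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

// The xattr name used by the driver; setting a trusted-namespace xattr
// requires CAP_SYS_ADMIN.
const diskXattrName = "trusted.csi-managed-disk"

func main() {
	device, diskID := "/dev/vdb", "d-example" // hypothetical values

	// Record the disk ID on the inode itself. When the disk is detached the
	// device node vanishes, taking the record with it: no cleanup needed.
	if err := unix.Setxattr(device, diskXattrName, []byte(diskID), 0); err != nil {
		fmt.Println("setxattr:", err)
		return
	}

	// Read it back later to re-learn which disk this device is.
	buf := make([]byte, 256)
	n, err := unix.Getxattr(device, diskXattrName, buf)
	if err != nil {
		fmt.Println("getxattr:", err)
		return
	}
	fmt.Printf("%s is managed as disk %s\n", device, buf[:n])
}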

8 files changed: +371 -167 lines


build/lib/init.sh

Lines changed: 40 additions & 0 deletions
@@ -92,6 +92,46 @@ do
 	fi
 done
 
+migrate_disk_conf() {
+	dev=$(cat "$1")
+	if ! [ -e "$dev" ]; then
+		echo "device $dev not found, skip"
+		return 0
+	fi
+	if [ -h "$dev" ]; then
+		echo "device $dev is a symlink, skip"
+		return 0
+	fi
+	if ! [ -b "$dev" ]; then
+		echo "device $dev is not a block device!"
+		return 1
+	fi
+	sysfs_path=$(readlink -f "/sys/class/block/$(basename "$dev")")
+	# check if dev is a partition
+	if [ "$(cat "$sysfs_path/partition")" = 1 ]; then
+		# go up one level to the whole disk
+		sysfs_path=$(dirname "$sysfs_path")
+		dev=/dev/"$(basename "$sysfs_path")"
+	fi
+
+	serial=$(cat "$sysfs_path/serial" || cat "$sysfs_path/device/serial")
+	if [ -n "$serial" ]; then
+		echo "device $dev has serial $serial"
+		return 0
+	fi
+
+	# device has no serial, it may be found by diff; we have to trust the conf
+	disk_id=$(basename "$1" .conf)  # strip the .conf suffix to get the disk ID
+	echo "device $dev has no serial, assigning disk ID $disk_id"
+	setfattr -n trusted.csi-managed-disk -v "${disk_id}" "$dev"
+}
+
+for conf in /host/etc/kubernetes/volumes/disk/d-*.conf; do
+	echo "migrating disk conf: $conf"
+	migrate_disk_conf "$conf" || exit 1
+	rm -f "$conf"
+done
+
 # config /etc/updatedb.config if needed
 if [ "$SKIP_UPDATEDB_CONFIG" != "true" ]; then
 	## check cron.daily dir
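The partition handling above, rendered as a Go sketch for clarity (illustrative only; rootBlockDevice is a hypothetical helper, not from this commit). A partition's sysfs directory nests under its parent disk, so one dirname hop resolves the whole-disk device:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// rootBlockDevice resolves a device like /dev/vdb1 to its whole disk /dev/vdb.
func rootBlockDevice(dev string) (string, error) {
	// /sys/class/block/vdb1 is a symlink into .../block/vdb/vdb1.
	sysfsPath, err := filepath.EvalSymlinks(filepath.Join("/sys/class/block", filepath.Base(dev)))
	if err != nil {
		return "", err
	}
	// Only partitions carry a "partition" attribute; go up one level if present.
	if _, err := os.Stat(filepath.Join(sysfsPath, "partition")); err == nil {
		sysfsPath = filepath.Dir(sysfsPath)
	}
	return "/dev/" + filepath.Base(sysfsPath), nil
}

func main() {
	fmt.Println(rootBlockDevice("/dev/vdb1"))
}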

pkg/disk/cloud.go

Lines changed: 38 additions & 16 deletions
@@ -74,7 +74,8 @@ type DiskAttachDetach struct {
 	detachThrottler *throttle.Throttler
 	detaching       sync.Map
 
-	dev *DeviceManager
+	dev    *DeviceManager
+	devMap *devMap
 }
 
 type DiskCreateDelete struct {
@@ -84,6 +85,28 @@ type DiskCreateDelete struct {
 	deleteThrottler *throttle.Throttler
 }
 
+// GetRootBlockDevice returns the root block device path for the given disk ID.
+func (ad *DiskAttachDetach) GetRootBlockDevice(logger klog.Logger, diskID string) (string, error) {
+	device, err := ad.dev.GetRootBlockBySerial(strings.TrimPrefix(diskID, "d-"))
+	if err == nil {
+		return device, nil
+	}
+	device, err2 := ad.devMap.Get(logger, diskID)
+	if device == "" {
+		return "", errors.Join(err, err2) // err2 may be nil, which is OK
+	}
+	klog.Infof("GetRootBlockDevice: got disk %s device name %s from devMap", diskID, device)
+	return device, nil
+}
+
+func (ad *DiskAttachDetach) GetVolumeDeviceName(logger klog.Logger, diskID string) (string, error) {
+	root, err := ad.GetRootBlockDevice(logger, diskID)
+	if err != nil {
+		return "", err
+	}
+	return ad.dev.adaptDevicePartition(root)
+}
+
 func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) {
 	after, err := ad.dev.ListBlocks()
 	if err != nil {
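The devMap type itself is defined in one of this commit's files not shown on this page. From the call sites visible here — Get, Add, Delete, and a usable zero value — its shape is roughly the following; an inferred sketch, not the actual implementation:

package disk

import (
	"sync"

	"k8s.io/klog/v2"
)

// devMap tracks serial-less disks: disk ID -> root block device path.
type devMap struct {
	mu sync.Mutex
	m  map[string]string
}

// Add records the device discovered for a disk (e.g. by diffing /dev).
func (d *devMap) Add(diskID, device string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.m == nil {
		d.m = map[string]string{}
	}
	d.m[diskID] = device
	return nil
}

// Get returns the recorded device, or "" (with nil error) if unknown.
func (d *devMap) Get(logger klog.Logger, diskID string) (string, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.m[diskID], nil
}

// Delete forgets a disk once it is detached.
func (d *devMap) Delete(diskID string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.m, diskID)
}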
@@ -123,6 +146,10 @@ func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial strin
 		}
 		if len(disks) == 1 {
 			device = disks[0]
+			err := ad.devMap.Add(diskID, device)
+			if err != nil {
+				return "", fmt.Errorf("failed to populate devMap: %v", err)
+			}
 			logger.V(2).Info("found device by diff", "device", device)
 			break
 		} else {
@@ -217,23 +244,18 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
 			klog.Infof("AttachDisk: Disk %s is already attached to Instance %s, skipping", diskID, disk.InstanceId)
 			return disk.SerialNumber, nil
 		}
-		deviceName, err := GetVolumeDeviceName(diskID)
-		if err == nil && deviceName != "" && IsFileExisting(deviceName) {
-			klog.Infof("AttachDisk: Disk %s is already attached to self Instance %s, and device is: %s", diskID, disk.InstanceId, deviceName)
-			return deviceName, nil
-		} else if disk.SerialNumber != "" {
-			// wait for pci attach ready
-			time.Sleep(5 * time.Second)
-			klog.Infof("AttachDisk: find disk dev after 5 seconds")
-			deviceName, err := GetVolumeDeviceName(diskID)
-			if err == nil && deviceName != "" && IsFileExisting(deviceName) {
-				klog.Infof("AttachDisk: Disk %s is already attached to self Instance %s, and device is: %s", diskID, disk.InstanceId, deviceName)
-				return deviceName, nil
+		if disk.SerialNumber != "" {
+			return ad.dev.WaitRootBlock(ctx, disk.SerialNumber)
+		} else {
+			device, err := ad.devMap.Get(logger, diskID)
+			if err != nil {
+				return "", err
 			}
-			err = fmt.Errorf("AttachDisk: disk device cannot be found in node, diskid: %s, deviceName: %s, err: %+v", diskID, deviceName, err)
-			return "", err
+			if device != "" {
+				return device, nil
+			}
+			klog.Warningf("AttachDisk: Disk (no serial) %s is already attached to instance %s, but device unknown, will be detached and try again", diskID, disk.InstanceId)
 		}
-		klog.Warningf("AttachDisk: Disk (no serial) %s is already attached to instance %s, but device unknown, will be detached and try again", diskID, disk.InstanceId)
 	}
 
 	if GlobalConfigVar.DiskBdfEnable {
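WaitRootBlock, which replaces the old fixed time.Sleep(5 * time.Second), is implemented in the DeviceManager outside this diff. Conceptually it blocks until a block device with the given serial appears. A naive polling sketch under that assumption (the real code may look in more places than /sys/block/*/serial):

package disk

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"
)

// waitRootBlockBySerial polls /sys/block/*/serial until a device matches or
// the context expires.
func waitRootBlockBySerial(ctx context.Context, serial string) (string, error) {
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for {
		entries, _ := os.ReadDir("/sys/block")
		for _, e := range entries {
			raw, err := os.ReadFile(filepath.Join("/sys/block", e.Name(), "serial"))
			if err == nil && strings.TrimSpace(string(raw)) == serial {
				return "/dev/" + e.Name(), nil
			}
		}
		select {
		case <-ctx.Done():
			return "", fmt.Errorf("disk with serial %s did not appear: %w", serial, ctx.Err())
		case <-ticker.C:
		}
	}
}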

pkg/disk/csi_agent.go

Lines changed: 5 additions & 1 deletion
@@ -32,6 +32,10 @@ func NewCSIAgent() *CSIAgent {
 			k8smounter: k8smount.NewWithoutSystemd(""),
 			podCGroup:  podCgroup,
 			locks:      utils.NewVolumeLocks(),
+			ad: DiskAttachDetach{
+				dev:    DefaultDeviceManager,
+				devMap: &devMap{}, // Nobody will add to this map.
+			},
 		},
 	}
 }
@@ -45,7 +49,7 @@ func (a *CSIAgent) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolum
 }
 
 func (a *CSIAgent) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
-	return localExpandVolume(ctx, req)
+	return a.ns.localExpandVolume(ctx, req)
 }
 
 func (a *CSIAgent) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {

pkg/disk/nodeserver.go

Lines changed: 29 additions & 48 deletions
@@ -86,12 +86,8 @@ const (
 	DiskAttachedKey = "k8s.aliyun.com"
 	// DiskAttachedValue attached value
 	DiskAttachedValue = "true"
-	// VolumeDir volume dir
-	VolumeDir = "/host/etc/kubernetes/volumes/disk/"
 	// RundSocketDir dir
 	RundSocketDir = "/host/etc/kubernetes/volumes/rund/"
-	// VolumeDirRemove volume dir remove
-	VolumeDirRemove = "/host/etc/kubernetes/volumes/disk/remove"
 	// InputOutputErr tag
 	InputOutputErr = "input/output error"
 	// CreateDiskARN ARN parameter of the CreateDisk interface
@@ -127,15 +123,6 @@ var (
 	// BLOCKVOLUMEPREFIX block volume mount prefix
 	BLOCKVOLUMEPREFIX = filepath.Join(utils.KubeletRootDir, "/plugins/kubernetes.io/csi/volumeDevices/publish")
 
-	// DiskXattrName xattr is applied on the block device file to indicate that it is managed by the CSI driver.
-	// Value is the disk ID.
-	// Linux only supports trusted-namespace xattrs in tmpfs before kernel v6.6,
-	// but setting trusted xattrs requires CAP_SYS_ADMIN, so we may use the user namespace instead in unit tests.
-	DiskXattrName = "trusted.csi-managed-disk"
-
-	// DiskXattrVirtioBlkName xattr is applied on the block device file to indicate that it is managed by the CSI driver in PVM ways.
-	DiskXattrVirtioBlkName = "trusted.virtio-blk"
-
 	// BDFTypeDevice defines the regexp of bdf number
 	BDFTypeDevice = regexp.MustCompile(`^[0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}`)
 	// DFBusTypeDevice defines the regexp of dfnumber
@@ -173,17 +160,9 @@ func parseVolumeCountEnv() (int, error) {
 // NewNodeServer creates node server
 func NewNodeServer(m metadata.MetadataProvider) csi.NodeServer {
 	// Create Directory
-	err := os.MkdirAll(VolumeDir, os.FileMode(0755))
-	if err != nil {
-		klog.Errorf("Create Directory %s failed: %v", VolumeDir, err)
-	}
-	err = os.MkdirAll(VolumeDirRemove, os.FileMode(0755))
+	err := os.MkdirAll(RundSocketDir, os.FileMode(0755))
 	if err != nil {
-		klog.Errorf("Create Directory %s failed: %v", VolumeDir, err)
-	}
-	err = os.MkdirAll(RundSocketDir, os.FileMode(0755))
-	if err != nil {
-		klog.Errorf("Create Directory %s failed: %v", VolumeDir, err)
+		klog.Errorf("Create Directory %s failed: %v", RundSocketDir, err)
 	}
 
 	if IsVFNode() {
@@ -215,6 +194,11 @@ func NewNodeServer(m metadata.MetadataProvider) csi.NodeServer {
 		klog.Fatalf("Failed to initialize pod cgroup: %v", err)
 	}
 
+	devMap, err := NewDevMap(utilsio.RealDevTmpFS{})
+	if err != nil {
+		klog.Fatalf("failed to list devices: %v", err)
+	}
+
 	waiter, batcher := newBatcher(true)
 	return &nodeServer{
 		metadata: m,
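NewDevMap is likewise defined outside this diff. Judging from this call and the commit message, it seeds the map at startup by scanning the dev tmpfs and reading the xattr back from each block device, which is what makes a driver restart safe for serial-less disks. An inferred sketch, with assumed names:

package disk

import (
	"os"
	"path/filepath"

	"golang.org/x/sys/unix"
)

// seedDevMap rebuilds diskID -> device from xattrs left on /dev inodes.
func seedDevMap() (map[string]string, error) {
	entries, err := os.ReadDir("/dev")
	if err != nil {
		return nil, err
	}
	m := map[string]string{}
	buf := make([]byte, 256)
	for _, e := range entries {
		dev := filepath.Join("/dev", e.Name())
		info, err := os.Stat(dev)
		// Only block devices can carry the marker.
		if err != nil || info.Mode()&os.ModeDevice == 0 || info.Mode()&os.ModeCharDevice != 0 {
			continue
		}
		n, err := unix.Getxattr(dev, "trusted.csi-managed-disk", buf)
		if err != nil {
			continue // typically ENODATA: not managed by us
		}
		m[string(buf[:n])] = dev
	}
	return m, nil
}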
@@ -232,7 +216,8 @@ func NewNodeServer(m metadata.MetadataProvider) csi.NodeServer {
 			attachThrottler: defaultThrottler(),
 			detachThrottler: defaultThrottler(),
 
-			dev: DefaultDeviceManager,
+			dev:    DefaultDeviceManager,
+			devMap: devMap,
 		},
 		locks: utils.NewVolumeLocks(),
 		GenericNodeServer: common.GenericNodeServer{
@@ -278,6 +263,7 @@ func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetC
 
 // csi disk driver: bind directory from global to pod.
 func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+	logger := klog.FromContext(ctx)
 	if !ns.locks.TryAcquire(req.VolumeId) {
 		return nil, status.Errorf(codes.Aborted, "There is already an operation for %s", req.VolumeId)
 	}
@@ -388,7 +374,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	}
 
 	// check device name available
-	expectName, err := GetVolumeDeviceName(req.VolumeId)
+	expectName, err := ns.ad.GetVolumeDeviceName(logger, req.VolumeId)
 	if err != nil {
 		return nil, status.Errorf(codes.Internal, "NodePublishVolume: VolumeId: %s, get device name error: %s", req.VolumeId, err.Error())
 	}
@@ -596,10 +582,6 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
 		klog.Errorf("NodeStageVolume: check device %s for volume %s with error: %s", device, req.VolumeId, err.Error())
 		return nil, status.Error(defaultErrCode, err.Error())
 	}
-	if err := saveVolumeConfig(req.VolumeId, device); err != nil {
-		klog.Errorf("NodeStageVolume: saveVolumeConfig %s for volume %s with error: %s", device, req.VolumeId, err.Error())
-		return nil, status.Error(defaultErrCode, "NodeStageVolume: saveVolumeConfig for ("+req.VolumeId+device+") error with: "+err.Error())
-	}
 	klog.Infof("NodeStageVolume: Volume Successful Attached: %s, to Node: %s, Device: %s", req.VolumeId, ns.NodeID, device)
 
 	err = ns.setupDisk(ctx, device, targetPath, req)
@@ -685,22 +667,23 @@ func (ns *nodeServer) setupDisk(ctx context.Context, device, targetPath string,
 	return nil
 }
 
-func addDiskXattr(diskID string) (err error) {
+func (ns *nodeServer) addDiskXattr(logger klog.Logger, diskID string) (err error) {
 	defer func() {
 		if errors.Is(err, os.ErrNotExist) {
 			klog.Infof("addDiskXattr: disk %s not found, skip", diskID)
 			err = nil
 		}
 	}()
-	device, err := GetVolumeDeviceName(diskID)
+	device, err := ns.ad.GetVolumeDeviceName(logger, diskID)
 	if err != nil {
 		return
 	}
-	return unix.Setxattr(device, DiskXattrName, []byte(diskID), 0)
+	return setDiskXattr(device, diskID)
 }
 
 // target format: /var/lib/kubelet/plugins/kubernetes.io/csi/pv/pv-disk-1e7001e0-c54a-11e9-8f89-00163e0e78a0/globalmount
 func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+	logger := klog.FromContext(ctx)
 	klog.Infof("NodeUnstageVolume:: Starting to Unmount volume, volumeId: %s, target: %v", req.VolumeId, req.StagingTargetPath)
 
 	if !ns.locks.TryAcquire(req.VolumeId) {
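setDiskXattr is new in this commit but defined outside this diff. Per the commit message, xattrs are always attached to the root block device, never a partition, so it presumably resolves the whole disk before writing the attribute. A sketch under that assumption, reusing the hypothetical rootBlockDevice helper from the init.sh note above:

package disk

import "golang.org/x/sys/unix"

func setDiskXattrSketch(device, diskID string) error {
	// Resolve a partition like /dev/vdb1 to its parent disk /dev/vdb first,
	// so the marker always sits on the root block device.
	root, err := rootBlockDevice(device) // defined in the earlier sketch
	if err != nil {
		return err
	}
	return unix.Setxattr(root, "trusted.csi-managed-disk", []byte(diskID), 0)
}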
@@ -788,7 +771,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
 		}
 	}
 
-	err := addDiskXattr(req.VolumeId)
+	err := ns.addDiskXattr(logger, req.VolumeId)
 	if err != nil {
 		klog.Errorf("NodeUnstageVolume: addDiskXattr %s failed: %v", req.VolumeId, err)
 	}
@@ -806,8 +789,8 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
 			klog.Errorf("NodeUnstageVolume: VolumeId: %s, Detach failed with error %v", req.VolumeId, err.Error())
 			return nil, err
 		}
-		_ = removeVolumeConfig(req.VolumeId)
 	}
+	ns.ad.devMap.Delete(req.VolumeId)
 
 	return &csi.NodeUnstageVolumeResponse{}, nil
 }
@@ -933,22 +916,23 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
-	return localExpandVolume(ctx, req)
+	return ns.localExpandVolume(ctx, req)
 }
 
-func localExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
+func (ns *nodeServer) localExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
 	requestBytes := req.GetCapacityRange().GetRequiredBytes()
 	volumePath := req.GetVolumePath()
 	diskID := req.GetVolumeId()
+	logger := klog.FromContext(ctx)
 
-	devicePath, err := GetVolumeDeviceName(diskID)
+	devicePath, err := ns.ad.GetVolumeDeviceName(logger, diskID)
 	if err != nil {
 		if errors.Is(err, os.ErrNotExist) {
 			return nil, status.Errorf(codes.NotFound, "can't get devicePath for: %s", diskID)
 		}
 		return nil, status.Errorf(codes.Internal, "NodeExpandVolume: VolumeId: %s, get device name error: %s", req.VolumeId, err.Error())
 	}
-	logger := klog.FromContext(ctx).WithValues("device", devicePath)
+	logger = logger.WithValues("device", devicePath)
 	ctx = klog.NewContext(ctx, logger)
 
 	rootPath, index, err := DefaultDeviceManager.GetDeviceRootAndPartitionIndex(devicePath)
@@ -1246,14 +1230,9 @@ func (ns *nodeServer) mountRunvVolumes(volumeId, sourcePath, targetPath, fsType,
 		klog.Errorf("NodePublishVolume(runv): unmountStageTarget %s with error: %s", sourcePath, err.Error())
 		return status.Error(codes.InvalidArgument, "NodePublishVolume: unmountStageTarget "+sourcePath+" with error: "+err.Error())
 	}
-	deviceName, err := DefaultDeviceManager.GetDeviceByVolumeID(volumeId)
+	deviceName, err := ns.ad.GetRootBlockDevice(klog.TODO(), volumeId)
 	if err != nil {
-		klog.Errorf("NodePublishVolume(runv): failed to get device by deviceName: %s", err.Error())
-		deviceName = getVolumeConfig(volumeId)
-	}
-	if deviceName == "" {
-		klog.Errorf("NodePublishVolume(runv): cannot get local deviceName for volume: %s", volumeId)
-		return status.Error(codes.InvalidArgument, "NodePublishVolume: cannot get local deviceName for volume: "+volumeId)
+		return status.Errorf(codes.InvalidArgument, "NodePublishVolume: cannot get local deviceName for volume %s: %v", volumeId, err)
 	}
 
 	// save volume info to local file
@@ -1289,9 +1268,11 @@ func (ns *nodeServer) mountRunvVolumes(volumeId, sourcePath, targetPath, fsType,
 
 func (ns *nodeServer) mountRunDVolumes(volumeId, pvName, sourcePath, targetPath, fsType, mkfsOptions string, isRawBlock, pvmMode bool, mountFlags []string) (bool, error) {
 	klog.Infof("NodePublishVolume:: Disk Volume %s Mounted in RunD csi 3.0/2.0 protocol", volumeId)
-	deviceName, err := DefaultDeviceManager.GetDeviceByVolumeID(volumeId)
+	logger := klog.TODO()
+	deviceName, err := ns.ad.GetRootBlockDevice(logger, volumeId)
 	if err != nil {
-		deviceName = getVolumeConfig(volumeId)
+		logger.V(1).Info("RunD volume device not found", "err", err)
+		// maybe OK, we can find the device by xdragon-bdf below.
 	}
 
 	if features.FunctionalMutableFeatureGate.Enabled(features.RundCSIProtocol3) {
@@ -1476,7 +1457,7 @@ func (ns *nodeServer) checkMountedOfRunvAndRund(volumeId, targetPath string) boo
 		}
 	}
 
-	device, err := GetVolumeDeviceName(volumeId)
+	device, err := ns.ad.GetRootBlockDevice(klog.TODO(), volumeId)
 	if err != nil {
 		// In VFIO mode, an empty device is an expected condition, so the resulting error should be ignored.
 		klog.Warningf("NodeStageVolume: GetVolumeDeviceName failed: %s", err.Error())
