Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions PendingReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@

## Features

- set nodeId:userId mapping in metadata [PR](https://github.com/ceph/ceph-csi/pull/5445)
- refer design doc for more details - [here](https://github.com/ceph/ceph-csi/blob/devel/docs/design/proposals/userID-mapping.md)

## NOTE
26 changes: 26 additions & 0 deletions e2e/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,26 @@ var _ = Describe(cephfsType, func() {
})
}

By("verify userId mapping metadata exists", func() {
err := createCephfsStorageClass(f.ClientSet, f, true, nil)
if err != nil {
framework.Failf("failed to create CephFS storageclass: %v", err)
}

err = verifyUserIdMappingMetadata(f, pvcPath, appPath, cephfsType)
if err != nil {
framework.Failf("failed to verify userId mapping metadata exists: %v", err)
}

validateSubvolumeCount(f, 0, fileSystemName, subvolumegroup)
validateOmapCount(f, 0, cephfsType, metadataPool, volumesType)

err = deleteResource(cephFSExamplePath + "storageclass.yaml")
if err != nil {
framework.Failf("failed to delete CephFS storageclass: %v", err)
}
Comment thread
Madhu-1 marked this conversation as resolved.
})

By("verify client address metadata exists", func() {
err := createCephfsStorageClass(f.ClientSet, f, true, nil)
if err != nil {
Expand Down Expand Up @@ -1977,6 +1997,12 @@ var _ = Describe(cephfsType, func() {
if err != nil {
logAndFail("failed to verify client address metadata of snapshot-backed PVC: %v", err)
}
err = verifyUserIdMappingMetadataSnapshotBacked(
f, pvcClone, appClone, backingSubvolumeNameData.imageName, backingSnapshotName,
)
if err != nil {
logAndFail("failed to verify user ID mapping metadata of snapshot-backed PVC: %v", err)
}

// Snapshot-backed volume shouldn't contribute to total subvolume count.
validateSubvolumeCount(f, 1, fileSystemName, subvolumegroup)
Expand Down
29 changes: 29 additions & 0 deletions e2e/cephfs_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,3 +690,32 @@ func verifyClientAddressMetadataSnapshotBacked(

return nil
}

func verifyUserIdMappingMetadataSnapshotBacked(
f *framework.Framework,
pvc *v1.PersistentVolumeClaim,
pod *v1.Pod,
subVolumeName, snapshotName string,
) error {
nodeId := pod.Spec.NodeName
_, pvObject, err := getPVCAndPV(f.ClientSet, pvc.Name, pvc.Namespace)
if err != nil {
return fmt.Errorf("failed to get PVC and PV: %w", err)
}
volumeHandle := pvObject.Spec.CSI.VolumeHandle

expectedValue := keyringCephFSNodePluginUsername
metadataKey := fmt.Sprintf(".cephfs.csi.ceph.com/userid/%s/%s", volumeHandle, nodeId)
metadataValue, err := getCephFSSnapshotMetadata(
f, fileSystemName, subVolumeName, snapshotName, subvolumegroup, metadataKey)
if err != nil {
return fmt.Errorf("failed to get subvolume snapshot metadata %s: %w", metadataKey, err)
}

if metadataValue != expectedValue {
return fmt.Errorf("userId mapping metadata %s has unexpected value %s, expected %s",
metadataKey, metadataValue, expectedValue)
}

return nil
}
193 changes: 133 additions & 60 deletions e2e/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func verifyReadAffinity(
configInfos, stdErr, err = execCommandInContainer(f, command, ns, cn, &opt)
if strings.TrimSpace(configInfos) != "" {
framework.Logf("configInfos: %v, err=%v", configInfos, err)
//override error if the configInfo is found.
// override error if the configInfo is found.
err = nil
break
}
Expand Down Expand Up @@ -748,119 +748,192 @@ func verifyReadAffinity(
return nil
}

func verifyClientAddressMetadataExists(
func verifyMetadataRemoved(
f *framework.Framework,
pvcPath, appPath string,
driverType string,
metadataKey, driverType, rbdImageSpec, subvolumeName string,
) error {
// create PVC
// verify metadata is removed after pod deletion
// retry up to 3 times with 10-second intervals
var (
metadataValue string
err error
)
maxRetries := 3
retryInterval := 10 * time.Second

for attempt := 1; attempt <= maxRetries; attempt++ {
switch driverType {
case rbdType:
metadataValue, err = getImageMeta(rbdImageSpec, metadataKey, f)
case cephfsType:
metadataValue, err = getCephFSSubvolumeMetadata(
f, fileSystemName, subvolumeName, subvolumegroup, metadataKey)
}

if err != nil && strings.Contains(err.Error(), "command terminated with exit code 2") {
framework.Logf("metadata %s successfully removed", metadataKey)
break
}

framework.Logf("metadata %s still exists with value %s, retrying (%d/%d)",
metadataKey, metadataValue, attempt, maxRetries)

if attempt == maxRetries {
return fmt.Errorf("metadata %s still exists with value %s (after %d attempts)",
metadataKey, metadataValue, maxRetries)
}
time.Sleep(retryInterval)
}

return nil
}

func getAppAndPVC(
f *framework.Framework,
pvcPath, appPath string,
) (*v1.Pod, *v1.PersistentVolumeClaim, error) {
pvc, err := loadPVC(pvcPath)
if err != nil {
return fmt.Errorf("failed to load pvc: %w", err)
return nil, nil, fmt.Errorf("failed to load pvc: %w", err)
}
pvc.Namespace = f.UniqueName
err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout)
if err != nil {
return fmt.Errorf("failed to create PVC: %w", err)
if err := createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout); err != nil {
return nil, nil, fmt.Errorf("failed to create PVC: %w", err)
}

app, err := loadApp(appPath)
if err != nil {
return fmt.Errorf("failed to load application: %w", err)
return nil, nil, fmt.Errorf("failed to load application: %w", err)
}
app.Namespace = f.UniqueName
err = createApp(f.ClientSet, app, deployTimeout)
if err != nil {
return fmt.Errorf("failed to create application: %w", err)
if err := createApp(f.ClientSet, app, deployTimeout); err != nil {
return nil, nil, fmt.Errorf("failed to create application: %w", err)
}

pod, err := getPod(f.ClientSet, app.Namespace, app.Name)
if err != nil {
return fmt.Errorf("failed to get pod: %w", err)
return nil, nil, fmt.Errorf("failed to get pod: %w", err)
}
nodeId := pod.Spec.NodeName

pvc, _, err = getPVCAndPV(f.ClientSet, pvc.Name, pvc.Namespace)
if err != nil {
return nil, nil, fmt.Errorf("failed to get PVC and PV: %w", err)
}

return pod, pvc, nil
}

func verifyClientAddressMetadataExists(
f *framework.Framework,
pvcPath, appPath string,
driverType string,
) error {
var (
metadataKey string
metadataValue string
rbdImageSpec string
subVolumeName string
subvolumeName string
err error
)

_, pvObject, err := getPVCAndPV(f.ClientSet, pvc.Name, pvc.Namespace)
pod, pvc, err := getAppAndPVC(f, pvcPath, appPath)
if err != nil {
return err
}
_, pv, err := getPVCAndPV(f.ClientSet, pvc.Name, pvc.Namespace)
if err != nil {
framework.Logf("error getting pvc %q in namespace %q: %v", pvc.Name, pvc.Namespace, err)
return fmt.Errorf("failed to get PVC and PV: %w", err)
}
volumeHandle := pvObject.Spec.CSI.VolumeHandle

// First, get the metadata while the pod is running to verify it exists
nodeId := pod.Spec.NodeName
volumeHandle := pv.Spec.CSI.VolumeHandle
switch driverType {
case rbdType:
metadataKey = fmt.Sprintf(".rbd.csi.ceph.com/clientaddress/%s/%s", volumeHandle, nodeId)

imageData, err := getImageInfoFromPVC(pvc.Namespace, pvc.Name, f)
if err != nil {
return err
}

rbdImageSpec = imageSpec(defaultRBDPool, imageData.imageName)
metadataValue, err = getImageMeta(rbdImageSpec, metadataKey, f)
if err != nil {
return fmt.Errorf("failed to get image metadata %s: %w", metadataKey, err)
}
case cephfsType:
metadataKey = fmt.Sprintf(".cephfs.csi.ceph.com/clientaddress/%s/%s", volumeHandle, nodeId)
subVolumeName = pvObject.Spec.CSI.VolumeAttributes["subvolumeName"]
metadataValue, err = getCephFSSubvolumeMetadata(
f, fileSystemName, subVolumeName, subvolumegroup, metadataKey)
if err != nil {
return fmt.Errorf("failed to get subvolume metadata %s: %w", metadataKey, err)
}
subvolumeName = pv.Spec.CSI.VolumeAttributes["subvolumeName"]
metadataValue, err = getCephFSSubvolumeMetadata(f, fileSystemName, subvolumeName, subvolumegroup, metadataKey)
}
if err != nil {
return fmt.Errorf("failed to get metadata %s: %w", metadataKey, err)
}

if metadataValue == "" {
return fmt.Errorf("client address metadata %s value is empty", metadataKey)
}

err = deletePod(pod.Name, pod.Namespace, f.ClientSet, deployTimeout)
if err != nil {
if err := deletePod(pod.Name, pod.Namespace, f.ClientSet, deployTimeout); err != nil {
return fmt.Errorf("failed to delete pod: %w", err)
}

// verify clientaddress metadata is removed after pod deletion
// retry up to 3 times with 10-second intervals
maxRetries := 3
retryInterval := 10 * time.Second
if err := verifyMetadataRemoved(f, metadataKey, driverType, rbdImageSpec, subvolumeName); err != nil {
return fmt.Errorf("failed to verify metadata removal: %w", err)
}

for attempt := 1; attempt <= maxRetries; attempt++ {
switch driverType {
case rbdType:
metadataValue, err = getImageMeta(rbdImageSpec, metadataKey, f)
case cephfsType:
metadataValue, err = getCephFSSubvolumeMetadata(
f, fileSystemName, subVolumeName, subvolumegroup, metadataKey)
}
return deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout)
}

if err != nil && strings.Contains(err.Error(), "command terminated with exit code 2") {
framework.Logf("clientaddress metadata %s successfully removed", metadataKey)
break
}
func verifyUserIdMappingMetadata(
f *framework.Framework,
pvcPath, appPath string,
driverType string,
) error {
var (
metadataKey string
metadataValue string
expectedValue string
rbdImageSpec string
subvolumeName string
err error
)

framework.Logf("clientaddress metadata %s still exists with value %s, retrying (%d/%d)",
metadataKey, metadataValue, attempt, maxRetries)
pod, pvc, err := getAppAndPVC(f, pvcPath, appPath)
if err != nil {
return err
}
_, pv, err := getPVCAndPV(f.ClientSet, pvc.Name, pvc.Namespace)
if err != nil {
return fmt.Errorf("failed to get PVC and PV: %w", err)
}

if attempt == maxRetries {
return fmt.Errorf("clientaddress metadata %s still exists with value %s (after %d attempts)",
metadataKey, metadataValue, maxRetries)
nodeId := pod.Spec.NodeName
volumeHandle := pv.Spec.CSI.VolumeHandle
switch driverType {
case rbdType:
expectedValue = keyringRBDNodePluginUsername
metadataKey = fmt.Sprintf(".rbd.csi.ceph.com/userid/%s/%s", volumeHandle, nodeId)
imageData, err := getImageInfoFromPVC(pvc.Namespace, pvc.Name, f)
if err != nil {
return err
}
time.Sleep(retryInterval)
rbdImageSpec = imageSpec(defaultRBDPool, imageData.imageName)
metadataValue, err = getImageMeta(rbdImageSpec, metadataKey, f)
case cephfsType:
expectedValue = keyringCephFSNodePluginUsername
metadataKey = fmt.Sprintf(".cephfs.csi.ceph.com/userid/%s/%s", volumeHandle, nodeId)
subvolumeName = pv.Spec.CSI.VolumeAttributes["subvolumeName"]
metadataValue, err = getCephFSSubvolumeMetadata(f, fileSystemName, subvolumeName, subvolumegroup, metadataKey)
}

err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout)
if err != nil {
return fmt.Errorf("failed to delete PVC: %w", err)
return fmt.Errorf("failed to get metadata %s: %w", metadataKey, err)
}
if metadataValue != expectedValue {
return fmt.Errorf("userId mapping metadata %s has unexpected value %s, expected %s", metadataKey, metadataValue, expectedValue)
}

return nil
if err := deletePod(pod.Name, pod.Namespace, f.ClientSet, deployTimeout); err != nil {
return fmt.Errorf("failed to delete pod: %w", err)
}

if err := verifyMetadataRemoved(f, metadataKey, driverType, rbdImageSpec, subvolumeName); err != nil {
return fmt.Errorf("failed to verify metadata removal: %w", err)
}

return deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout)
}
11 changes: 11 additions & 0 deletions e2e/rbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,17 @@ var _ = Describe("RBD", func() {
})
}

By("verify userId mapping metadata exists", func() {
err := verifyUserIdMappingMetadata(f, pvcPath, appPath, rbdType)
if err != nil {
framework.Failf("failed to verify userId mapping metadata exists: %v", err)
}

// validate created backend rbd images
validateRBDImageCount(f, 0, defaultRBDPool)
validateOmapCount(f, 0, rbdType, defaultRBDPool, volumesType)
})
Comment thread
iPraveenParihar marked this conversation as resolved.

By("verify client address metadata exists", func() {
err := verifyClientAddressMetadataExists(f, pvcPath, appPath, rbdType)
if err != nil {
Expand Down
Loading