Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 50 additions & 20 deletions internal/nvmeof/nodeserver/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/nvmeof"
nvmeutil "github.com/ceph/ceph-csi/internal/nvmeof/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
Expand Down Expand Up @@ -326,6 +327,8 @@ func (ns *NodeServer) NodeUnstageVolume(

stagingTargetPath := getStagingTargetPath(req)

devPath := getDeviceFromStagingPath(ctx, stagingTargetPath)

isMnt, err := ns.Mounter.IsMountPoint(stagingTargetPath)
if err != nil {
if !os.IsNotExist(err) {
Expand Down Expand Up @@ -358,6 +361,20 @@ func (ns *NodeServer) NodeUnstageVolume(
}
log.DebugLog(ctx, "successfully removed staging path (%s)", stagingTargetPath)

// Disconnect controllers if this was the last mounted namespace on each controller.
// Non-fatal - a failed disconnect just means the connection lingers until the
// kernel's ctrl_loss_tmo expires or the next reconnect cycle.
if devPath != "" {
mountedDevices, err := getNVMeMountedDevices(ctx)
if err != nil {
log.WarningLog(ctx, "failed to get mounted devices: %v (skipping disconnect)", err)
} else {
if err := ns.initiator.DisconnectIfLastMount(ctx, devPath, mountedDevices); err != nil {
log.WarningLog(ctx, "failed to disconnect controller for device %s: %v", devPath, err)
}
}
}

return &csi.NodeUnstageVolumeResponse{}, nil
}

Expand Down Expand Up @@ -394,18 +411,17 @@ func (ns *NodeServer) NodeExpandVolume(
mountPath := volumePath + "/" + volumeID

// Find device from mount (no metadata needed!)
devicePath, err := ns.getDeviceFromMount(ctx, mountPath)
if err != nil {
log.ErrorLog(ctx, "failed to find device for mount %s: %v", mountPath, err)
devicePath := getDeviceFromStagingPath(ctx, mountPath)
if devicePath == "" {
log.ErrorLog(ctx, "failed to find device for mount %s", mountPath)

return nil, status.Errorf(codes.Internal, "failed to find device: %v", err)
return nil, status.Errorf(codes.Internal, "failed to find device for mount %s", mountPath)
}

log.DebugLog(ctx, "nvmeof: resizing filesystem on device %s at mount path %s", devicePath, mountPath)

resizer := mount.NewResizeFs(utilexec.New())
var ok bool
ok, err = resizer.Resize(devicePath, mountPath)
ok, err := resizer.Resize(devicePath, mountPath)
if !ok {
return nil, status.Errorf(codes.Internal,
"nvmeof: resize failed on path %s, error: %v", req.GetVolumePath(), err)
Expand Down Expand Up @@ -476,6 +492,10 @@ func (ns *NodeServer) stageTransaction(
return transaction, nil
}

// undoStagingTransaction rolls back a staging transaction based on the state of the transaction.
// It attempts to unmount the staging path if it was mounted,
// remove the staging path if it was created, and disconnect the NVMe device if it was connected
// and it was the last mount for that device.
func (ns *NodeServer) undoStagingTransaction(
ctx context.Context,
req *csi.NodeStageVolumeRequest,
Expand All @@ -488,8 +508,7 @@ func (ns *NodeServer) undoStagingTransaction(
err = ns.Mounter.Unmount(stagingTargetPath)
if err != nil {
log.ErrorLog(ctx, "failed to unmount stagingtargetPath: %s with error: %v", stagingTargetPath, err)

return
// Continue anyway - try to clean up what we can
}
}

Expand All @@ -501,6 +520,18 @@ func (ns *NodeServer) undoStagingTransaction(
// continue on failure to disconnect the image
}
}
// disconnect if we connected
if transaction.devicePath != "" {
mountedDevices, err := getNVMeMountedDevices(ctx)
if err != nil {
log.WarningLog(ctx, "failed to get mounted devices during rollback: %v (skipping disconnect)", err)
} else {
if err := ns.initiator.DisconnectIfLastMount(ctx, transaction.devicePath, mountedDevices); err != nil {
log.WarningLog(ctx, "failed to disconnect during rollback for device %s: %v",
transaction.devicePath, err)
}
}
}
}

// createTargetMountPath check if the mountPath already has something mounted
Expand Down Expand Up @@ -758,20 +789,19 @@ func getStagingTargetPath(req interface{}) string {
return ""
}

// getDeviceFromMount finds the device path for a given mount path.
func (ns *NodeServer) getDeviceFromMount(ctx context.Context, mountPath string) (string, error) {
mountPoints, err := ns.Mounter.List()
// getDeviceFromStagingPath returns the device path for either filesystem or block volumes.
func getDeviceFromStagingPath(ctx context.Context, stagingTargetPath string) string {
device, err := nvmeutil.GetDeviceFromMountpoint(ctx, stagingTargetPath)
if err != nil {
return "", fmt.Errorf("failed to list mounts: %w", err)
}

for _, mp := range mountPoints {
if mp.Path == mountPath {
log.DebugLog(ctx, "found device %s for mount path %s", mp.Device, mountPath)
log.DebugLog(ctx, "could not get device from staging path %s: %v", stagingTargetPath, err)

return mp.Device, nil
}
return ""
}

return "", fmt.Errorf("no mount found for path %s", mountPath)
return device
}

// getNVMeMountedDevices reports the NVMe devices that are currently mounted.
// It is a thin wrapper around nvmeutil.GetAllNVMeMountedDevices and returns
// that helper's result and error unchanged.
func getNVMeMountedDevices(ctx context.Context) (map[string]bool, error) {
	mounted, err := nvmeutil.GetAllNVMeMountedDevices(ctx)

	return mounted, err
}
201 changes: 201 additions & 0 deletions internal/nvmeof/nvmeof_initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ type NVMeInitiator interface {

// GetNamespaceDeviceByUUID returns the device path for a given namespace UUID
GetNamespaceDeviceByUUID(ctx context.Context, uuid string) (string, error)

// DisconnectIfLastMount disconnects the NVMe controllers backing the given device path,
// but only when none of those controllers has another mounted namespace.
// The decision is all-or-nothing: if any controller still has another mounted
// namespace, no controller is disconnected.
// Safe for multipath - each controller is disconnected individually.
//
// Example: If device /dev/nvme0n1 is connected via controllers nvme0 and nvme1,
// and both controllers have no other mounted namespaces, both will be disconnected.
DisconnectIfLastMount(ctx context.Context, devPath string, mountedDevices map[string]bool) error
}

// ConnectRequest represents a subsystem connection request.
Expand All @@ -78,11 +87,34 @@ type nvmePathAddress struct {
}

// nvmeHost represents the structure from nvme list-subsys output.
// Example JSON structure:
// [
// {
// "HostNQN":"nqn.2014-08.org.nvmexpress:gdidi-zwcq5-worker-2-7mppg",
// "HostID":"d48fbfbf-1b0e-45ce-93a4-b919224aff2c",
// "Subsystems":[
// {
// "Name":"nvme-subsys0",
// "NQN":"nqn.2016-06.io.ceph:subsystem.test-integration",
// "Paths":[
// {
// "Name":"nvme0",
// "Transport":"tcp",
// "Address":"traddr=10.131.0.225,trsvcid=4420,src_addr=10.131.0.2",
// "State":"live"
// }
// ]
// }
// ]
// }
// ]

type nvmeHost struct {
HostNQN string `json:"HostNQN"`
Subsystems []struct {
NQN string `json:"NQN"`
Paths []struct {
Name string `json:"Name"`
Address nvmePathAddress `json:"Address"`
State string `json:"State"`
} `json:"Paths"`
Expand Down Expand Up @@ -222,6 +254,82 @@ func (ni *nvmeInitiator) ConnectSubsystem(ctx context.Context, req *ConnectReque
return true, nil
}

// DisconnectIfLastMount tears down every NVMe controller that backs devPath,
// provided that none of those controllers still serves another mounted
// namespace.
//
// Rationale: with NVMe-oF multipath every namespace of a subsystem is
// reachable through every controller of that subsystem, so a single mounted
// namespace keeps all controllers in use. The decision is therefore
// all-or-nothing: either every controller is disconnected or none is.
//
// Parameters:
//   - devPath: namespace device that is being unstaged (e.g. /dev/nvme0n1)
//   - mountedDevices: set of device paths that are currently mounted
//
// Steps:
//  1. Resolve the controllers of devPath via getControllersForDevice.
//  2. Ask hasOtherMountedNamespaces for each controller. If the check fails
//     or reports another mounted namespace, return nil without disconnecting
//     anything (not being able to verify is treated as "still in use").
//  3. Otherwise run "nvme disconnect -d /dev/<controller>" for every
//     controller, collecting failures instead of stopping at the first one.
//
// Example with /dev/nvme0n1 connected through nvme0 and nvme1:
//   - nvme0n2 is mounted as well -> both controllers are left connected.
//   - nvme0n1 is the only namespace -> both controllers are disconnected.
//
// Returns an error when the controllers cannot be resolved or when at least
// one disconnect command fails; the latter error lists every failed controller.
func (ni *nvmeInitiator) DisconnectIfLastMount(ctx context.Context, devPath string,
	mountedDevices map[string]bool,
) error {
	ctrlNames, err := getControllersForDevice(ctx, devPath)
	if err != nil {
		return fmt.Errorf("failed to get controllers for %s: %w", devPath, err)
	}

	// Phase 1: bail out as soon as one controller is (or might be) still in use.
	for i := range ctrlNames {
		name := ctrlNames[i]

		inUse, checkErr := hasOtherMountedNamespaces(ctx, name, devPath, mountedDevices)
		switch {
		case checkErr != nil:
			// Unable to verify: keep the connections rather than risk
			// pulling a controller from under a mounted namespace.
			log.WarningLog(ctx, "Failed to check mounted namespaces for %s: %v (skipping disconnect)",
				name, checkErr)

			return nil
		case inUse:
			log.DebugLog(ctx, "Controller %s has other mounted namespaces, skipping disconnect for all controllers",
				name)

			return nil
		}
	}

	// Phase 2: nothing else is mounted, so every controller can go.
	log.DebugLog(ctx, "No other mounted namespaces found, disconnecting all controllers")

	var failures []string
	for i := range ctrlNames {
		name := ctrlNames[i]
		log.DebugLog(ctx, "Disconnecting controller %s", name)

		if _, _, execErr := util.ExecCommandWithTimeout(ctx, connectTimeout,
			"nvme", "disconnect", "-d", "/dev/"+name); execErr != nil {
			// Remember the failure but keep going with the remaining controllers.
			failures = append(failures, fmt.Sprintf("%s: %v", name, execErr))

			continue
		}

		log.DebugLog(ctx, "Successfully disconnected controller %s", name)
	}

	if len(failures) == 0 {
		return nil
	}

	return fmt.Errorf("failed to disconnect some controllers: %s",
		strings.Join(failures, "; "))
}

// GetNamespaceDeviceByUUID tries to find the path of the block device for the
// namespace. While attaching there can be a delay, this function retries a few
// times with a short delay.
Expand Down Expand Up @@ -348,3 +456,96 @@ func ResolveListeners(ctx context.Context, listeners []ListenerDetails) ([]Liste

return validListeners, nil
}

// getControllersForDevice lists the names of all controllers through which the
// namespace device devPath is reachable (e.g. /dev/nvme0n1 -> ["nvme0", "nvme1"]).
//
// The names are taken from the "Name" field of every path reported by
// "nvme list-subsys <device> -o json"; paths without a name are ignored.
// An empty result is not an error: it is only logged, so that callers can
// treat an already disconnected device as a no-op.
//
// Returns an error when the command fails or its output is not valid JSON.
func getControllersForDevice(ctx context.Context, devPath string) ([]string, error) {
	out, _, err := util.ExecCommandWithTimeout(ctx, listSubsysTimeout,
		"nvme", "list-subsys", devPath, "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("list-subsys failed for %s: %w", devPath, err)
	}

	var parsed nvmeHostConnections
	err = json.Unmarshal([]byte(out), &parsed)
	if err != nil {
		log.DebugLog(ctx, "Failed to parse list-subsys output for %s: %v (output: %s)", devPath, err, out)

		return nil, fmt.Errorf("failed to parse list-subsys output: %w", err)
	}

	// Flatten hosts -> subsystems -> paths into a single list of controller names.
	var names []string
	for _, host := range parsed {
		for s := range host.Subsystems {
			paths := host.Subsystems[s].Paths
			for p := range paths {
				if paths[p].Name == "" {
					continue
				}
				names = append(names, paths[p].Name)
			}
		}
	}

	// This makes it idempotent - device may have been disconnected already
	if len(names) == 0 {
		log.DebugLog(ctx, "no controllers found for device %s (may be already disconnected)", devPath)
	}

	return names, nil
}

// hasOtherMountedNamespaces reports whether any namespace listed by the
// controller controllerName, other than currentDevPath, is currently mounted.
//
// The namespace IDs come from getNamespacesForController. Each ID is turned
// into candidate device paths which are looked up in mountedDevices:
//   - /dev/<controllerName>n<nsid>: the name used when the block device is
//     named after the controller itself.
//   - <head prefix of currentDevPath><nsid>: with native NVMe multipath the
//     block device is named after the subsystem instance, not the controller
//     (a namespace reached through nvme1 can be /dev/nvme0n2). The prefix is
//     taken from currentDevPath (e.g. /dev/nvme0n1 -> /dev/nvme0n), which is
//     one of the namespaces the caller resolved this controller for.
//
// Checking both names can only find more mounted devices than checking the
// controller-based name alone, so the caller never disconnects more eagerly.
//
// Parameters:
//   - controllerName: controller to inspect, without the /dev/ prefix (e.g. nvme0)
//   - currentDevPath: device that is being unstaged; it is never counted
//   - mountedDevices: set of device paths that are currently mounted
//
// Returns true as soon as one other mounted namespace is found, and an error
// when the namespaces of the controller cannot be listed.
func hasOtherMountedNamespaces(ctx context.Context, controllerName, currentDevPath string,
	mountedDevices map[string]bool,
) (bool, error) {
	// Get all namespaces on this controller
	nsids, err := getNamespacesForController(ctx, controllerName)
	if err != nil {
		return false, err
	}

	prefixes := []string{"/dev/" + controllerName + "n"}

	// Derive the multipath head prefix from the device being unstaged. Only
	// names of the form nvme<X>n<Y>... qualify; the "n" of "nvme" itself
	// (index 0) must not be mistaken for the namespace separator.
	dirLen := strings.LastIndex(currentDevPath, "/") + 1
	base := currentDevPath[dirLen:]
	if sep := strings.LastIndex(base, "n"); strings.HasPrefix(base, "nvme") && sep >= len("nvme") {
		headPrefix := currentDevPath[:dirLen+sep+1]
		if headPrefix != prefixes[0] {
			prefixes = append(prefixes, headPrefix)
		}
	}

	// Check if any OTHER namespace on this controller is mounted
	for _, nsid := range nsids {
		for _, prefix := range prefixes {
			nsDevice := fmt.Sprintf("%s%d", prefix, nsid)

			// Skip the device we're about to unmount
			if nsDevice == currentDevPath {
				continue
			}

			// If any other namespace is mounted, return true
			if mountedDevices[nsDevice] {
				log.DebugLog(ctx, "Found other mounted namespace %s on controller %s", nsDevice, controllerName)

				return true, nil
			}
		}
	}

	return false, nil
}

// getNamespacesForController returns the namespace IDs reported by
// "nvme list-ns /dev/<controllerName> -o json".
//
// The command is expected to print a document of the form
// {"nsid_list":[{"nsid":1},{"nsid":2}]}; the IDs are returned in the order
// in which they appear. An empty list yields an empty, non-nil slice.
//
// Returns an error when the command fails or its output is not valid JSON.
func getNamespacesForController(ctx context.Context, controllerName string) ([]int, error) {
	// Shape of the list-ns JSON output.
	type nsEntry struct {
		NSID int `json:"nsid"`
	}
	type nsListing struct {
		NSIDList []nsEntry `json:"nsid_list"`
	}

	ctrlDev := "/dev/" + controllerName

	out, _, err := util.ExecCommandWithTimeout(ctx, connectTimeout,
		"nvme", "list-ns", ctrlDev, "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("list-ns failed for %s: %w", ctrlDev, err)
	}

	var listing nsListing
	if err = json.Unmarshal([]byte(out), &listing); err != nil {
		return nil, fmt.Errorf("failed to parse list-ns output: %w", err)
	}

	ids := make([]int, 0, len(listing.NSIDList))
	for _, entry := range listing.NSIDList {
		ids = append(ids, entry.NSID)
	}

	return ids, nil
}
2 changes: 2 additions & 0 deletions internal/nvmeof/nvmeof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,15 @@ func TestHasLivePathToGateway(t *testing.T) {
Subsystems: []struct {
NQN string `json:"NQN"`
Paths []struct {
Name string `json:"Name"`
Address nvmePathAddress `json:"Address"`
State string `json:"State"`
} `json:"Paths"`
}{
{
NQN: "nqn.2016-06.io.ceph:subsystem.test",
Paths: []struct {
Name string `json:"Name"`
Address nvmePathAddress `json:"Address"`
State string `json:"State"`
}{
Expand Down
Loading
Loading