Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 101 additions & 8 deletions pkg/spdk/enginefrontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ import (
"github.com/sirupsen/logrus"
"go.uber.org/multierr"

"github.com/longhorn/longhorn-spdk-engine/pkg/client"
"github.com/longhorn/longhorn-spdk-engine/pkg/types"
"github.com/longhorn/longhorn-spdk-engine/pkg/util"

commonbitmap "github.com/longhorn/go-common-libs/bitmap"
safelog "github.com/longhorn/longhorn-spdk-engine/pkg/log"

"github.com/longhorn/go-spdk-helper/pkg/initiator"
"github.com/longhorn/types/pkg/generated/spdkrpc"

commonbitmap "github.com/longhorn/go-common-libs/bitmap"
spdkclient "github.com/longhorn/go-spdk-helper/pkg/spdk/client"
helpertypes "github.com/longhorn/go-spdk-helper/pkg/types"

"github.com/longhorn/longhorn-spdk-engine/pkg/client"
"github.com/longhorn/longhorn-spdk-engine/pkg/types"
"github.com/longhorn/longhorn-spdk-engine/pkg/util"

safelog "github.com/longhorn/longhorn-spdk-engine/pkg/log"
)

type EngineFrontend struct {
Expand Down Expand Up @@ -83,6 +83,8 @@ type EngineFrontend struct {
connectNvmeTCPPathFn func(transportAddress, transportServiceID string) error
// Test hook for native multipath path reconnect during recovery.
reconnectNvmeTCPPathFn func(transportAddress, transportServiceID string) error
// Test hook for target TCP reachability check during recovery.
checkTargetReachableFn func(address string) error
// Test hook for initiator NVMe device info loading.
loadInitiatorNVMeDeviceInfoFn func(transportAddress, transportServiceID, subsystemNQN string) error
// Test hook for initiator endpoint loading.
Expand Down Expand Up @@ -126,6 +128,13 @@ const (

anaSyncMaxAttempts = 5
anaSyncRetryInterval = 200 * time.Millisecond

// recoveryTargetReachabilityTimeout is the timeout for a TCP dial to
// verify the NVMe-TCP target is reachable before attempting expensive
// reconnect retries during engine frontend recovery. This only applies
// to the recovery path — normal creation and switchover paths use the
// full retry loop in the initiator package.
recoveryTargetReachabilityTimeout = 5 * time.Second
)

type NvmeTCPPath struct {
Expand All @@ -146,6 +155,22 @@ type UblkFrontend struct {
UblkID int32
}

func (ef *EngineFrontend) setMetadataDirLocked(metadataDir string) {
ef.metadataDir = metadataDir
}

func (ef *EngineFrontend) setMetadataDir(metadataDir string) {
ef.Lock()
defer ef.Unlock()
ef.setMetadataDirLocked(metadataDir)
}

func (ef *EngineFrontend) getMetadataDir() string {
ef.RLock()
defer ef.RUnlock()
return ef.metadataDir
}

func getUblkQueueDepth(ublkQueueDepth int32) int32 {
if ublkQueueDepth == 0 {
return types.DefaultUblkQueueDepth
Expand Down Expand Up @@ -2184,6 +2209,21 @@ func (ef *EngineFrontend) loadInitiatorEndpoint(dmDeviceIsBusy bool) error {
return ef.initiator.LoadEndpointForNvmeTcpFrontend(dmDeviceIsBusy)
}

func (ef *EngineFrontend) checkTargetReachable(address string) error {
if ef.checkTargetReachableFn != nil {
return ef.checkTargetReachableFn(address)
}
conn, err := net.DialTimeout("tcp", address, recoveryTargetReachabilityTimeout)
if err != nil {
return err
}
errClose := conn.Close()
if errClose != nil {
ef.log.WithError(errClose).Warnf("Failed to close connection to target address %s", address)
}
return nil
}

func (ef *EngineFrontend) getInitiatorEndpoint() string {
if ef.getInitiatorEndpointFn != nil {
return ef.getInitiatorEndpointFn()
Expand Down Expand Up @@ -2516,6 +2556,16 @@ func (ef *EngineFrontend) validateAndUpdateNvmeTcpFrontend() (err error) {
return nil
}

// isRecoveryCancelled checks whether a concurrent operation (e.g.
// EngineFrontendCreate) has changed this ef's state away from Pending,
// meaning the recovery goroutine should abort before performing further
// host-level NVMe/dm operations.
func (ef *EngineFrontend) isRecoveryCancelled() bool {
ef.RLock()
defer ef.RUnlock()
return ef.State != types.InstanceStatePending
}

// RecoverFromHost attempts to recover the engine frontend's initiator state by
// detecting existing NVMe controllers and dm-devices on the host. This is called
// during server startup for engine frontends that were persisted before restart.
Expand All @@ -2542,6 +2592,16 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
// Device not found on host — record already removed, nothing to reconcile.
return
}

// If the state has been changed from Pending by a concurrent operation
// (e.g. Delete was called during recovery), do not overwrite it.
// This prevents a race where recovery's deferred update would revert
// a Terminating/Stopped state set by Delete back to Error/Running.
if ef.State != types.InstanceStatePending {
ef.log.Infof("Skipping recovery state update for engine frontend %s: state already changed to %s by concurrent operation", ef.Name, ef.State)
return
}

if recoverErr != nil {
ef.log.WithError(recoverErr).Errorf("Failed to recover engine frontend %s from host", ef.Name)
ef.State = types.InstanceStateError
Expand Down Expand Up @@ -2576,6 +2636,17 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
return nil

case types.FrontendSPDKTCPBlockdev:
// Early cancellation check before creating the NVMe-TCP initiator.
// If a concurrent EngineFrontendCreate already completed for this
// volume (evicted us and connected its own NVMe controller), we must
// not proceed — creating an initiator and then calling Delete/Stop
// would disconnect the NEW ef's controller via DisconnectTarget
// (which disconnects ALL controllers for the subsystem NQN).
if ef.isRecoveryCancelled() {
recoverErr = ErrRecoveryCancelled
return recoverErr
}

// Recover the NVMe-oF initiator (blockdev frontend with dm-device).
i, nqn, nguid, err := ef.newNvmeTcpInitiator()
if err != nil {
Expand Down Expand Up @@ -2613,6 +2684,20 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
reconnected := false
if strings.Contains(loadErr.Error(), helpertypes.ErrorMessageCannotFindValidNvmeDevice) {
if ef.NvmeTcpFrontend.TargetIP != "" && ef.NvmeTcpFrontend.TargetPort != 0 {
targetAddr := net.JoinHostPort(ef.NvmeTcpFrontend.TargetIP, strconv.Itoa(int(ef.NvmeTcpFrontend.TargetPort)))
if dialErr := ef.checkTargetReachable(targetAddr); dialErr != nil {
ef.log.WithError(dialErr).Warnf("NVMe/TCP target %s is not reachable during recovery of engine frontend %s, skipping reconnect retries", targetAddr, ef.Name)
recoverErr = errors.Wrapf(dialErr, "NVMe/TCP target %s is not reachable during recovery", targetAddr)
return recoverErr
}
// Check cancellation before the expensive host-mutating reconnect.
// A concurrent EngineFrontendCreate may have evicted us while
// the TCP pre-check was in progress.
if ef.isRecoveryCancelled() {
recoverErr = ErrRecoveryCancelled
return recoverErr
}

ef.log.WithError(loadErr).Warnf("NVMe device not found on host during recovery of engine frontend %s, reconnecting persisted multipath target", ef.Name)
if reconnectErr := ef.reconnectNvmeTCPPath(ef.NvmeTcpFrontend.TargetIP, ef.NvmeTcpFrontend.TargetPort); reconnectErr != nil {
recoverErr = errors.Wrapf(reconnectErr, "failed to reconnect NVMe/TCP path during recovery of engine frontend %s", ef.Name)
Expand All @@ -2621,7 +2706,7 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
reconnected = true
} else {
ef.log.WithError(loadErr).Warnf("NVMe device not found on host during recovery of engine frontend %s, removing persisted record", ef.Name)
if removeErr := removeEngineFrontendRecord(ef.metadataDir, ef.VolumeName); removeErr != nil {
if removeErr := removeEngineFrontendRecord(ef.getMetadataDir(), ef.VolumeName); removeErr != nil {
ef.log.WithError(removeErr).Warn("Failed to remove engine frontend record")
}
deviceNotFound = true
Expand All @@ -2634,6 +2719,14 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
}
}

// Check cancellation before loading the dm-device endpoint.
// A concurrent EngineFrontendCreate may have evicted us during
// the preceding reconnect or sysfs scan.
if ef.isRecoveryCancelled() {
recoverErr = ErrRecoveryCancelled
return recoverErr
}

// Try to load the existing dm-device endpoint.
if err := ef.loadInitiatorEndpoint(false); err != nil {
recoverErr = errors.Wrapf(err, "failed to load endpoint during recovery of engine frontend %s", ef.Name)
Expand Down
54 changes: 54 additions & 0 deletions pkg/spdk/enginefrontend_persist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func (s *TestSuite) TestRecoverFromHostBlockdevReconnectsPersistedTarget(c *C) {
ef.NvmeTcpFrontend.TargetIP = "10.0.0.8"
ef.NvmeTcpFrontend.TargetPort = 4420
ef.getInitiatorEndpointFn = func() string { return "/dev/longhorn/vol-r" }
ef.checkTargetReachableFn = func(address string) error { return nil }

loadCalls := 0
ef.loadInitiatorNVMeDeviceInfoFn = func(transportAddress, transportServiceID, subsystemNQN string) error {
Expand Down Expand Up @@ -478,6 +479,59 @@ func (s *TestSuite) TestRecoverFromHostBlockdevReconnectsPersistedTarget(c *C) {
}
}

func (s *TestSuite) TestRecoverFromHostBlockdevSkipsReconnectWhenTargetUnreachable(c *C) {
updateCh := make(chan interface{}, 1)

ef := NewEngineFrontend("ef-block-recover", "engine-r", "vol-r",
lhtypes.FrontendSPDKTCPBlockdev, 1048576, 0, 0, updateCh)
ef.NvmeTcpFrontend.TargetIP = "10.0.0.8"
ef.NvmeTcpFrontend.TargetPort = 4420

loadCalls := 0
ef.loadInitiatorNVMeDeviceInfoFn = func(transportAddress, transportServiceID, subsystemNQN string) error {
loadCalls++
c.Assert(subsystemNQN, Equals, getStableVolumeNQN("vol-r"))
if loadCalls == 1 {
c.Assert(transportAddress, Equals, "10.0.0.8")
c.Assert(transportServiceID, Equals, "4420")
return fmt.Errorf("%s", helpertypes.ErrorMessageCannotFindValidNvmeDevice)
}
c.Assert(transportAddress, Equals, "")
c.Assert(transportServiceID, Equals, "")
return fmt.Errorf("%s", helpertypes.ErrorMessageCannotFindValidNvmeDevice)
}

reconnectCalled := false
ef.reconnectNvmeTCPPathFn = func(transportAddress, transportServiceID string) error {
reconnectCalled = true
return nil
}
ef.checkTargetReachableFn = func(address string) error {
c.Assert(address, Equals, "10.0.0.8:4420")
return fmt.Errorf("dial tcp %s: i/o timeout", address)
}
ef.loadInitiatorEndpointFn = func(dmDeviceIsBusy bool) error {
c.Fatal("loadInitiatorEndpoint should not run when reachability check fails")
return nil
}

err := ef.RecoverFromHost(nil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, ".*not reachable during recovery.*")
c.Assert(reconnectCalled, Equals, false)
c.Assert(loadCalls, Equals, 2)

got := ef.Get()
c.Assert(got.State, Equals, string(lhtypes.InstanceStateError))
c.Assert(got.ErrorMsg, Matches, ".*not reachable during recovery.*")

select {
case <-updateCh:
case <-time.After(time.Second):
c.Fatal("expected update on UpdateCh")
}
}

func (s *TestSuite) TestRecoverFromHostBlockdevUsesPersistedTargetForControllerSelection(c *C) {
updateCh := make(chan interface{}, 1)

Expand Down
Loading
Loading