Skip to content

Commit c262d6e

Browse files
committed
fix(server): run engine frontend recovery asynchronously with cancellation support
Make recoverEngineFrontends() run in a separate goroutine so that gRPC servers can start immediately. This prevents the liveness probe from killing the pod when persisted targets are unreachable. Add per-volume host locks (volumeHostLocks) to serialize host-level NVMe/dm operations for the same volume. Recovery and all frontend lifecycle RPCs that mutate host NVMe controllers or dm devices (create, delete, suspend, resume, expand, switchover) acquire the per-volume lock so that these operations cannot overlap on one volume. Longhorn 13185. Signed-off-by: Derek Su <derek.su@suse.com>
1 parent 260ee8d commit c262d6e

7 files changed

Lines changed: 357 additions & 23 deletions

File tree

pkg/spdk/enginefrontend.go

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@ import (
1515
"github.com/sirupsen/logrus"
1616
"go.uber.org/multierr"
1717

18-
"github.com/longhorn/longhorn-spdk-engine/pkg/client"
19-
"github.com/longhorn/longhorn-spdk-engine/pkg/types"
20-
"github.com/longhorn/longhorn-spdk-engine/pkg/util"
21-
22-
commonbitmap "github.com/longhorn/go-common-libs/bitmap"
23-
safelog "github.com/longhorn/longhorn-spdk-engine/pkg/log"
24-
2518
"github.com/longhorn/go-spdk-helper/pkg/initiator"
2619
"github.com/longhorn/types/pkg/generated/spdkrpc"
2720

21+
commonbitmap "github.com/longhorn/go-common-libs/bitmap"
2822
spdkclient "github.com/longhorn/go-spdk-helper/pkg/spdk/client"
2923
helpertypes "github.com/longhorn/go-spdk-helper/pkg/types"
24+
25+
"github.com/longhorn/longhorn-spdk-engine/pkg/client"
26+
"github.com/longhorn/longhorn-spdk-engine/pkg/types"
27+
"github.com/longhorn/longhorn-spdk-engine/pkg/util"
28+
29+
safelog "github.com/longhorn/longhorn-spdk-engine/pkg/log"
3030
)
3131

3232
type EngineFrontend struct {
@@ -83,6 +83,8 @@ type EngineFrontend struct {
8383
connectNvmeTCPPathFn func(transportAddress, transportServiceID string) error
8484
// Test hook for native multipath path reconnect during recovery.
8585
reconnectNvmeTCPPathFn func(transportAddress, transportServiceID string) error
86+
// Test hook for target TCP reachability check during recovery.
87+
checkTargetReachableFn func(address string) error
8688
// Test hook for initiator NVMe device info loading.
8789
loadInitiatorNVMeDeviceInfoFn func(transportAddress, transportServiceID, subsystemNQN string) error
8890
// Test hook for initiator endpoint loading.
@@ -126,6 +128,13 @@ const (
126128

127129
anaSyncMaxAttempts = 5
128130
anaSyncRetryInterval = 200 * time.Millisecond
131+
132+
// recoveryTargetReachabilityTimeout is the timeout for a TCP dial to
133+
// verify the NVMe-TCP target is reachable before attempting expensive
134+
// reconnect retries during engine frontend recovery. This only applies
135+
// to the recovery path — normal creation and switchover paths use the
136+
// full retry loop in the initiator package.
137+
recoveryTargetReachabilityTimeout = 5 * time.Second
129138
)
130139

131140
type NvmeTCPPath struct {
@@ -2184,6 +2193,21 @@ func (ef *EngineFrontend) loadInitiatorEndpoint(dmDeviceIsBusy bool) error {
21842193
return ef.initiator.LoadEndpointForNvmeTcpFrontend(dmDeviceIsBusy)
21852194
}
21862195

2196+
func (ef *EngineFrontend) checkTargetReachable(address string) error {
2197+
if ef.checkTargetReachableFn != nil {
2198+
return ef.checkTargetReachableFn(address)
2199+
}
2200+
conn, err := net.DialTimeout("tcp", address, recoveryTargetReachabilityTimeout)
2201+
if err != nil {
2202+
return err
2203+
}
2204+
errClose := conn.Close()
2205+
if errClose != nil {
2206+
ef.log.WithError(errClose).Warnf("Failed to close connection to target address %s", address)
2207+
}
2208+
return nil
2209+
}
2210+
21872211
func (ef *EngineFrontend) getInitiatorEndpoint() string {
21882212
if ef.getInitiatorEndpointFn != nil {
21892213
return ef.getInitiatorEndpointFn()
@@ -2516,6 +2540,16 @@ func (ef *EngineFrontend) validateAndUpdateNvmeTcpFrontend() (err error) {
25162540
return nil
25172541
}
25182542

2543+
// isRecoveryCancelled checks whether a concurrent operation (e.g.
2544+
// EngineFrontendCreate) has changed this ef's state away from Pending,
2545+
// meaning the recovery goroutine should abort before performing further
2546+
// host-level NVMe/dm operations.
2547+
func (ef *EngineFrontend) isRecoveryCancelled() bool {
2548+
ef.RLock()
2549+
defer ef.RUnlock()
2550+
return ef.State != types.InstanceStatePending
2551+
}
2552+
25192553
// RecoverFromHost attempts to recover the engine frontend's initiator state by
25202554
// detecting existing NVMe controllers and dm-devices on the host. This is called
25212555
// during server startup for engine frontends that were persisted before restart.
@@ -2542,6 +2576,16 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
25422576
// Device not found on host — record already removed, nothing to reconcile.
25432577
return
25442578
}
2579+
2580+
// If the state has been changed from Pending by a concurrent operation
2581+
// (e.g. Delete was called during recovery), do not overwrite it.
2582+
// This prevents a race where recovery's deferred update would revert
2583+
// a Terminating/Stopped state set by Delete back to Error/Running.
2584+
if ef.State != types.InstanceStatePending {
2585+
ef.log.Infof("Skipping recovery state update for engine frontend %s: state already changed to %s by concurrent operation", ef.Name, ef.State)
2586+
return
2587+
}
2588+
25452589
if recoverErr != nil {
25462590
ef.log.WithError(recoverErr).Errorf("Failed to recover engine frontend %s from host", ef.Name)
25472591
ef.State = types.InstanceStateError
@@ -2613,6 +2657,20 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
26132657
reconnected := false
26142658
if strings.Contains(loadErr.Error(), helpertypes.ErrorMessageCannotFindValidNvmeDevice) {
26152659
if ef.NvmeTcpFrontend.TargetIP != "" && ef.NvmeTcpFrontend.TargetPort != 0 {
2660+
targetAddr := net.JoinHostPort(ef.NvmeTcpFrontend.TargetIP, strconv.Itoa(int(ef.NvmeTcpFrontend.TargetPort)))
2661+
if dialErr := ef.checkTargetReachable(targetAddr); dialErr != nil {
2662+
ef.log.WithError(dialErr).Warnf("NVMe/TCP target %s is not reachable during recovery of engine frontend %s, skipping reconnect retries", targetAddr, ef.Name)
2663+
recoverErr = errors.Wrapf(dialErr, "NVMe/TCP target %s is not reachable during recovery", targetAddr)
2664+
return recoverErr
2665+
}
2666+
// Check cancellation before the expensive host-mutating reconnect.
2667+
// A concurrent EngineFrontendCreate may have evicted us while
2668+
// the TCP pre-check was in progress.
2669+
if ef.isRecoveryCancelled() {
2670+
recoverErr = ErrRecoveryCancelled
2671+
return recoverErr
2672+
}
2673+
26162674
ef.log.WithError(loadErr).Warnf("NVMe device not found on host during recovery of engine frontend %s, reconnecting persisted multipath target", ef.Name)
26172675
if reconnectErr := ef.reconnectNvmeTCPPath(ef.NvmeTcpFrontend.TargetIP, ef.NvmeTcpFrontend.TargetPort); reconnectErr != nil {
26182676
recoverErr = errors.Wrapf(reconnectErr, "failed to reconnect NVMe/TCP path during recovery of engine frontend %s", ef.Name)
@@ -2634,6 +2692,14 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
26342692
}
26352693
}
26362694

2695+
// Check cancellation before loading the dm-device endpoint.
2696+
// A concurrent EngineFrontendCreate may have evicted us during
2697+
// the preceding reconnect or sysfs scan.
2698+
if ef.isRecoveryCancelled() {
2699+
recoverErr = ErrRecoveryCancelled
2700+
return recoverErr
2701+
}
2702+
26372703
// Try to load the existing dm-device endpoint.
26382704
if err := ef.loadInitiatorEndpoint(false); err != nil {
26392705
recoverErr = errors.Wrapf(err, "failed to load endpoint during recovery of engine frontend %s", ef.Name)

pkg/spdk/enginefrontend_persist_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ func (s *TestSuite) TestRecoverFromHostBlockdevReconnectsPersistedTarget(c *C) {
421421
ef.NvmeTcpFrontend.TargetIP = "10.0.0.8"
422422
ef.NvmeTcpFrontend.TargetPort = 4420
423423
ef.getInitiatorEndpointFn = func() string { return "/dev/longhorn/vol-r" }
424+
ef.checkTargetReachableFn = func(address string) error { return nil }
424425

425426
loadCalls := 0
426427
ef.loadInitiatorNVMeDeviceInfoFn = func(transportAddress, transportServiceID, subsystemNQN string) error {
@@ -478,6 +479,59 @@ func (s *TestSuite) TestRecoverFromHostBlockdevReconnectsPersistedTarget(c *C) {
478479
}
479480
}
480481

482+
func (s *TestSuite) TestRecoverFromHostBlockdevSkipsReconnectWhenTargetUnreachable(c *C) {
483+
updateCh := make(chan interface{}, 1)
484+
485+
ef := NewEngineFrontend("ef-block-recover", "engine-r", "vol-r",
486+
lhtypes.FrontendSPDKTCPBlockdev, 1048576, 0, 0, updateCh)
487+
ef.NvmeTcpFrontend.TargetIP = "10.0.0.8"
488+
ef.NvmeTcpFrontend.TargetPort = 4420
489+
490+
loadCalls := 0
491+
ef.loadInitiatorNVMeDeviceInfoFn = func(transportAddress, transportServiceID, subsystemNQN string) error {
492+
loadCalls++
493+
c.Assert(subsystemNQN, Equals, getStableVolumeNQN("vol-r"))
494+
if loadCalls == 1 {
495+
c.Assert(transportAddress, Equals, "10.0.0.8")
496+
c.Assert(transportServiceID, Equals, "4420")
497+
return fmt.Errorf("%s", helpertypes.ErrorMessageCannotFindValidNvmeDevice)
498+
}
499+
c.Assert(transportAddress, Equals, "")
500+
c.Assert(transportServiceID, Equals, "")
501+
return fmt.Errorf("%s", helpertypes.ErrorMessageCannotFindValidNvmeDevice)
502+
}
503+
504+
reconnectCalled := false
505+
ef.reconnectNvmeTCPPathFn = func(transportAddress, transportServiceID string) error {
506+
reconnectCalled = true
507+
return nil
508+
}
509+
ef.checkTargetReachableFn = func(address string) error {
510+
c.Assert(address, Equals, "10.0.0.8:4420")
511+
return fmt.Errorf("dial tcp %s: i/o timeout", address)
512+
}
513+
ef.loadInitiatorEndpointFn = func(dmDeviceIsBusy bool) error {
514+
c.Fatal("loadInitiatorEndpoint should not run when reachability check fails")
515+
return nil
516+
}
517+
518+
err := ef.RecoverFromHost(nil)
519+
c.Assert(err, NotNil)
520+
c.Assert(err.Error(), Matches, ".*not reachable during recovery.*")
521+
c.Assert(reconnectCalled, Equals, false)
522+
c.Assert(loadCalls, Equals, 2)
523+
524+
got := ef.Get()
525+
c.Assert(got.State, Equals, string(lhtypes.InstanceStateError))
526+
c.Assert(got.ErrorMsg, Matches, ".*not reachable during recovery.*")
527+
528+
select {
529+
case <-updateCh:
530+
case <-time.After(time.Second):
531+
c.Fatal("expected update on UpdateCh")
532+
}
533+
}
534+
481535
func (s *TestSuite) TestRecoverFromHostBlockdevUsesPersistedTargetForControllerSelection(c *C) {
482536
updateCh := make(chan interface{}, 1)
483537

0 commit comments

Comments
 (0)