Skip to content

Commit dfac075

Browse files
committed
fix(server): evict recovering engine frontends on Create
When EngineFrontendCreate encounters an existing frontend in Pending state (still being recovered asynchronously), evict it instead of returning AlreadyExists. This allows the new Create to proceed immediately without waiting for the potentially slow recovery to complete or fail.

Longhorn 13185

Signed-off-by: Derek Su <derek.su@suse.com>
1 parent 93c47a7 commit dfac075

4 files changed

Lines changed: 255 additions & 23 deletions

File tree

pkg/spdk/enginefrontend.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,22 @@ type UblkFrontend struct {
155155
UblkID int32
156156
}
157157

158+
// setMetadataDirLocked assigns metadataDir to ef.metadataDir without taking
// ef's lock. The callers visible in this change (setMetadataDir and the
// eviction path in EngineFrontendCreate) invoke it while already holding ef's
// write lock.
func (ef *EngineFrontend) setMetadataDirLocked(metadataDir string) {
159+
ef.metadataDir = metadataDir
160+
}
161+
162+
// setMetadataDir assigns metadataDir to ef.metadataDir while holding ef's
// write lock for the duration of the assignment.
// NOTE(review): EngineFrontendCreate passes "" here so that a later Delete of
// this frontend leaves the persisted record alone — presumably Delete skips
// record removal when metadataDir is empty; confirm against Delete.
func (ef *EngineFrontend) setMetadataDir(metadataDir string) {
163+
ef.Lock()
164+
defer ef.Unlock()
165+
ef.setMetadataDirLocked(metadataDir)
166+
}
167+
168+
// getMetadataDir returns ef.metadataDir, reading it under ef's read lock so
// the read does not race with setMetadataDir.
func (ef *EngineFrontend) getMetadataDir() string {
169+
ef.RLock()
170+
defer ef.RUnlock()
171+
return ef.metadataDir
172+
}
173+
158174
func getUblkQueueDepth(ublkQueueDepth int32) int32 {
159175
if ublkQueueDepth == 0 {
160176
return types.DefaultUblkQueueDepth
@@ -2620,6 +2636,17 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
26202636
return nil
26212637

26222638
case types.FrontendSPDKTCPBlockdev:
2639+
// Early cancellation check before creating the NVMe-TCP initiator.
2640+
// If a concurrent EngineFrontendCreate already completed for this
2641+
// volume (evicted us and connected its own NVMe controller), we must
2642+
// not proceed — creating an initiator and then calling Delete/Stop
2643+
// would disconnect the NEW ef's controller via DisconnectTarget
2644+
// (which disconnects ALL controllers for the subsystem NQN).
2645+
if ef.isRecoveryCancelled() {
2646+
recoverErr = ErrRecoveryCancelled
2647+
return recoverErr
2648+
}
2649+
26232650
// Recover the NVMe-oF initiator (blockdev frontend with dm-device).
26242651
i, nqn, nguid, err := ef.newNvmeTcpInitiator()
26252652
if err != nil {
@@ -2679,7 +2706,7 @@ func (ef *EngineFrontend) RecoverFromHost(spdkClient *spdkclient.Client) error {
26792706
reconnected = true
26802707
} else {
26812708
ef.log.WithError(loadErr).Warnf("NVMe device not found on host during recovery of engine frontend %s, removing persisted record", ef.Name)
2682-
if removeErr := removeEngineFrontendRecord(ef.metadataDir, ef.VolumeName); removeErr != nil {
2709+
if removeErr := removeEngineFrontendRecord(ef.getMetadataDir(), ef.VolumeName); removeErr != nil {
26832710
ef.log.WithError(removeErr).Warn("Failed to remove engine frontend record")
26842711
}
26852712
deviceNotFound = true

pkg/spdk/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,6 +1132,11 @@ func (s *Server) recoverEngineFrontends(ctx context.Context) {
11321132

11331133
if current != ef {
11341134
logrus.Warnf("Engine frontend %s was superseded during recovery, cleaning up recovered resources", record.Name)
1135+
// The eviction path in EngineFrontendCreate already decided
1136+
// whether metadataDir should be kept (pre-create eviction,
1137+
// where no new record exists yet) or cleared (post-create
1138+
// eviction with successful Create, where a new record was
1139+
// written). Respect that decision — do not override here.
11351140
if deleteErr := ef.Delete(spdkClient); deleteErr != nil {
11361141
logrus.WithError(deleteErr).Warnf("Failed to clean up superseded engine frontend %s", record.Name)
11371142
}

pkg/spdk/server_enginefrontend.go

Lines changed: 130 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -232,24 +232,9 @@ func (s *Server) EngineFrontendCreate(ctx context.Context, req *spdkrpc.EngineFr
232232
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "frontend %v is not supported", req.Frontend)
233233
}
234234

235-
s.Lock()
236-
_, ok := s.engineFrontendMap[req.Name]
237-
if ok {
238-
s.Unlock()
239-
return nil, grpcstatus.Errorf(grpccodes.AlreadyExists, "engine frontend %v already exists", req.Name)
240-
}
241-
if existing := s.engineFrontendByVolumeName(req.VolumeName); existing != nil {
242-
s.Unlock()
243-
return nil, grpcstatus.Errorf(grpccodes.AlreadyExists, "engine frontend %v already exists for volume %v", existing.Name, req.VolumeName)
244-
}
245-
246-
ef := NewEngineFrontend(req.Name, req.EngineName, req.VolumeName, req.Frontend, req.SpecSize,
247-
req.UblkQueueDepth, req.UblkNumberOfQueue, s.updateChs[types.InstanceTypeEngineFrontend])
248-
ef.metadataDir = s.metadataDir
249-
250-
spdkClient := s.spdkClient
251-
s.Unlock()
252-
235+
// Derive and validate targetAddress BEFORE evicting any Pending recovery
236+
// ef, so that a malformed address does not permanently cancel a valid
237+
// ongoing recovery.
253238
targetAddress := req.TargetAddress
254239
// When disableFrontend=True, the manager passes an empty target address
255240
// because the engine's NVMe-oF target port is 0 (no listener exposed).
@@ -262,8 +247,80 @@ func (s *Server) EngineFrontendCreate(ctx context.Context, req *spdkrpc.EngineFr
262247
targetAddress = net.JoinHostPort(podIP, strconv.Itoa(types.SPDKServicePort))
263248
}
264249
}
250+
// Validate the address format. ef.Create() would reject this with a hard
251+
// error (ErrEngineFrontendCreateInvalidArgument), but at that point we
252+
// may have already evicted a Pending recovery ef.
253+
if _, _, splitErr := splitHostPort(targetAddress); splitErr != nil {
254+
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "invalid target address %v: %v", targetAddress, splitErr)
255+
}
256+
257+
s.Lock()
258+
259+
var evictedEfs []*EngineFrontend
260+
261+
if existing, ok := s.engineFrontendMap[req.Name]; ok {
262+
// If the conflicting ef is still recovering (Pending state from
263+
// async recovery), the new Create takes priority — evict it.
264+
// We only mark state and remove from map here; the recovery
265+
// goroutine will detect the state/map change and handle Delete
266+
// sequentially after RecoverFromHost returns, avoiding concurrent
267+
// access to the initiator.
268+
//
269+
// Keep metadataDir intact for now. After Create() we decide:
270+
// - Create succeeded → clear metadataDir so the old ef's Delete
271+
// does not remove the NEW persistence record (same volumeName key).
272+
// - Create failed with runtime error → keep metadataDir so Delete
273+
// removes the stale old record that would cause bad recovery.
274+
existing.Lock()
275+
if existing.State == types.InstanceStatePending {
276+
existing.State = types.InstanceStateTerminating
277+
existing.Unlock()
278+
delete(s.engineFrontendMap, req.Name)
279+
evictedEfs = append(evictedEfs, existing)
280+
} else {
281+
existing.Unlock()
282+
s.Unlock()
283+
return nil, grpcstatus.Errorf(grpccodes.AlreadyExists, "engine frontend %v already exists", req.Name)
284+
}
285+
}
286+
if existing := s.engineFrontendByVolumeName(req.VolumeName); existing != nil {
287+
existing.Lock()
288+
if existing.State == types.InstanceStatePending {
289+
existing.State = types.InstanceStateTerminating
290+
existing.Unlock()
291+
delete(s.engineFrontendMap, existing.Name)
292+
evictedEfs = append(evictedEfs, existing)
293+
} else {
294+
existing.Unlock()
295+
s.Unlock()
296+
return nil, grpcstatus.Errorf(grpccodes.AlreadyExists, "engine frontend %v already exists for volume %v", existing.Name, req.VolumeName)
297+
}
298+
}
299+
300+
ef := NewEngineFrontend(req.Name, req.EngineName, req.VolumeName, req.Frontend, req.SpecSize,
301+
req.UblkQueueDepth, req.UblkNumberOfQueue, s.updateChs[types.InstanceTypeEngineFrontend])
302+
ef.metadataDir = s.metadataDir
303+
304+
s.Unlock()
305+
306+
// Hold the per-volume host lock so that if the async recovery goroutine
307+
// is still performing host NVMe/dm operations for this volume, we wait
308+
// for it to finish before starting our own. The lock is held through
309+
// map registration so that a second concurrent Create for the same
310+
// volume cannot start host operations before we register.
311+
unlockVolumeHost := s.acquireVolumeHostLock(req.VolumeName)
312+
defer unlockVolumeHost()
313+
314+
s.RLock()
315+
spdkClient := s.spdkClient
316+
s.RUnlock()
265317

266318
ret, createErr := ef.Create(spdkClient, targetAddress)
319+
for _, evicted := range evictedEfs {
320+
if createErr == nil && evicted.VolumeName == req.VolumeName {
321+
evicted.setMetadataDir("")
322+
}
323+
}
267324

268325
// Distinguish hard errors (validation / precondition) from runtime
269326
// failures (e.g. NVMe initiator can't connect). Hard errors are
@@ -273,6 +330,26 @@ func (s *Server) EngineFrontendCreate(ctx context.Context, req *spdkrpc.EngineFr
273330
if createErr != nil &&
274331
(errors.Is(createErr, ErrEngineFrontendCreateInvalidArgument) ||
275332
errors.Is(createErr, ErrEngineFrontendCreatePrecondition)) {
333+
// Hard error — Create did not mutate anything. Restore the
334+
// evicted efs so valid ongoing recoveries are not permanently lost.
335+
if len(evictedEfs) > 0 {
336+
s.Lock()
337+
for _, evicted := range evictedEfs {
338+
evicted.Lock()
339+
evicted.State = types.InstanceStatePending
340+
evicted.Unlock()
341+
// Only restore if both the name and volume slots are still free.
342+
// A concurrent Create for the same volume (different name) could
343+
// have registered while we were in-flight, and blindly restoring
344+
// would violate the one-ef-per-volume map invariant.
345+
if _, taken := s.engineFrontendMap[evicted.Name]; !taken {
346+
if s.engineFrontendByVolumeName(evicted.VolumeName) == nil {
347+
s.engineFrontendMap[evicted.Name] = evicted
348+
}
349+
}
350+
}
351+
s.Unlock()
352+
}
276353
return nil, toEngineFrontendCreateGRPCError(createErr, "failed to create engine frontend %v", req.Name)
277354
}
278355

@@ -290,6 +367,35 @@ func (s *Server) EngineFrontendCreate(ctx context.Context, req *spdkrpc.EngineFr
290367
winner = existing
291368
}
292369
if duplicateName || duplicateVolume {
370+
// If the conflicting ef is still recovering (inserted by async
371+
// recovery between our first check and now), the new Create wins.
372+
// Only mark state and remove from map; the recovery goroutine
373+
// will handle Delete sequentially after RecoverFromHost returns.
374+
if winner != nil {
375+
winner.Lock()
376+
if winner.State == types.InstanceStatePending {
377+
winner.State = types.InstanceStateTerminating
378+
// Only clear metadataDir when Create succeeded AND the winner
379+
// shares the same volumeName. Records are keyed by volumeName,
380+
// so only same-volume records collide. Different-volume winners
381+
// must keep metadataDir so Delete removes their own record.
382+
if createErr == nil && winner.VolumeName == ef.VolumeName {
383+
winner.setMetadataDirLocked("")
384+
}
385+
winner.Unlock()
386+
387+
delete(s.engineFrontendMap, winner.Name)
388+
s.engineFrontendMap[req.Name] = ef
389+
s.Unlock()
390+
391+
if createErr != nil {
392+
return ef.Get(), nil
393+
}
394+
return ret, nil
395+
}
396+
winner.Unlock()
397+
}
398+
293399
s.Unlock()
294400
// The race loser holds a fully-created frontend with real SPDK
295401
// resources (bdevs, NVMe controllers, etc.). Clean them up so
@@ -300,7 +406,7 @@ func (s *Server) EngineFrontendCreate(ctx context.Context, req *spdkrpc.EngineFr
300406
// When volumeNames differ, each has its own directory and the
301407
// loser should clean up its own record.
302408
if winner != nil && ef.VolumeName == winner.VolumeName {
303-
ef.metadataDir = ""
409+
ef.setMetadataDir("")
304410
}
305411
if deleteErr := ef.Delete(spdkClient); deleteErr != nil {
306412
logrus.WithError(deleteErr).Warnf("Failed to clean up race-loser engine frontend %v", req.Name)
@@ -311,10 +417,12 @@ func (s *Server) EngineFrontendCreate(ctx context.Context, req *spdkrpc.EngineFr
311417
// Hold the winner's read lock to prevent concurrent mutations
312418
// (e.g. Delete, switchover) from racing with the field reads
313419
// inside saveEngineFrontendRecord.
314-
if winner != nil && winner.metadataDir != "" && ef.VolumeName == winner.VolumeName {
420+
if winner != nil && ef.VolumeName == winner.VolumeName {
315421
winner.RLock()
316-
if err := saveEngineFrontendRecord(winner.metadataDir, winner); err != nil {
317-
logrus.WithError(err).Warnf("Failed to re-persist winner engine frontend %v record after race", winner.Name)
422+
if metadataDir := winner.metadataDir; metadataDir != "" {
423+
if err := saveEngineFrontendRecord(metadataDir, winner); err != nil {
424+
logrus.WithError(err).Warnf("Failed to re-persist winner engine frontend %v record after race", winner.Name)
425+
}
318426
}
319427
winner.RUnlock()
320428
}

pkg/spdk/server_verify_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ func (s *TestSuite) TestEngineFrontendCreateReturnsAlreadyExistsForDuplicate(c *
374374
updateCh := make(chan interface{}, 1)
375375

376376
existing := NewEngineFrontend("ef-dup", "engine-a", "vol-a", lhtypes.FrontendSPDKTCPNvmf, 1024, 0, 0, updateCh)
377+
existing.State = lhtypes.InstanceStateRunning
377378

378379
srv := &Server{
379380
engineFrontendMap: map[string]*EngineFrontend{
@@ -587,3 +588,94 @@ func (s *TestSuite) TestEngineFrontendDeleteWaitsForVolumeHostLock(c *C) {
587588
c.Fatal("delete did not resume after releasing the per-volume host lock")
588589
}
589590
}
591+
592+
// TestEngineFrontendCreateInvalidAddressDoesNotEvictPendingEf registers a
// frontend named "ef-recovering" for volume "vol-a", then issues an
// EngineFrontendCreate for the same name and volume with the target address
// "2001:db8::1:9502". It asserts that the call fails with gRPC code
// InvalidArgument and that the server map still holds the very same frontend
// object with State still Pending, i.e. the rejected request did not evict it.
func (s *TestSuite) TestEngineFrontendCreateInvalidAddressDoesNotEvictPendingEf(c *C) {
593+
fmt.Println("Testing EngineFrontendCreate with invalid address does not evict a Pending frontend")
594+
595+
updateCh := make(chan interface{}, 1)
596+
existing := NewEngineFrontend("ef-recovering", "engine-a", "vol-a", lhtypes.FrontendSPDKTCPNvmf, 1024, 0, 0, updateCh)
597+
// State is Pending (the default from NewEngineFrontend), simulating an
598+
// in-progress async recovery.
599+
600+
srv := &Server{
601+
engineFrontendMap: map[string]*EngineFrontend{
602+
"ef-recovering": existing,
603+
},
604+
volumeHostLocks: map[string]*volumeHostLockEntry{},
605+
updateChs: map[lhtypes.InstanceType]chan interface{}{
606+
lhtypes.InstanceTypeEngineFrontend: updateCh,
607+
},
608+
}
609+
610+
// Use an un-bracketed IPv6 address that splitHostPort rejects.
611+
_, err := srv.EngineFrontendCreate(context.Background(), &spdkrpc.EngineFrontendCreateRequest{
612+
Name: "ef-recovering",
613+
EngineName: "engine-b",
614+
VolumeName: "vol-a",
615+
Frontend: lhtypes.FrontendSPDKTCPNvmf,
616+
SpecSize: 2048,
617+
TargetAddress: "2001:db8::1:9502",
618+
})
619+
c.Assert(err, NotNil)
620+
st, ok := grpcstatus.FromError(err)
621+
c.Assert(ok, Equals, true)
622+
c.Assert(st.Code(), Equals, grpccodes.InvalidArgument)
623+
624+
// The existing Pending frontend must still be in the map, untouched.
625+
srv.RLock()
626+
ef := srv.engineFrontendMap["ef-recovering"]
627+
srv.RUnlock()
628+
c.Assert(ef, Equals, existing)
629+
c.Assert(ef.State, Equals, lhtypes.InstanceState(lhtypes.InstanceStatePending))
630+
}
631+
632+
// TestEngineFrontendCreateEvictsMultiplePendingRecoveries registers two
// frontends that each conflict with the upcoming request in a different way:
// sameName shares the request's Name ("ef-new") but belongs to "vol-old", and
// sameVolume shares the request's VolumeName ("vol-new") under another name.
// A single EngineFrontendCreate must succeed, replace the "ef-new" map entry
// with a new object for "vol-new", drop "ef-old-volume" from the map, and
// leave both old frontends in Terminating state. Only sameVolume is expected
// to have its metadataDir cleared; sameName, which is on a different volume,
// must keep its metadataDir.
func (s *TestSuite) TestEngineFrontendCreateEvictsMultiplePendingRecoveries(c *C) {
633+
fmt.Println("Testing EngineFrontendCreate evicts both name and volume Pending recoveries in one request")
634+
635+
updateCh := make(chan interface{}, 1)
636+
sameName := NewEngineFrontend("ef-new", "engine-old-name", "vol-old", lhtypes.FrontendSPDKTCPNvmf, 1024, 0, 0, updateCh)
637+
sameName.metadataDir = "/tmp/name-recovery"
638+
sameVolume := NewEngineFrontend("ef-old-volume", "engine-old-volume", "vol-new", lhtypes.FrontendSPDKTCPNvmf, 1024, 0, 0, updateCh)
639+
sameVolume.metadataDir = "/tmp/volume-recovery"
640+
641+
srv := &Server{
642+
engineFrontendMap: map[string]*EngineFrontend{
643+
sameName.Name: sameName,
644+
sameVolume.Name: sameVolume,
645+
},
646+
volumeHostLocks: map[string]*volumeHostLockEntry{},
647+
updateChs: map[lhtypes.InstanceType]chan interface{}{
648+
lhtypes.InstanceTypeEngineFrontend: updateCh,
649+
},
650+
}
651+
652+
// The request collides with sameName by Name and with sameVolume by VolumeName.
resp, err := srv.EngineFrontendCreate(context.Background(), &spdkrpc.EngineFrontendCreateRequest{
653+
Name: "ef-new",
654+
EngineName: "engine-new",
655+
VolumeName: "vol-new",
656+
Frontend: lhtypes.FrontendSPDKTCPNvmf,
657+
SpecSize: 2048,
658+
})
659+
c.Assert(err, IsNil)
660+
c.Assert(resp, NotNil)
661+
662+
srv.RLock()
663+
created := srv.engineFrontendMap["ef-new"]
664+
_, sameVolumeStillMapped := srv.engineFrontendMap["ef-old-volume"]
665+
srv.RUnlock()
666+
667+
c.Assert(created, NotNil)
668+
c.Assert(created, Not(Equals), sameName)
669+
c.Assert(created.VolumeName, Equals, "vol-new")
670+
c.Assert(sameVolumeStillMapped, Equals, false)
671+
672+
// Different volume than the request: evicted, but metadataDir is left intact.
sameName.RLock()
673+
c.Assert(sameName.State, Equals, lhtypes.InstanceState(lhtypes.InstanceStateTerminating))
674+
c.Assert(sameName.metadataDir, Equals, "/tmp/name-recovery")
675+
sameName.RUnlock()
676+
677+
// Same volume as the request: evicted, and metadataDir is cleared after the successful Create.
sameVolume.RLock()
678+
c.Assert(sameVolume.State, Equals, lhtypes.InstanceState(lhtypes.InstanceStateTerminating))
679+
c.Assert(sameVolume.metadataDir, Equals, "")
680+
sameVolume.RUnlock()
681+
}

0 commit comments

Comments
 (0)