Skip to content

Commit 15172e0

Browse files
fix(engine,replica): code refactor
Signed-off-by: David Cheng <davidcheng0922@gmail.com>
1 parent 3bd40e6 commit 15172e0

2 files changed

Lines changed: 55 additions & 50 deletions

File tree

pkg/spdk/engine.go

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -412,18 +412,16 @@ func (e *Engine) ValidateReplicaSize(replicaAddressMap map[string]string) error
412412
for _, replicaSize := range replicaSizeMap {
413413
if expectedSize == 0 {
414414
expectedSize = replicaSize
415-
} else if expectedSize != replicaSize {
415+
continue
416+
}
417+
418+
if expectedSize != replicaSize {
416419
return fmt.Errorf("found different replica sizes during engine creation: %v", replicaSizeMap)
417420
}
418421
}
419422

420423
if e.SpecSize < expectedSize {
421424
return fmt.Errorf("engine spec size %d is smaller than replica size %d", e.SpecSize, expectedSize)
422-
} else if e.SpecSize > expectedSize {
423-
// not return error here
424-
// it may cause infinite retry if the user doesn't fix the size issue
425-
e.log.Warnf("Engine spec size (%d) is larger than replica size (%d); setting engine spec size to %d", e.SpecSize, expectedSize, expectedSize)
426-
e.SpecSize = expectedSize
427425
}
428426

429427
return nil
@@ -918,7 +916,7 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) {
918916
}
919917

920918
if e.isExpanding {
921-
e.log.Debug("Engine is expandind, will skip the validation and update")
919+
e.log.Debug("Engine is expanding, will skip the validation and update")
922920
return nil
923921
}
924922

@@ -966,7 +964,7 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) {
966964
}
967965

968966
bdevRaidSize := bdevRaid.NumBlocks * uint64(bdevRaid.BlockSize)
969-
if e.SpecSize != bdevRaidSize {
967+
if e.SpecSize > bdevRaidSize {
970968
// not directly return error
971969

972970
// If the volume is not attached and do the expand
@@ -982,6 +980,11 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) {
982980
return nil
983981
}
984982

983+
if e.SpecSize < bdevRaidSize {
984+
// should not happen
985+
return fmt.Errorf("engine spec size %d is smaller than actual raid bdev size %d for engine %s", e.SpecSize, bdevRaidSize, e.Name)
986+
}
987+
985988
// Verify replica status map
986989
containValidReplica := false
987990
for replicaName, replicaStatus := range e.ReplicaStatusMap {
@@ -1346,11 +1349,9 @@ func (e *Engine) validateAndUpdateReplicaNvme(replicaName string, bdev *spdktype
13461349
return e.ReplicaStatusMap[replicaName].Mode, nil
13471350
}
13481351

1349-
// This method performs an online volume expansion for the Longhorn Engine using SPDK. It:
1350-
// Expands underlying replica logical volumes (lvol)
1351-
// Recreates the SPDK RAID bdev
1352-
// Suspends and resumes frontend I/O as needed
1353-
// Ensures cleanup and status update on failure
1352+
// Expand performs an online volume expansion for the Longhorn Engine using SPDK.
1353+
// It expands the underlying replica logical volumes (lvol), recreates the SPDK RAID bdev,
1354+
// suspends and resumes frontend I/O as needed, and ensures cleanup and status updates on failure.
13541355
func (e *Engine) Expand(spdkClient *spdkclient.Client, size uint64) (retErr error) {
13551356
e.Lock()
13561357
defer e.Unlock()
@@ -1360,19 +1361,18 @@ func (e *Engine) Expand(spdkClient *spdkclient.Client, size uint64) (retErr erro
13601361
// fetch all replica clients
13611362
replicaClients, err := e.getReplicaClients()
13621363
if err != nil {
1363-
retErr = err
1364-
return retErr
1364+
return err
13651365
}
13661366
defer e.closeRplicaClients(replicaClients)
13671367

1368-
// startExpansion checks if the expansion can be started, and marks the expansion as in progress
1369-
requireExpansion, err := e.startExpansion(size, replicaClients)
1368+
// requireExpansion checks if the expansion is needed and validates the request.
1369+
requireExpansion, err := e.requireExpansion(size, replicaClients)
13701370
if err != nil {
1371-
return errors.Wrap(err, "startExpansion failed")
1371+
return errors.Wrap(err, "requireExpansion failed")
13721372
}
13731373
if !requireExpansion {
13741374
e.log.Info("No need to expand engine")
1375-
e.SpecSize = size
1375+
e.finishExpansion(true, size)
13761376
return nil
13771377
}
13781378

@@ -1390,18 +1390,17 @@ func (e *Engine) Expand(spdkClient *spdkclient.Client, size uint64) (retErr erro
13901390
}
13911391

13921392
if engineErr != nil {
1393-
if e.State != types.InstanceStateError {
1394-
e.State = types.InstanceStateError
1395-
}
1393+
e.State = types.InstanceStateError
13961394
e.ErrorMsg = engineErr.Error()
1397-
e.log.WithError(engineErr).Errorf("Engine %s under non-recoverable operation during expansion", e.Name)
13981395

13991396
if errUpdateLogger := e.log.UpdateLogger(logrus.Fields{
14001397
"replicaStatusMap": e.ReplicaStatusMap,
14011398
}); errUpdateLogger != nil {
14021399
e.log.WithError(errUpdateLogger).Warn("Failed to update logger with replica status map during engine creation")
14031400
}
14041401

1402+
e.log.WithError(engineErr).Errorf("Engine %s under non-recoverable operation during expansion", e.Name)
1403+
14051404
e.lastExpansionError = errors.Wrap(retErr, "engine under non-recoverable operation").Error()
14061405
e.lastExpansionFailedAt = time.Now().UTC().Format(time.RFC3339Nano)
14071406
}
@@ -1425,30 +1424,27 @@ func (e *Engine) Expand(spdkClient *spdkclient.Client, size uint64) (retErr erro
14251424
}
14261425
if err != nil {
14271426
engineErr = errors.Wrap(err, "prepare raid for expansion failed")
1428-
retErr = engineErr
1429-
return retErr
1427+
return err
14301428
}
14311429

14321430
// expand replicas
14331431
if err := e.expandReplicas(replicaClients, spdkClient, size); err != nil {
14341432
engineErr = err
1435-
retErr = err
1436-
return retErr
1433+
return err
14371434
}
14381435

14391436
// recreate RAID and reconnect frontend
14401437
if err := e.reconnectFrontend(spdkClient, bdevRaidUUID); err != nil {
14411438
engineErr = errors.Wrap(err, "reconnectFrontend failed")
1442-
retErr = engineErr
1443-
return retErr
1439+
return err
14441440
}
14451441
e.log.Info("Expanding engine complete")
14461442

14471443
expanded = true // which could be true even in partial success
14481444
return nil
14491445
}
14501446

1451-
func (e *Engine) startExpansion(size uint64, replicaClients map[string]*client.SPDKClient) (requireExpansion bool, err error) {
1447+
func (e *Engine) requireExpansion(size uint64, replicaClients map[string]*client.SPDKClient) (requireExpansion bool, err error) {
14521448
if e.isExpanding {
14531449
return false, fmt.Errorf("expansion is in progress")
14541450
}
@@ -1459,7 +1455,8 @@ func (e *Engine) startExpansion(size uint64, replicaClients map[string]*client.S
14591455

14601456
if e.SpecSize > size {
14611457
return false, fmt.Errorf("cannot expand engine to a smaller size %v, current size %v", size, e.SpecSize)
1462-
} else if e.SpecSize == size {
1458+
}
1459+
if e.SpecSize == size {
14631460
e.log.Infof("Engine already at requested size %v, skipping expansion", size)
14641461
return false, nil // no need to expand
14651462
}
@@ -1492,16 +1489,19 @@ func (e *Engine) startExpansion(size uint64, replicaClients map[string]*client.S
14921489

14931490
if currentReplicaSize == 0 {
14941491
currentReplicaSize = replica.SpecSize
1495-
} else if currentReplicaSize != replica.SpecSize {
1492+
continue
1493+
}
1494+
1495+
if currentReplicaSize != replica.SpecSize {
14961496
return false, fmt.Errorf("cannot expand engine with replicas in different sizes: replica %s has size %v while other replicas have size %v", replicaName, replica.SpecSize, currentReplicaSize)
14971497
}
14981498
}
14991499

15001500
if currentReplicaSize > size {
15011501
return false, fmt.Errorf("cannot expand engine to a smaller size %v, current replica size %v", size, currentReplicaSize)
1502-
} else if currentReplicaSize == size {
1502+
}
1503+
if currentReplicaSize == size {
15031504
e.log.Infof("Replicas already at requested size %v, skipping expansion", size)
1504-
e.SpecSize = size
15051505
return false, nil // no need to expand
15061506
}
15071507

@@ -1524,6 +1524,7 @@ func (e *Engine) finishExpansion(expanded bool, size uint64) {
15241524
} else {
15251525
e.log.Infof("Failed to expand from size %v to %v", e.SpecSize, size)
15261526
}
1527+
15271528
e.isExpanding = false
15281529
}
15291530

pkg/spdk/replica.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,9 +1059,9 @@ func (r *Replica) Expand(spdkClient *spdkclient.Client, size uint64) error {
10591059
return fmt.Errorf("replica %s rounded up spec size from %v to %v since the spec size should be multiple of MiB", r.Name, size, roundedSize)
10601060
}
10611061

1062-
if r.SpecSize > roundedSize {
1062+
if r.SpecSize > size {
10631063
return fmt.Errorf("cannot expand replica %s to a smaller size %v, current spec size %v", r.Name, size, r.SpecSize)
1064-
} else if r.SpecSize == roundedSize {
1064+
} else if r.SpecSize == size {
10651065
r.log.Infof("Replica %s had been expanded to size %v", r.Name, size)
10661066
return nil
10671067
}
@@ -1074,8 +1074,7 @@ func (r *Replica) Expand(spdkClient *spdkclient.Client, size uint64) error {
10741074
}
10751075
bdevLvol := BdevLvolInfoToServiceLvol(&headBdevLvol)
10761076

1077-
// resize it if not equal
1078-
if bdevLvol.SpecSize != size {
1077+
if bdevLvol.SpecSize < size {
10791078
// If the bdev is exposed, we must stop exposing it before the resize.
10801079
reExposeBdev := false
10811080
if r.IsExposed {
@@ -1090,7 +1089,9 @@ func (r *Replica) Expand(spdkClient *spdkclient.Client, size uint64) error {
10901089
if err != nil {
10911090
r.log.Errorf("Resize replica %s failed, %v", r.Name, err)
10921091
return errors.Wrapf(err, "bdev lvol resize error")
1093-
} else if !resized {
1092+
}
1093+
1094+
if !resized {
10941095
return fmt.Errorf("no error, but replica %s not resized", r.Name)
10951096
}
10961097

@@ -1102,21 +1103,24 @@ func (r *Replica) Expand(spdkClient *spdkclient.Client, size uint64) error {
11021103
}
11031104
r.IsExposed = true
11041105
}
1105-
}
11061106

1107-
// Blindly clean up then update the caches for the head
1108-
r.Head = nil
1109-
if r.ActiveChain[len(r.ActiveChain)-1] != nil &&
1110-
r.ActiveChain[len(r.ActiveChain)-1].Name == r.Name {
1111-
r.ActiveChain = r.ActiveChain[:len(r.ActiveChain)-1]
1112-
}
1107+
// Blindly clean up then update the caches for the head
1108+
r.Head = nil
1109+
if r.ActiveChain[len(r.ActiveChain)-1] != nil &&
1110+
r.ActiveChain[len(r.ActiveChain)-1].Name == r.Name {
1111+
r.ActiveChain = r.ActiveChain[:len(r.ActiveChain)-1]
1112+
}
11131113

1114-
if err := r.updateHeadCache(spdkClient); err != nil {
1115-
r.log.Errorf("update head failed, %v", err)
1116-
return err
1114+
if err := r.updateHeadCache(spdkClient); err != nil {
1115+
r.log.Errorf("update head failed, %v", err)
1116+
return err
1117+
}
1118+
1119+
r.log.Info("Expanding replica complete")
1120+
} else if bdevLvol.SpecSize > size {
1121+
r.log.Warnf("Found the actual size %v of replica %s is already larger than the requested size %v, will just update the spec size", bdevLvol.SpecSize, r.Name, size)
11171122
}
11181123

1119-
r.log.Info("Expanding replica complete")
11201124
r.SpecSize = size
11211125
return nil
11221126
}

0 commit comments

Comments
 (0)