Skip to content

Commit 5736962

Browse files
davidcheng0922derekbit
authored andcommitted
fix(engine,replica): improve error handling and message
Signed-off-by: David Cheng <davidcheng0922@gmail.com>
1 parent 01919c3 commit 5736962

2 files changed

Lines changed: 39 additions & 50 deletions

File tree

pkg/spdk/engine.go

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
232232
}
233233

234234
if err := e.ValidateReplicaSize(replicaAddressMap); err != nil {
235-
return nil, err
235+
return nil, errors.Wrapf(err, "failed to validate replica size during engine creation")
236236
}
237237

238238
defer func() {
@@ -401,7 +401,7 @@ func (e *Engine) ValidateReplicaSize(replicaAddressMap map[string]string) error
401401
}
402402
replica, err := replicaClient.ReplicaGet(replicaName)
403403
if err != nil {
404-
return errors.Wrapf(err, "failed to get replica %v from %v during engine creation", replicaName, replicaAddr)
404+
return errors.Wrapf(err, "failed to get replica %v from %v", replicaName, replicaAddr)
405405
}
406406

407407
replicaSizeMap[replicaName] = replica.SpecSize
@@ -416,7 +416,7 @@ func (e *Engine) ValidateReplicaSize(replicaAddressMap map[string]string) error
416416
}
417417

418418
if expectedSize != replicaSize {
419-
return fmt.Errorf("found different replica sizes during engine creation: %v", replicaSizeMap)
419+
return fmt.Errorf("found different replica sizes: %v", replicaSizeMap)
420420
}
421421
}
422422

@@ -702,26 +702,28 @@ func (e *Engine) reconnectNvmeTcpFrontend(spdkClient *spdkclient.Client) (err er
702702
"port": e.NvmeTcpFrontend.Port,
703703
"targetPort": e.NvmeTcpFrontend.TargetPort,
704704
}); errUpdateLogger != nil {
705-
e.log.WithError(err).Warn("Failed to update logger during reconnect frontend")
705+
e.log.WithError(errUpdateLogger).Warn("Failed to update logger during reconnect frontend")
706706
}
707-
e.log.Infof("Finished reconnecting frontend for engine: %+v", e)
707+
e.log.Info("Finished reconnecting frontend for engine")
708708
}
709709
}()
710710

711+
portStr := strconv.Itoa(int(e.NvmeTcpFrontend.Port))
712+
711713
// No need to disconnect nvme target and stop expose bdev again
712714
// We already did it in prepareRaidForExpansion
713715
err = spdkClient.StartExposeBdev(
714716
e.NvmeTcpFrontend.Nqn,
715717
e.Name,
716718
e.NvmeTcpFrontend.Nguid,
717719
e.NvmeTcpFrontend.IP,
718-
strconv.Itoa(int(e.NvmeTcpFrontend.Port)),
720+
portStr,
719721
)
720722
if err != nil {
721723
return errors.Wrapf(err, "failed to expose bdev during reconnect for engine %v", e.Name)
722724
}
723725

724-
dmDeviceIsBusy, err := e.initiator.StartNvmeTCPInitiator(e.NvmeTcpFrontend.IP, strconv.Itoa(int(e.NvmeTcpFrontend.Port)), true)
726+
dmDeviceIsBusy, err := e.initiator.StartNvmeTCPInitiator(e.NvmeTcpFrontend.IP, portStr, true)
725727
if err != nil {
726728
return errors.Wrapf(err, "failed to start initiator during reconnect for engine %v", e.Name)
727729
}
@@ -1363,7 +1365,7 @@ func (e *Engine) Expand(spdkClient *spdkclient.Client, size uint64) (retErr erro
13631365
if err != nil {
13641366
return err
13651367
}
1366-
defer e.closeRplicaClients(replicaClients)
1368+
defer e.closeReplicaClients(replicaClients)
13671369

13681370
// requireExpansion checks if the expansion is needed and validates the request.
13691371
requireExpansion, err := e.requireExpansion(size, replicaClients)
@@ -1377,32 +1379,24 @@ func (e *Engine) Expand(spdkClient *spdkclient.Client, size uint64) (retErr erro
13771379
}
13781380

13791381
var (
1380-
expanded = false
1381-
engineErr error
1382+
expanded = false
13821383
)
13831384

13841385
// engineErr will be set when the engine failed to do any non-recoverable operations.
13851386
defer func() {
13861387
if retErr != nil {
1387-
e.log.WithError(engineErr).Errorf("Engine %s failed to expand", e.Name)
1388+
e.State = types.InstanceStateError
1389+
e.ErrorMsg = retErr.Error()
13881390
e.lastExpansionError = errors.Wrap(retErr, "engine failed to expand expansion").Error()
13891391
e.lastExpansionFailedAt = time.Now().UTC().Format(time.RFC3339Nano)
1390-
}
1391-
1392-
if engineErr != nil {
1393-
e.State = types.InstanceStateError
1394-
e.ErrorMsg = engineErr.Error()
13951392

13961393
if errUpdateLogger := e.log.UpdateLogger(logrus.Fields{
13971394
"replicaStatusMap": e.ReplicaStatusMap,
13981395
}); errUpdateLogger != nil {
13991396
e.log.WithError(errUpdateLogger).Warn("Failed to update logger with replica status map during engine creation")
14001397
}
14011398

1402-
e.log.WithError(engineErr).Errorf("Engine %s under non-recoverable operation during expansion", e.Name)
1403-
1404-
e.lastExpansionError = errors.Wrap(retErr, "engine under non-recoverable operation").Error()
1405-
e.lastExpansionFailedAt = time.Now().UTC().Format(time.RFC3339Nano)
1399+
e.log.WithError(retErr).Errorf("Engine %s failed to expand", e.Name)
14061400
}
14071401

14081402
e.finishExpansion(expanded, size)
@@ -1413,30 +1407,22 @@ func (e *Engine) Expand(spdkClient *spdkclient.Client, size uint64) (retErr erro
14131407
if isSuspended {
14141408
defer func() {
14151409
if frontendErr := e.initiator.Resume(); frontendErr != nil {
1416-
frontendErr = errors.Wrapf(frontendErr, "failed to resume NVMe initiator during engine %s", e.Name)
1417-
if engineErr != nil {
1418-
engineErr = errors.Wrapf(engineErr, "original error; resume failed: %v", frontendErr)
1419-
} else {
1420-
engineErr = frontendErr
1421-
}
1410+
retErr = multierr.Append(retErr, errors.Wrapf(frontendErr, "original error; resume failed"))
14221411
}
14231412
}()
14241413
}
14251414
if err != nil {
1426-
engineErr = errors.Wrap(err, "prepare raid for expansion failed")
1427-
return err
1415+
return errors.Wrap(err, "prepare raid for expansion failed")
14281416
}
14291417

14301418
// expand replicas
14311419
if err := e.expandReplicas(replicaClients, spdkClient, size); err != nil {
1432-
engineErr = err
1433-
return err
1420+
return errors.Wrap(err, "expand replica failed")
14341421
}
14351422

14361423
// recreate RAID and reconnect frontend
14371424
if err := e.reconnectFrontend(spdkClient, bdevRaidUUID); err != nil {
1438-
engineErr = errors.Wrap(err, "reconnectFrontend failed")
1439-
return err
1425+
return errors.Wrap(err, "reconnectFrontend failed")
14401426
}
14411427
e.log.Info("Expanding engine complete")
14421428

@@ -1655,9 +1641,7 @@ func (e *Engine) expandReplicas(replicaClients map[string]*client.SPDKClient, sp
16551641
e.log.WithField("replica", replicaName).
16561642
Info("Replica expansion succeeded despite earlier error, removing from failure list")
16571643

1658-
errorLock.Lock()
16591644
delete(failedReplica, replicaName)
1660-
errorLock.Unlock()
16611645
}
16621646
}
16631647

@@ -1824,7 +1808,7 @@ func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstRe
18241808
if err != nil {
18251809
return err
18261810
}
1827-
defer e.closeRplicaClients(replicaClients)
1811+
defer e.closeReplicaClients(replicaClients)
18281812

18291813
srcReplicaName, srcReplicaAddress, err := e.getReplicaAddSrcReplica()
18301814
if err != nil {
@@ -2298,7 +2282,7 @@ func (e *Engine) snapshotOperation(spdkClient *spdkclient.Client, inputSnapshotN
22982282
if err != nil {
22992283
return "", err
23002284
}
2301-
defer e.closeRplicaClients(replicaClients)
2285+
defer e.closeReplicaClients(replicaClients)
23022286

23032287
if snapshotName, err = e.snapshotOperationPreCheckWithoutLock(replicaClients, inputSnapshotName, snapshotOp); err != nil {
23042288
return "", err
@@ -2373,7 +2357,7 @@ func (e *Engine) getReplicaClients() (replicaClients map[string]*client.SPDKClie
23732357
return replicaClients, nil
23742358
}
23752359

2376-
func (e *Engine) closeRplicaClients(replicaClients map[string]*client.SPDKClient) {
2360+
func (e *Engine) closeReplicaClients(replicaClients map[string]*client.SPDKClient) {
23772361
for replicaName := range replicaClients {
23782362
if replicaClients[replicaName] != nil {
23792363
if errClose := replicaClients[replicaName].Close(); errClose != nil {

pkg/spdk/replica.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,11 +1047,12 @@ func (r *Replica) Expand(spdkClient *spdkclient.Client, size uint64) error {
10471047
r.Lock()
10481048
defer r.Unlock()
10491049

1050-
r.log.Info("Expanding replica")
1050+
r.log.Infof("Expanding replica %s to size %v", r.Name, size)
10511051

10521052
clusterSize, err := r.fetchClusterSize(spdkClient)
10531053
if err != nil {
1054-
return err
1054+
return errors.Wrapf(err, "Replica %s, fetch cluster size failed", r.Name)
1055+
10551056
}
10561057

10571058
roundedSize := util.RoundUp(size, clusterSize)
@@ -1061,25 +1062,31 @@ func (r *Replica) Expand(spdkClient *spdkclient.Client, size uint64) error {
10611062

10621063
if r.SpecSize > size {
10631064
return fmt.Errorf("cannot expand replica %s to a smaller size %v, current spec size %v", r.Name, size, r.SpecSize)
1064-
} else if r.SpecSize == size {
1065+
}
1066+
if r.SpecSize == size {
10651067
r.log.Infof("Replica %s had been expanded to size %v", r.Name, size)
10661068
return nil
10671069
}
10681070

10691071
// double check if size is already be expanded
10701072
headBdevLvol, err := spdkClient.BdevLvolGetByName(r.Alias, 0)
10711073
if err != nil {
1072-
r.log.Errorf("Get replica %s failed, %v", r.Name, err)
1073-
return err
1074+
return errors.Wrapf(err, "Get replica %s failed", r.Name)
10741075
}
10751076
bdevLvol := BdevLvolInfoToServiceLvol(&headBdevLvol)
10761077

1078+
if bdevLvol.SpecSize > size {
1079+
r.log.Warnf("Found the actual size %v of replica %s is already larger than the requested size %v, will just update the spec size", bdevLvol.SpecSize, r.Name, size)
1080+
r.SpecSize = bdevLvol.SpecSize
1081+
return nil
1082+
}
1083+
10771084
if bdevLvol.SpecSize < size {
10781085
// If the bdev is exposed, we must stop exposing it before the resize.
10791086
reExposeBdev := false
10801087
if r.IsExposed {
10811088
if err := spdkClient.StopExposeBdev(helpertypes.GetNQN(r.Name)); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) {
1082-
return fmt.Errorf("failed to stop expose replica %v before expansion: %w", r.Name, err)
1089+
return errors.Wrapf(err, "failed to stop expose replica %v before expansion", r.Name)
10831090
}
10841091
r.IsExposed = false
10851092
reExposeBdev = true
@@ -1099,26 +1106,24 @@ func (r *Replica) Expand(spdkClient *spdkclient.Client, size uint64) error {
10991106
if reExposeBdev {
11001107
nguid := commonutils.RandomID(nvmeNguidLength)
11011108
if err := spdkClient.StartExposeBdev(helpertypes.GetNQN(r.Name), r.Head.UUID, nguid, r.IP, strconv.Itoa(int(r.PortStart))); err != nil {
1102-
return fmt.Errorf("failed to start expose replica %v after expansion: %w", r.Name, err)
1109+
return errors.Wrapf(err, "failed to start expose replica %v after expansion", r.Name)
11031110
}
11041111
r.IsExposed = true
11051112
}
11061113

11071114
// Blindly clean up then update the caches for the head
11081115
r.Head = nil
1109-
if r.ActiveChain[len(r.ActiveChain)-1] != nil &&
1116+
if len(r.ActiveChain) > 0 &&
1117+
r.ActiveChain[len(r.ActiveChain)-1] != nil &&
11101118
r.ActiveChain[len(r.ActiveChain)-1].Name == r.Name {
11111119
r.ActiveChain = r.ActiveChain[:len(r.ActiveChain)-1]
11121120
}
11131121

11141122
if err := r.updateHeadCache(spdkClient); err != nil {
1115-
r.log.Errorf("update head failed, %v", err)
1116-
return err
1123+
return errors.Wrapf(err, "failed to update head cache for replica %v", r.Name)
11171124
}
11181125

11191126
r.log.Info("Expanding replica complete")
1120-
} else if bdevLvol.SpecSize > size {
1121-
r.log.Warnf("Found the actual size %v of replica %s is already larger than the requested size %v, will just update the spec size", bdevLvol.SpecSize, r.Name, size)
11221127
}
11231128

11241129
r.SpecSize = size
@@ -1141,7 +1146,7 @@ func (r *Replica) fetchClusterSize(spdkClient *spdkclient.Client) (uint64, error
11411146
}
11421147

11431148
if err != nil {
1144-
return 0, fmt.Errorf("failed to query lvstore for replica %s (name=%s uuid=%s): %w", r.Name, r.LvsName, r.LvsUUID, err)
1149+
return 0, errors.Wrapf(err, "failed to query lvstore for replica %s (name=%s uuid=%s)", r.Name, r.LvsName, r.LvsUUID)
11451150
}
11461151

11471152
if len(lvsList) != 1 {

0 commit comments

Comments
 (0)