diff --git a/go.mod b/go.mod index 0b33dcbc8c..a6c4c3f958 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/vmware-tanzu/vm-operator/api v1.9.1-0.20250923172217-bf5a74e51c65 github.com/vmware-tanzu/vm-operator/external/byok v0.0.0-20250509154507-b93e51fc90fa - github.com/vmware/govmomi v0.53.0-alpha.0.0.20251203163802-5ce652387dac + github.com/vmware/govmomi v0.53.0-alpha.0.0.20251203213634-99f18b71ea8e go.uber.org/zap v1.27.0 golang.org/x/crypto v0.43.0 golang.org/x/sync v0.18.0 diff --git a/go.sum b/go.sum index db06b71f9e..86539058e2 100644 --- a/go.sum +++ b/go.sum @@ -291,8 +291,8 @@ github.com/vmware-tanzu/vm-operator/api v1.9.1-0.20250923172217-bf5a74e51c65 h1: github.com/vmware-tanzu/vm-operator/api v1.9.1-0.20250923172217-bf5a74e51c65/go.mod h1:nWTPpxfe4gHuuYuFcrs86+NMxfkqPk3a3IlvI8TCWak= github.com/vmware-tanzu/vm-operator/external/byok v0.0.0-20250509154507-b93e51fc90fa h1:4MKu14YJ7J54O6QKmT4ds5EUpysWLLtQRMff73cVkmU= github.com/vmware-tanzu/vm-operator/external/byok v0.0.0-20250509154507-b93e51fc90fa/go.mod h1:8tiuyYslzjLIUmOlXZuGKQdQP2ZgWGCVhVeyptmZYnk= -github.com/vmware/govmomi v0.53.0-alpha.0.0.20251203163802-5ce652387dac h1:E3W+2J1I0B5LyIillKYVQHIb6CpslGcogt7Q+8FHT3c= -github.com/vmware/govmomi v0.53.0-alpha.0.0.20251203163802-5ce652387dac/go.mod h1:FM3GTg002dFFN7l2/hNS0YWC4f78HTw4kvgUwAE52cM= +github.com/vmware/govmomi v0.53.0-alpha.0.0.20251203213634-99f18b71ea8e h1:TG9xuPu9N29Ak1gNs85VsMImNv1bd2l0yNfAMc3imOU= +github.com/vmware/govmomi v0.53.0-alpha.0.0.20251203213634-99f18b71ea8e/go.mod h1:FM3GTg002dFFN7l2/hNS0YWC4f78HTw4kvgUwAE52cM= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 h1:S2dVYn90KE98chqDkyE9Z4N61UnQd+KOfgp5Iu53llk= diff --git a/pkg/common/cns-lib/volume/manager.go b/pkg/common/cns-lib/volume/manager.go index 94850028e4..a9129f0b48 100644 --- a/pkg/common/cns-lib/volume/manager.go +++ b/pkg/common/cns-lib/volume/manager.go @@ -254,7 +254,8 @@ func GetManager(ctx context.Context, vc *cnsvsphere.VirtualCenter, operationStore cnsvolumeoperationrequest.VolumeOperationRequest, idempotencyHandlingEnabled, multivCenterEnabled, multivCenterTopologyDeployment bool, - clusterFlavor cnstypes.CnsClusterFlavor) (Manager, error) { + clusterFlavor cnstypes.CnsClusterFlavor, + clusterId, clusterDistribution string) (Manager, error) { log := logger.GetLogger(ctx) managerInstanceLock.Lock() defer managerInstanceLock.Unlock() @@ -269,6 +270,8 @@ func GetManager(ctx context.Context, vc *cnsvsphere.VirtualCenter, operationStore: operationStore, idempotencyHandlingEnabled: idempotencyHandlingEnabled, clusterFlavor: clusterFlavor, + clusterId: clusterId, + clusterDistribution: clusterDistribution, } } else { managerInstance = managerInstanceMap[vc.Config.Host] @@ -283,6 +286,8 @@ func GetManager(ctx context.Context, vc *cnsvsphere.VirtualCenter, idempotencyHandlingEnabled: idempotencyHandlingEnabled, multivCenterTopologyDeployment: multivCenterTopologyDeployment, clusterFlavor: clusterFlavor, + clusterId: clusterId, + clusterDistribution: clusterDistribution, } managerInstanceMap[vc.Config.Host] = managerInstance } @@ -301,6 +306,8 @@ type defaultManager struct { multivCenterTopologyDeployment bool listViewIf ListViewIf clusterFlavor cnstypes.CnsClusterFlavor + clusterId string + clusterDistribution string } // ClearTaskInfoObjects is a go routine 
which runs in the background to clean @@ -1055,7 +1062,8 @@ func (m *defaultManager) AttachVolume(ctx context.Context, vm *cnsvsphere.VirtualMachine, volumeID string, checkNVMeController bool) (string, string, error) { ctx, cancelFunc := ensureOperationContextHasATimeout(ctx) defer cancelFunc() - internalAttachVolume := func() (string, string, error) { + var internalAttachVolume func(bool) (string, string, error) + internalAttachVolume = func(hasRetriedAfterReregister bool) (string, string, error) { log := logger.GetLogger(ctx) var faultType string err := validateManager(ctx, m) @@ -1121,6 +1129,26 @@ func (m *defaultManager) AttachVolume(ctx context.Context, if volumeOperationRes.Fault != nil { faultType = ExtractFaultTypeFromVolumeResponseResult(ctx, volumeOperationRes) + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && IsCnsNotRegisteredFault(ctx, volumeOperationRes.Fault) { + // If we've already retried once after re-registration, return error to prevent infinite loop + if hasRetriedAfterReregister { + return "", faultType, logger.LogNewErrorf(log, + "failed to attach cns volume after re-registration. volumeID: %q, vm: %q, "+ + "fault: CnsNotRegisteredFault, opId: %q", volumeID, vm.String(), taskInfo.ActivationId) + } + log.Infof("observed CnsNotRegisteredFault while attaching volume: %q with vm: %q. "+ + "Attempting to re-register volume", volumeID, vm.String()) + if err := m.reRegisterVolume(ctx, volumeID); err != nil { + log.Errorf("failed to re-register volume %q: %v", volumeID, err) + return "", faultType, logger.LogNewErrorf(log, "failed to re-register volume %q: %v", volumeID, err) + } + log.Infof("Successfully re-registered volume %q. 
Retrying AttachVolume", volumeID) + // Retry the operation after successful re-registration + return internalAttachVolume(true) + } + if volumeOperationRes.Fault.Fault != nil { _, isResourceInUseFault := volumeOperationRes.Fault.Fault.(*vim25types.ResourceInUse) if isResourceInUseFault { @@ -1175,7 +1203,7 @@ func (m *defaultManager) AttachVolume(ctx context.Context, return diskUUID, "", nil } start := time.Now() - resp, faultType, err := internalAttachVolume() + resp, faultType, err := internalAttachVolume(false) log := logger.GetLogger(ctx) log.Debugf("internalAttachVolume: returns fault %q for volume %q", faultType, volumeID) if err != nil { @@ -1193,7 +1221,8 @@ func (m *defaultManager) DetachVolume(ctx context.Context, vm *cnsvsphere.Virtua error) { ctx, cancelFunc := ensureOperationContextHasATimeout(ctx) defer cancelFunc() - internalDetachVolume := func() (string, error) { + var internalDetachVolume func(bool) (string, error) + internalDetachVolume = func(hasRetriedAfterReregister bool) (string, error) { log := logger.GetLogger(ctx) var faultType string err := validateManager(ctx, m) @@ -1278,6 +1307,26 @@ func (m *defaultManager) DetachVolume(ctx context.Context, vm *cnsvsphere.Virtua if volumeOperationRes.Fault != nil { faultType = ExtractFaultTypeFromVolumeResponseResult(ctx, volumeOperationRes) + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && IsCnsNotRegisteredFault(ctx, volumeOperationRes.Fault) { + // If we've already retried once after re-registration, return error to prevent infinite loop + if hasRetriedAfterReregister { + return faultType, logger.LogNewErrorf(log, + "failed to detach cns volume after re-registration. volumeID: %q, vm: %+v, "+ + "fault: CnsNotRegisteredFault, opId: %q", volumeID, vm, taskInfo.ActivationId) + } + log.Infof("observed CnsNotRegisteredFault while detaching volume: %q from vm: %q. "+ + "Attempting to re-register volume", volumeID, vm.String()) + if err := m.reRegisterVolume(ctx, volumeID); err != nil { + log.Errorf("failed to re-register volume %q: %v", volumeID, err) + return faultType, logger.LogNewErrorf(log, "failed to re-register volume %q: %v", volumeID, err) + } + log.Infof("Successfully re-registered volume %q. 
Retrying DetachVolume", volumeID) + // Retry the operation after successful re-registration + return internalDetachVolume(true) + } + if volumeOperationRes.Fault.Fault != nil { fault, isManagedObjectNotFoundFault := volumeOperationRes.Fault.Fault.(*vim25types.ManagedObjectNotFound) if isManagedObjectNotFoundFault && fault.Obj.Type == cnsDetachSpec.Vm.Type && @@ -1317,7 +1366,7 @@ func (m *defaultManager) DetachVolume(ctx context.Context, vm *cnsvsphere.Virtua return "", nil } start := time.Now() - faultType, err := internalDetachVolume() + faultType, err := internalDetachVolume(false) log := logger.GetLogger(ctx) log.Debugf("internalDetachVolume: returns fault %q for volume %q", faultType, volumeID) if err != nil { @@ -1351,7 +1400,7 @@ func (m *defaultManager) DeleteVolume(ctx context.Context, volumeID string, dele return faultType, err } if m.idempotencyHandlingEnabled { - return m.deleteVolumeWithImprovedIdempotency(ctx, volumeID, deleteDisk) + return m.deleteVolumeWithImprovedIdempotency(ctx, volumeID, deleteDisk, false) } return m.deleteVolume(ctx, volumeID, deleteDisk) } @@ -1439,7 +1488,7 @@ func (m *defaultManager) deleteVolume(ctx context.Context, volumeID string, dele // CNS task information is persisted by leveraging the VolumeOperationRequest // interface. func (m *defaultManager) deleteVolumeWithImprovedIdempotency(ctx context.Context, - volumeID string, deleteDisk bool) (string, error) { + volumeID string, deleteDisk bool, hasRetriedAfterReregister bool) (string, error) { log := logger.GetLogger(ctx) var ( // Reference to the DeleteVolume task on CNS. @@ -1469,12 +1518,17 @@ func (m *defaultManager) deleteVolumeWithImprovedIdempotency(ctx context.Context return "", nil } // Validate if previous operation is pending. - if IsTaskPending(volumeOperationDetails) { + // If we're retrying after re-registration, don't reuse the old pending task + // as it may contain CnsNotRegisteredFault. Create a new task instead. + if IsTaskPending(volumeOperationDetails) && !hasRetriedAfterReregister { taskMoRef := vim25types.ManagedObjectReference{ Type: "Task", Value: volumeOperationDetails.OperationDetails.TaskID, } task = object.NewTask(m.virtualCenter.Client.Client, taskMoRef) + } else if IsTaskPending(volumeOperationDetails) && hasRetriedAfterReregister { + log.Infof("Retrying after re-registration: ignoring old pending task %s and will create a new task", + volumeOperationDetails.OperationDetails.TaskID) } } case apierrors.IsNotFound(err): @@ -1581,6 +1635,33 @@ func (m *defaultManager) deleteVolumeWithImprovedIdempotency(ctx context.Context if volumeOperationRes.Fault != nil { faultType = ExtractFaultTypeFromVolumeResponseResult(ctx, volumeOperationRes) + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && + IsCnsNotRegisteredFault(ctx, volumeOperationRes.Fault) { + // If we've already retried once after re-registration, return error to prevent infinite loop + if hasRetriedAfterReregister { + msg := fmt.Sprintf("failed to delete volume after re-registration. 
volumeID: %q, "+ + "fault: CnsNotRegisteredFault, opID: %q", volumeID, taskInfo.ActivationId) + volumeOperationDetails = createRequestDetails(instanceName, "", "", 0, + nil, volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", taskInfo.ActivationId, taskInvocationStatusError, msg) + return faultType, logger.LogNewError(log, msg) + } + log.Infof("observed CnsNotRegisteredFault while deleting volume: %q. Attempting to re-register volume", volumeID) + if err := m.reRegisterVolume(ctx, volumeID); err != nil { + log.Errorf("failed to re-register volume %q: %v", volumeID, err) + msg := fmt.Sprintf("failed to re-register volume %q: %v", volumeID, err) + volumeOperationDetails = createRequestDetails(instanceName, "", "", 0, + nil, volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", taskInfo.ActivationId, taskInvocationStatusError, msg) + return faultType, logger.LogNewErrorf(log, "failed to re-register volume %q: %v", volumeID, err) + } + log.Infof("Successfully re-registered volume %q. Retrying deleteVolumeWithImprovedIdempotency", volumeID) + // Retry the operation after successful re-registration + return m.deleteVolumeWithImprovedIdempotency(ctx, volumeID, deleteDisk, true) + } + // If volume is not found on host, but is present in CNS DB, we will get vim.fault.NotFound fault. // In such a case, send back success as the volume is already deleted. if IsNotFoundFault(ctx, faultType) { @@ -1607,7 +1688,8 @@ func (m *defaultManager) deleteVolumeWithImprovedIdempotency(ctx context.Context func (m *defaultManager) UpdateVolumeMetadata(ctx context.Context, spec *cnstypes.CnsVolumeMetadataUpdateSpec) error { ctx, cancelFunc := ensureOperationContextHasATimeout(ctx) defer cancelFunc() - internalUpdateVolumeMetadata := func() error { + var internalUpdateVolumeMetadata func(bool) error + internalUpdateVolumeMetadata = func(hasRetriedAfterReregister bool) error { log := logger.GetLogger(ctx) err := validateManager(ctx, m) if err != nil { @@ -1672,6 +1754,26 @@ func (m *defaultManager) UpdateVolumeMetadata(ctx context.Context, spec *cnstype } volumeOperationRes := taskResult.GetCnsVolumeOperationResult() if volumeOperationRes.Fault != nil { + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && + IsCnsNotRegisteredFault(ctx, volumeOperationRes.Fault) { + // If we've already retried once after re-registration, return error to prevent infinite loop + if hasRetriedAfterReregister { + return logger.LogNewErrorf(log, "failed to update volume after re-registration. "+ + "updateSpec: %q, fault: CnsNotRegisteredFault, opID: %q", + spew.Sdump(spec), taskInfo.ActivationId) + } + log.Infof("observed CnsNotRegisteredFault while updating volume metadata for volume: %q. "+ + "Attempting to re-register volume", spec.VolumeId.Id) + if err := m.reRegisterVolume(ctx, spec.VolumeId.Id); err != nil { + log.Errorf("failed to re-register volume %q: %v", spec.VolumeId.Id, err) + return logger.LogNewErrorf(log, "failed to re-register volume %q: %v", spec.VolumeId.Id, err) + } + log.Infof("Successfully re-registered volume %q. Retrying UpdateVolumeMetadata", spec.VolumeId.Id) + // Retry the operation after successful re-registration + return internalUpdateVolumeMetadata(true) + } return logger.LogNewErrorf(log, "failed to update volume. 
updateSpec: %q, fault: %q, opID: %q", spew.Sdump(spec), spew.Sdump(volumeOperationRes.Fault), taskInfo.ActivationId) } @@ -1680,7 +1782,7 @@ func (m *defaultManager) UpdateVolumeMetadata(ctx context.Context, spec *cnstype return nil } start := time.Now() - err := internalUpdateVolumeMetadata() + err := internalUpdateVolumeMetadata(false) if err != nil { prometheus.CnsControlOpsHistVec.WithLabelValues(prometheus.PrometheusCnsUpdateVolumeMetadataOpType, prometheus.PrometheusFailStatus).Observe(time.Since(start).Seconds()) @@ -1695,7 +1797,8 @@ func (m *defaultManager) UpdateVolumeMetadata(ctx context.Context, spec *cnstype func (m *defaultManager) UpdateVolumeCrypto(ctx context.Context, spec *cnstypes.CnsVolumeCryptoUpdateSpec) error { ctx, cancelFunc := ensureOperationContextHasATimeout(ctx) defer cancelFunc() - internalUpdateVolumeCrypto := func() error { + var internalUpdateVolumeCrypto func(bool) error + internalUpdateVolumeCrypto = func(hasRetriedAfterReregister bool) error { log := logger.GetLogger(ctx) err := validateManager(ctx, m) if err != nil { @@ -1737,6 +1840,26 @@ func (m *defaultManager) UpdateVolumeCrypto(ctx context.Context, spec *cnstypes. } volumeOperationRes := taskResult.GetCnsVolumeOperationResult() if volumeOperationRes.Fault != nil { + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && + IsCnsNotRegisteredFault(ctx, volumeOperationRes.Fault) { + // If we've already retried once after re-registration, return error to prevent infinite loop + if hasRetriedAfterReregister { + return logger.LogNewErrorf(log, "failed to update volume crypto after re-registration. "+ + "updateSpec: %q, fault: CnsNotRegisteredFault, opID: %q", + spew.Sdump(spec), taskInfo.ActivationId) + } + log.Infof("observed CnsNotRegisteredFault while updating volume crypto for volume: %q. "+ + "Attempting to re-register volume", spec.VolumeId.Id) + if err := m.reRegisterVolume(ctx, spec.VolumeId.Id); err != nil { + log.Errorf("failed to re-register volume %q: %v", spec.VolumeId.Id, err) + return logger.LogNewErrorf(log, "failed to re-register volume %q: %v", spec.VolumeId.Id, err) + } + log.Infof("Successfully re-registered volume %q. Retrying UpdateVolumeCrypto", spec.VolumeId.Id) + // Retry the operation after successful re-registration + return internalUpdateVolumeCrypto(true) + } return logger.LogNewErrorf(log, "failed to update volume. updateSpec: %q, fault: %q, opID: %q", spew.Sdump(spec), spew.Sdump(volumeOperationRes.Fault), taskInfo.ActivationId) } @@ -1745,7 +1868,7 @@ func (m *defaultManager) UpdateVolumeCrypto(ctx context.Context, spec *cnstypes. return nil } start := time.Now() - err := internalUpdateVolumeCrypto() + err := internalUpdateVolumeCrypto(false) if err != nil { prometheus.CnsControlOpsHistVec.WithLabelValues(prometheus.PrometheusCnsUpdateVolumeCryptoOpType, prometheus.PrometheusFailStatus).Observe(time.Since(start).Seconds()) @@ -1778,7 +1901,7 @@ func (m *defaultManager) ExpandVolume(ctx context.Context, volumeID string, size return faultType, err } if m.idempotencyHandlingEnabled { - return m.expandVolumeWithImprovedIdempotency(ctx, volumeID, size, extraParams) + return m.expandVolumeWithImprovedIdempotency(ctx, volumeID, size, extraParams, false) } return m.expandVolume(ctx, volumeID, size) @@ -1865,7 +1988,7 @@ func (m *defaultManager) expandVolume(ctx context.Context, volumeID string, size // interface to persist CNS task information. 
It uses this persisted information // to handle idempotency of ExpandVolume callbacks to CNS for the same volume. func (m *defaultManager) expandVolumeWithImprovedIdempotency(ctx context.Context, volumeID string, - size int64, extraParams interface{}) (faultType string, finalErr error) { + size int64, extraParams interface{}, hasRetriedAfterReregister bool) (faultType string, finalErr error) { log := logger.GetLogger(ctx) var ( // Reference to the ExtendVolume task. @@ -1919,7 +2042,9 @@ func (m *defaultManager) expandVolumeWithImprovedIdempotency(ctx context.Context log.Infof("Volume with ID %s already expanded to size %v", volumeID, size) return "", nil } - if IsTaskPending(volumeOperationDetails) { + // If we're retrying after re-registration, don't reuse the old pending task + // as it may contain CnsNotRegisteredFault. Create a new task instead. + if IsTaskPending(volumeOperationDetails) && !hasRetriedAfterReregister { log.Infof("Volume with ID %s has ExtendVolume task %s pending on CNS.", volumeID, volumeOperationDetails.OperationDetails.TaskID) @@ -1928,6 +2053,9 @@ func (m *defaultManager) expandVolumeWithImprovedIdempotency(ctx context.Context Value: volumeOperationDetails.OperationDetails.TaskID, } task = object.NewTask(m.virtualCenter.Client.Client, taskMoRef) + } else if IsTaskPending(volumeOperationDetails) && hasRetriedAfterReregister { + log.Infof("Retrying after re-registration: ignoring old pending task %s and will create a new task", + volumeOperationDetails.OperationDetails.TaskID) } } case !apierrors.IsNotFound(finalErr): @@ -2056,6 +2184,36 @@ func (m *defaultManager) expandVolumeWithImprovedIdempotency(ctx context.Context volumeOperationRes := taskResult.GetCnsVolumeOperationResult() if volumeOperationRes.Fault != nil { + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && + IsCnsNotRegisteredFault(ctx, volumeOperationRes.Fault) { + // If we've already retried once, return error to prevent infinite loop + if hasRetriedAfterReregister { + faultType = ExtractFaultTypeFromVolumeResponseResult(ctx, volumeOperationRes) + volumeOperationDetails = createRequestDetails(instanceName, "", "", + volumeOperationDetails.Capacity, quotaInfo, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", taskInfo.ActivationId, taskInvocationStatusError, + volumeOperationRes.Fault.LocalizedMessage) + return faultType, logger.LogNewErrorf(log, "failed to extend volume after re-registration. "+ + "volumeID: %q, fault: CnsNotRegisteredFault, opID: %q", volumeID, taskInfo.ActivationId) + } + log.Infof("observed CnsNotRegisteredFault while expanding volume: %q. "+ + "Attempting to re-register volume", volumeID) + if err := m.reRegisterVolume(ctx, volumeID); err != nil { + log.Errorf("failed to re-register volume %q: %v", volumeID, err) + faultType = ExtractFaultTypeFromVolumeResponseResult(ctx, volumeOperationRes) + volumeOperationDetails = createRequestDetails(instanceName, "", "", + volumeOperationDetails.Capacity, quotaInfo, volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", taskInfo.ActivationId, taskInvocationStatusError, + fmt.Sprintf("failed to re-register volume: %v", err)) + return faultType, logger.LogNewErrorf(log, "failed to re-register volume %q: %v", volumeID, err) + } + log.Infof("Successfully re-registered volume %q. 
Retrying expandVolumeWithImprovedIdempotency", volumeID) + // Retry the operation after successful re-registration + return m.expandVolumeWithImprovedIdempotency(ctx, volumeID, size, extraParams, true) + } if _, ok := volumeOperationRes.Fault.Fault.(*cnstypes.CnsFault); ok { log.Debugf("ExtendVolume task %s returned with CnsFault. Querying CNS to "+ "determine if volume with ID %s was successfully expanded.", @@ -2260,6 +2418,9 @@ func (m *defaultManager) RelocateVolume(ctx context.Context, log.Errorf("CNS RelocateVolume failed from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) return nil, err } + // Note: RelocateVolume returns a task without waiting for completion. + // If the caller waits on the task and encounters CnsNotRegisteredFault, + // they should handle it by calling reRegisterVolume and retrying the operation. return res, err } start := time.Now() @@ -2753,6 +2914,25 @@ func (m *defaultManager) createSnapshotWithImprovedIdempotencyCheck(ctx context. // Handle snapshot operation result createSnapshotsOperationRes := createSnapshotsTaskResult.GetCnsVolumeOperationResult() if createSnapshotsOperationRes.Fault != nil { + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && + IsCnsNotRegisteredFault(ctx, createSnapshotsOperationRes.Fault) { + log.Infof("observed CnsNotRegisteredFault while creating snapshot %q on volume: %q. "+ + "Attempting to re-register volume", instanceName, volumeID) + if err := m.reRegisterVolume(ctx, volumeID); err != nil { + log.Errorf("failed to re-register volume %q: %v", volumeID, err) + } + // Continue to return the fault so the operation can be retried + errMsg := fmt.Sprintf("failed to create snapshot %q on volume %q with fault: CnsNotRegisteredFault, opID: %q", + instanceName, volumeID, createSnapshotsTaskInfo.ActivationId) + if m.idempotencyHandlingEnabled { + volumeOperationDetails = createRequestDetails(instanceName, volumeID, "", 0, nil, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, createSnapshotsTask.Reference().Value, + "", createSnapshotsTaskInfo.ActivationId, taskInvocationStatusError, errMsg) + } + return nil, logger.LogNewError(log, errMsg) + } errMsg := fmt.Sprintf("failed to create snapshot %q on volume %q with fault: %q, opID: %q", instanceName, volumeID, spew.Sdump(createSnapshotsOperationRes.Fault), createSnapshotsTaskInfo.ActivationId) @@ -2951,6 +3131,20 @@ func (m *defaultManager) createSnapshotWithTransaction(ctx context.Context, volu "invalid task result: got %T with value %+v", createSnapshotsTaskResult, createSnapshotsTaskResult) } if snapshotCreateResult.Fault != nil { + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && + IsCnsNotRegisteredFault(ctx, snapshotCreateResult.Fault) { + log.Infof("observed CnsNotRegisteredFault while creating snapshot %q on volume: %q. 
"+ + "Attempting to re-register volume", instanceName, volumeID) + if err := m.reRegisterVolume(ctx, volumeID); err != nil { + log.Errorf("failed to re-register volume %q: %v", volumeID, err) + } + // Continue to return the fault so the operation can be retried + return nil, csifault.CSIInternalFault, logger.LogNewErrorf(log, + "failed to create snapshot %q on volume %q with fault: CnsNotRegisteredFault", + instanceName, volumeID) + } return nil, "", logger.LogNewErrorf(log, "failed to create snapshot %q on volume %q with fault: %+v", instanceName, volumeID, snapshotCreateResult.Fault) } @@ -3271,6 +3465,25 @@ func (m *defaultManager) deleteSnapshotWithImprovedIdempotencyCheck( // Handle snapshot operation result deleteSnapshotsOperationRes := deleteSnapshotsTaskResult.GetCnsVolumeOperationResult() if deleteSnapshotsOperationRes.Fault != nil { + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && + IsCnsNotRegisteredFault(ctx, deleteSnapshotsOperationRes.Fault) { + log.Infof("observed CnsNotRegisteredFault while deleting snapshot %q on volume: %q. Attempting to re-register volume", snapshotID, volumeID) + if err := m.reRegisterVolume(ctx, volumeID); err != nil { + log.Errorf("failed to re-register volume %q: %v", volumeID, err) + } + // Continue to return the fault so the operation can be retried + errMsg := fmt.Sprintf("failed to delete snapshot %q on volume %q. fault: CnsNotRegisteredFault, opId: %q", + snapshotID, volumeID, deleteSnapshotsTaskInfo.ActivationId) + if m.idempotencyHandlingEnabled { + volumeOperationDetails = createRequestDetails(instanceName, "", "", 0, nil, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, deleteSnapshotTask.Reference().Value, + "", deleteSnapshotsTaskInfo.ActivationId, taskInvocationStatusError, errMsg) + } + return nil, logger.LogNewError(log, errMsg) + } + err = soap.WrapVimFault(deleteSnapshotsOperationRes.Fault.Fault) isInvalidArgumentError := cnsvsphere.IsInvalidArgumentError(err) @@ -3447,6 +3660,46 @@ func GetAllManagerInstances(ctx context.Context) map[string]*defaultManager { return newManagerInstanceMap } +// reRegisterVolume re-registers a volume to CNS +func (m *defaultManager) reRegisterVolume(ctx context.Context, volumeID string) error { + log := logger.GetLogger(ctx) + log.Infof("reRegisterVolume: Attempting to re-register volume %q to CNS", volumeID) + + containerCluster := cnsvsphere.GetContainerCluster(m.clusterId, + m.virtualCenter.Config.Username, + m.clusterFlavor, m.clusterDistribution) + containerClusterArray := []cnstypes.CnsContainerCluster{containerCluster} + + volumeName := "pvc-" + volumeID + + createSpec := &cnstypes.CnsVolumeCreateSpec{ + Name: volumeName, + VolumeType: string(cnstypes.CnsVolumeTypeBlock), + Metadata: cnstypes.CnsVolumeMetadata{ + ContainerCluster: containerCluster, + ContainerClusterArray: containerClusterArray, + }, + BackingObjectDetails: &cnstypes.CnsBlockBackingDetails{ + BackingDiskId: volumeID, + }, + } + + log.Debugf("reRegisterVolume: Re-registering volume %q with spec: %+v", volumeID, spew.Sdump(createSpec)) + _, faultType, err := m.createVolume(ctx, createSpec) + if err != nil { + // Check if it's CnsVolumeAlreadyExistsFault or CnsAlreadyRegisteredFault + // which means the volume is already registered (race condition or already fixed) + if IsCnsVolumeAlreadyExistsFault(ctx, faultType) { + log.Infof("reRegisterVolume: Volume %q is already registered 
(possibly registered by another operation)", volumeID) + return nil + } + return logger.LogNewErrorf(log, "failed to re-register volume %q: %v", volumeID, err) + } + + log.Infof("reRegisterVolume: Successfully re-registered volume %q to CNS", volumeID) + return nil +} + func (m *defaultManager) getAggregatedSnapshotSize(ctx context.Context, volumeID string) (int64, error) { log := logger.GetLogger(ctx) var aggregatedSnapshotCapacity int64 @@ -3478,7 +3731,7 @@ func (m *defaultManager) getAggregatedSnapshotSize(ctx context.Context, volumeID } // compileBatchAttachTaskResult consolidates Batch AttachVolume API's task result. -func compileBatchAttachTaskResult(ctx context.Context, result cnstypes.BaseCnsVolumeOperationResult, +func compileBatchAttachTaskResult(ctx context.Context, m *defaultManager, result cnstypes.BaseCnsVolumeOperationResult, vm *cnsvsphere.VirtualMachine, activationId string) (BatchAttachResult, error) { log := logger.GetLogger(ctx) volumeOperationResult := result.GetCnsVolumeOperationResult() @@ -3494,6 +3747,15 @@ func compileBatchAttachTaskResult(ctx context.Context, result cnstypes.BaseCnsVo fault := volumeOperationResult.Fault if fault != nil { + // Check for CnsNotRegisteredFault and attempt to re-register the volume + if IsCnsNotRegisteredFault(ctx, fault) { + log.Infof("observed CnsNotRegisteredFault while batch attaching volume: %q to vm: %q. "+ + "Attempting to re-register volume", volumeId, vm.String()) + if err := m.reRegisterVolume(ctx, volumeId); err != nil { + log.Errorf("failed to re-register volume %q: %v", volumeId, err) + } + // Continue to set the fault so the operation can be retried + } // In case of failure, set faultType and error. faultType := ExtractFaultTypeFromVolumeResponseResult(ctx, volumeOperationResult) batchAttachResult.FaultType = faultType @@ -3535,8 +3797,11 @@ func constructBatchAttachSpecList(ctx context.Context, vm *cnsvsphere.VirtualMac Vm: vm.Reference(), Sharing: volume.SharingMode, DiskMode: volume.DiskMode, - BackingTypeName: cnstypes.CnsVolumeBackingType(volume.BackingType), VolumeEncrypted: volume.VolumeEncrypted, + BackingTypeName: cnstypes.CnsVolumeBackingType(volume.BackingType), + } + if cnsAttachDetachSpec.BackingTypeName == "" { + cnsAttachDetachSpec.BackingTypeName = cnstypes.CnsVolumeBackingTypeFlatVer2BackingInfo } // Set controllerKey and unitNumber only if they are provided by the user. @@ -3625,7 +3890,7 @@ func (m *defaultManager) BatchAttachVolumes(ctx context.Context, volumesThatFailedToAttach := make([]string, 0) for _, result := range taskResults { - currentBatchAttachResult, err := compileBatchAttachTaskResult(ctx, result, vm, taskInfo.ActivationId) + currentBatchAttachResult, err := compileBatchAttachTaskResult(ctx, m, result, vm, taskInfo.ActivationId) if err != nil { log.Errorf("failed to compile task results. Err: %s", err) return []BatchAttachResult{}, csifault.CSIInternalFault, err @@ -3820,6 +4085,23 @@ func (m *defaultManager) unregisterVolume(ctx context.Context, volumeID string, volOpRes := res.GetCnsVolumeOperationResult() if volOpRes.Fault != nil { + // Check for CnsNotRegisteredFault and attempt to re-register the volume + // Note: CnsNotRegisteredFault only occurs when unregisterDisk is true + // Only handle this for WORKLOAD cluster flavor + if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && unregisterDisk && + IsCnsNotRegisteredFault(ctx, volOpRes.Fault) { + log.Infof("observed CnsNotRegisteredFault while unregistering volume: %q (unregisterDisk=%v). 
"+ + "Attempting to re-register volume", volumeID, unregisterDisk) + if err := m.reRegisterVolume(ctx, volumeID); err != nil { + log.Errorf("failed to re-register volume %q: %v", volumeID, err) + } + // Continue to return the fault so the operation can be retried + msg := "volume operation result contains CnsNotRegisteredFault" + fault := ExtractFaultTypeFromVolumeResponseResult(ctx, volOpRes) + log.Errorf("%s from vCenter %q. fault: %q, opId: %q", msg, + m.virtualCenter.Config.Host, fault, taskInfo.ActivationId) + return fault, errors.New(msg) + } msg := "volume operation result contains fault" fault := ExtractFaultTypeFromVolumeResponseResult(ctx, volOpRes) log.Errorf("%s from vCenter %q. fault: %q, opId: %q", msg, diff --git a/pkg/common/cns-lib/volume/util.go b/pkg/common/cns-lib/volume/util.go index 620336963a..729aecc98e 100644 --- a/pkg/common/cns-lib/volume/util.go +++ b/pkg/common/cns-lib/volume/util.go @@ -632,3 +632,17 @@ func IsCnsVolumeAlreadyExistsFault(ctx context.Context, faultType string) bool { log.Infof("Checking fault type: %q is vim.fault.CnsVolumeAlreadyExistsFault", faultType) return faultType == "vim.fault.CnsVolumeAlreadyExistsFault" } + +// IsCnsNotRegisteredFault checks if the fault is CnsNotRegisteredFault +func IsCnsNotRegisteredFault(ctx context.Context, fault *types.LocalizedMethodFault) bool { + log := logger.GetLogger(ctx) + if fault == nil || fault.Fault == nil { + log.Infof("fault is nil or fault.Fault is nil. Not a CnsNotRegisteredFault") + return false + } + if _, ok := fault.Fault.(*cnstypes.CnsNotRegisteredFault); ok { + log.Infof("observed CnsNotRegisteredFault") + return true + } + return false +} diff --git a/pkg/common/utils/utils_test.go b/pkg/common/utils/utils_test.go index 20a7eb1a01..a72f852603 100644 --- a/pkg/common/utils/utils_test.go +++ b/pkg/common/utils/utils_test.go @@ -139,7 +139,7 @@ func getCommonUtilsTest(t *testing.T) *commonUtilsTest { t.Fatal(err) } - volumeManager, err := cnsvolumes.GetManager(ctx, virtualCenter, nil, false, false, false, "") + volumeManager, err := cnsvolumes.GetManager(ctx, virtualCenter, nil, false, false, false, "", "", "") if err != nil { t.Fatalf("failed to create an instance of volume manager. err=%v", err) } diff --git a/pkg/csi/service/common/vsphereutil.go b/pkg/csi/service/common/vsphereutil.go index 3b27e1a754..2e781bf8e9 100644 --- a/pkg/csi/service/common/vsphereutil.go +++ b/pkg/csi/service/common/vsphereutil.go @@ -1225,32 +1225,21 @@ func DeleteSnapshotUtil(ctx context.Context, volumeManager cnsvolume.Manager, cs return cnsSnapshotInfo, nil } -// GetCnsVolumeType is the helper function that determines the volume type based on the volume-id -func GetCnsVolumeType(ctx context.Context, volumeManager cnsvolume.Manager, volumeId string) (string, error) { +// GetCnsVolumeType is the helper function that determines the volume type based on the volume-id prefix. +// If volume ID begins with "file:", it is a file volume, otherwise it is a block volume. +func GetCnsVolumeType(ctx context.Context, volumeId string) string { log := logger.GetLogger(ctx) var volumeType string - queryFilter := cnstypes.CnsQueryFilter{ - VolumeIds: []cnstypes.CnsVolumeId{{Id: volumeId}}, - } - querySelection := cnstypes.CnsQuerySelection{ - Names: []string{ - string(cnstypes.QuerySelectionNameTypeVolumeType), - }, - } - // Select only the volume type. 
- queryResult, err := volumeManager.QueryAllVolume(ctx, queryFilter, querySelection) - if err != nil { - return "", logger.LogNewErrorCodef(log, codes.Internal, - "queryVolume failed for volumeID: %q with err=%+v", volumeId, err) - } - if len(queryResult.Volumes) == 0 { - log.Infof("volume: %s not found during query while determining CNS volume type", volumeId) - return "", ErrNotFound + // Determine volume type based on volume ID prefix + if strings.HasPrefix(volumeId, "file:") { + volumeType = FileVolumeType + } else { + volumeType = BlockVolumeType } - volumeType = queryResult.Volumes[0].VolumeType - log.Infof("volume: %s is of type: %s", volumeId, volumeType) - return volumeType, nil + + log.Infof("volume: %s is of type: %s (determined from volume ID prefix)", volumeId, volumeType) + return volumeType } // GetNodeVMsWithAccessToDatastore finds out NodeVMs which have access to the given diff --git a/pkg/csi/service/vanilla/controller.go b/pkg/csi/service/vanilla/controller.go index a8a814b038..12eb18529e 100644 --- a/pkg/csi/service/vanilla/controller.go +++ b/pkg/csi/service/vanilla/controller.go @@ -232,7 +232,8 @@ func (c *controller) Init(config *cnsconfig.Config, version string) error { c.managers.VcenterConfigs[vcenterconfig.Host] = vcenterconfig volumeManager, err := cnsvolume.GetManager(ctx, vcenter, operationStore, true, true, - multivCenterTopologyDeployment, cnstypes.CnsClusterFlavorVanilla) + multivCenterTopologyDeployment, cnstypes.CnsClusterFlavorVanilla, config.Global.ClusterID, + config.Global.ClusterDistribution) if err != nil { return logger.LogNewErrorf(log, "failed to create an instance of volume manager. err=%v", err) } @@ -1617,16 +1618,7 @@ func (c *controller) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequ } if cnsVolumeType == common.UnknownVolumeType { - cnsVolumeType, err = common.GetCnsVolumeType(ctx, volumeManager, req.VolumeId) - if err != nil { - if err.Error() == common.ErrNotFound.Error() { - // The volume couldn't be found during query, assuming the delete operation as success - return &csi.DeleteVolumeResponse{}, "", nil - } else { - return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.Internal, - "failed to determine CNS volume type for volume: %q. Error: %+v", req.VolumeId, err) - } - } + cnsVolumeType = common.GetCnsVolumeType(ctx, req.VolumeId) volumeType = convertCnsVolumeType(ctx, cnsVolumeType) } // Check if the volume contains CNS snapshots only for block volumes. diff --git a/pkg/csi/service/vanilla/controller_test.go b/pkg/csi/service/vanilla/controller_test.go index 1ae47a398e..45ac6bf5d5 100644 --- a/pkg/csi/service/vanilla/controller_test.go +++ b/pkg/csi/service/vanilla/controller_test.go @@ -243,7 +243,7 @@ func getControllerTest(t *testing.T) *controllerTest { volumeManager, err := cnsvolume.GetManager(ctx, vcenter, fakeOpStore, true, false, - false, cnstypes.CnsClusterFlavorVanilla) + false, cnstypes.CnsClusterFlavorVanilla, "", "") if err != nil { t.Fatalf("failed to create an instance of volume manager. 
err=%v", err) } diff --git a/pkg/csi/service/vanilla/controller_topology_test.go b/pkg/csi/service/vanilla/controller_topology_test.go index 2a9cbd23f1..96f34e3041 100644 --- a/pkg/csi/service/vanilla/controller_topology_test.go +++ b/pkg/csi/service/vanilla/controller_topology_test.go @@ -415,7 +415,7 @@ func getControllerTestWithTopology(t *testing.T) *controllerTestTopology { volumeManager, err := cnsvolume.GetManager(ctxtopology, vcenter, fakeOpStore, true, false, - false, cnstypes.CnsClusterFlavorVanilla) + false, cnstypes.CnsClusterFlavorVanilla, "", "") if err != nil { t.Fatalf("failed to create an instance of volume manager. err=%v", err) } diff --git a/pkg/csi/service/wcp/controller.go b/pkg/csi/service/wcp/controller.go index 94dadfa984..28041964a0 100644 --- a/pkg/csi/service/wcp/controller.go +++ b/pkg/csi/service/wcp/controller.go @@ -194,7 +194,7 @@ func (c *controller) Init(config *cnsconfig.Config, version string) error { volumeManager, err := cnsvolume.GetManager(ctx, vcenter, operationStore, idempotencyHandlingEnabled, false, - false, cnstypes.CnsClusterFlavorWorkload) + false, cnstypes.CnsClusterFlavorWorkload, config.Global.SupervisorID, config.Global.ClusterDistribution) if err != nil { return logger.LogNewErrorf(log, "failed to create an instance of volume manager. err=%v", err) } @@ -431,10 +431,9 @@ func (c *controller) ReloadConfiguration(reconnectToVCFromNewConfig bool) error return logger.LogNewErrorf(log, "failed to reset volume manager. err=%v", err) } c.manager.VcenterConfig = newVCConfig - volumeManager, err := cnsvolume.GetManager(ctx, vcenter, operationStore, idempotencyHandlingEnabled, false, - false, cnstypes.CnsClusterFlavorWorkload) + false, cnstypes.CnsClusterFlavorWorkload, cfg.Global.SupervisorID, cfg.Global.ClusterDistribution) if err != nil { return logger.LogNewErrorf(log, "failed to create an instance of volume manager. err=%v", err) } @@ -1726,16 +1725,7 @@ func (c *controller) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequ return nil, csifault.CSIInvalidArgumentFault, err } if cnsVolumeType == common.UnknownVolumeType { - cnsVolumeType, err = common.GetCnsVolumeType(ctx, c.manager.VolumeManager, req.VolumeId) - if err != nil { - if err.Error() == common.ErrNotFound.Error() { - // The volume couldn't be found during query, assuming the delete operation as success - return &csi.DeleteVolumeResponse{}, "", nil - } else { - return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.Internal, - "failed to determine CNS volume type for volume: %q. Error: %+v", req.VolumeId, err) - } - } + cnsVolumeType = common.GetCnsVolumeType(ctx, req.VolumeId) volumeType = convertCnsVolumeType(ctx, cnsVolumeType) } // Check if the volume contains CNS snapshots only for block volumes. 
@@ -2437,24 +2427,13 @@ func (c *controller) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshot } volumeID := req.GetSourceVolumeId() volumeType = prometheus.PrometheusBlockVolumeType - // Query capacity in MB for block volume snapshot - volumeIds := []cnstypes.CnsVolumeId{{Id: volumeID}} - cnsVolumeDetailsMap, err := utils.QueryVolumeDetailsUtil(ctx, c.manager.VolumeManager, volumeIds) - if err != nil { - return nil, err - } - if _, ok := cnsVolumeDetailsMap[volumeID]; !ok { - return nil, logger.LogNewErrorCodef(log, codes.Internal, - "cns query volume did not return the volume: %s", volumeID) - } - snapshotSizeInMB := cnsVolumeDetailsMap[volumeID].SizeInMB - if cnsVolumeDetailsMap[volumeID].VolumeType != common.BlockVolumeType { + cnsvolumeType := common.GetCnsVolumeType(ctx, volumeID) + if cnsvolumeType != common.BlockVolumeType { return nil, logger.LogNewErrorCodef(log, codes.FailedPrecondition, - "queried volume doesn't have the expected volume type. Expected VolumeType: %v. "+ - "Queried VolumeType: %v", volumeType, cnsVolumeDetailsMap[volumeID].VolumeType) + "Expected VolumeType: %v. "+ + "Observed VolumeType: %v", volumeType, cnsvolumeType) } - // TODO: We may need to add logic to check the limit of max number of snapshots by using // GlobalMaxSnapshotsPerBlockVolume etc. variables in the future. @@ -2462,6 +2441,7 @@ func (c *controller) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshot // sign. That is, a string of "+". Because, all other CNS snapshot APIs still require both // VolumeID and SnapshotID as the input, while corresponding snapshot APIs in upstream CSI require SnapshotID. // So, we need to bridge the gap in vSphere CSI driver and return a combined SnapshotID to CSI Snapshotter. + var err error var snapshotID string var cnsSnapshotInfo *cnsvolume.CnsSnapshotInfo var cnsVolumeInfo *cnsvolumeinfov1alpha1.CNSVolumeInfo @@ -2517,6 +2497,17 @@ func (c *controller) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshot "failed to create snapshot on volume %q with error: %v", volumeID, err) } } + // Query capacity in MB for block volume snapshot + volumeIds := []cnstypes.CnsVolumeId{{Id: volumeID}} + cnsVolumeDetailsMap, err := utils.QueryVolumeDetailsUtil(ctx, c.manager.VolumeManager, volumeIds) + if err != nil { + return nil, err + } + if _, ok := cnsVolumeDetailsMap[volumeID]; !ok { + return nil, logger.LogNewErrorCodef(log, codes.Internal, + "cns query volume did not return the volume: %s", volumeID) + } + snapshotSizeInMB := cnsVolumeDetailsMap[volumeID].SizeInMB snapshotCreateTimeInProto := timestamppb.New(cnsSnapshotInfo.SnapshotLatestOperationCompleteTime) createSnapshotResponse := &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ @@ -2708,11 +2699,7 @@ func (c *controller) ControllerExpandVolume(ctx context.Context, req *csi.Contro // Later we may need to define different csi faults. // Check if the volume contains CNS snapshots only for block volumes. if cnsVolumeType == common.UnknownVolumeType { - cnsVolumeType, err = common.GetCnsVolumeType(ctx, c.manager.VolumeManager, req.VolumeId) - if err != nil { - return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.Internal, - "failed to determine CNS volume type for volume: %q. 
Error: %+v", req.VolumeId, err) - } + cnsVolumeType = common.GetCnsVolumeType(ctx, req.VolumeId) volumeType = convertCnsVolumeType(ctx, cnsVolumeType) } if cnsVolumeType == common.BlockVolumeType && diff --git a/pkg/csi/service/wcp/controller_test.go b/pkg/csi/service/wcp/controller_test.go index 679ab5df90..325476e1ba 100644 --- a/pkg/csi/service/wcp/controller_test.go +++ b/pkg/csi/service/wcp/controller_test.go @@ -117,7 +117,7 @@ func getControllerTest(t *testing.T) *controllerTest { volumeManager, err := cnsvolume.GetManager(ctx, vcenter, fakeOpStore, true, false, - false, cnstypes.CnsClusterFlavorWorkload) + false, cnstypes.CnsClusterFlavorWorkload, "", "") if err != nil { t.Fatalf("failed to create an instance of volume manager. err=%v", err) } diff --git a/pkg/syncer/byokoperator/manager.go b/pkg/syncer/byokoperator/manager.go index acd020be43..a63e8234e3 100644 --- a/pkg/syncer/byokoperator/manager.go +++ b/pkg/syncer/byokoperator/manager.go @@ -76,7 +76,8 @@ func NewManager( return nil, err } - volumeManager, err := volume.GetManager(ctx, vcClient, nil, false, false, false, clusterFlavor) + volumeManager, err := volume.GetManager(ctx, vcClient, nil, false, + false, false, clusterFlavor, configInfo.Cfg.Global.SupervisorID, configInfo.Cfg.Global.ClusterDistribution) if err != nil { return nil, fmt.Errorf("failed to create an instance of volume manager: %w", err) } diff --git a/pkg/syncer/cnsoperator/controller/cnsnodevmbatchattachment/cnsnodevmbatchattachment_controller.go b/pkg/syncer/cnsoperator/controller/cnsnodevmbatchattachment/cnsnodevmbatchattachment_controller.go index 94f25a3947..67c486374f 100644 --- a/pkg/syncer/cnsoperator/controller/cnsnodevmbatchattachment/cnsnodevmbatchattachment_controller.go +++ b/pkg/syncer/cnsoperator/controller/cnsnodevmbatchattachment/cnsnodevmbatchattachment_controller.go @@ -525,6 +525,12 @@ func removeFinalizerAndStatusEntry(ctx context.Context, client client.Client, k8 // processBatchAttach first constructs the batch attach volume request for all volumes in instance spec // and then calls CNS batch attach for them. +// TODO: When QueryBackingTypeFromVirtualDiskInfo is used to query backing type for volumes +// (from PR kubernetes-sigs/vsphere-csi-driver#3799), handle the case where a volume is not +// registered with CNS. If the query returns no results because the volume is unregistered +// (CnsNotRegisteredFault scenario), we need to re-register the volume using reRegisterVolume() +// and retry the query. Without this handling, unregistered volumes will fail to attach because +// QueryBackingTypeFromVirtualDiskInfo will return "no volume found" error. 
func (r *Reconciler) processBatchAttach(ctx context.Context, k8sClient kubernetes.Interface, vm *cnsvsphere.VirtualMachine, instance *v1alpha1.CnsNodeVMBatchAttachment, diff --git a/pkg/syncer/cnsoperator/manager/init.go b/pkg/syncer/cnsoperator/manager/init.go index 841813ddfb..415afcdd93 100644 --- a/pkg/syncer/cnsoperator/manager/init.go +++ b/pkg/syncer/cnsoperator/manager/init.go @@ -89,7 +89,14 @@ func InitCnsOperator(ctx context.Context, clusterFlavor cnstypes.CnsClusterFlavo return err } - volumeManager, err = volumes.GetManager(ctx, vCenter, nil, false, false, false, clusterFlavor) + var clusterId string + if clusterFlavor == cnstypes.CnsClusterFlavorWorkload { + clusterId = cnsOperator.configInfo.Cfg.Global.SupervisorID + } else { + clusterId = cnsOperator.configInfo.Cfg.Global.ClusterID + } + volumeManager, err = volumes.GetManager(ctx, vCenter, nil, false, false, false, + clusterFlavor, clusterId, cnsOperator.configInfo.Cfg.Global.ClusterDistribution) if err != nil { return logger.LogNewErrorf(log, "failed to create an instance of volume manager. err=%v", err) } diff --git a/pkg/syncer/metadatasyncer.go b/pkg/syncer/metadatasyncer.go index c77b08cf35..10bb38c11a 100644 --- a/pkg/syncer/metadatasyncer.go +++ b/pkg/syncer/metadatasyncer.go @@ -437,7 +437,8 @@ func InitMetadataSyncer(ctx context.Context, clusterFlavor cnstypes.CnsClusterFl volumeOperationsLock[metadataSyncer.host] = &sync.Mutex{} volumeManager, err := volumes.GetManager(ctx, vCenter, - nil, false, false, false, metadataSyncer.clusterFlavor) + nil, false, false, false, + metadataSyncer.clusterFlavor, configInfo.Cfg.Global.SupervisorID, configInfo.Cfg.Global.ClusterDistribution) if err != nil { return logger.LogNewErrorf(log, "failed to create an instance of volume manager. err=%v", err) } @@ -508,7 +509,8 @@ func InitMetadataSyncer(ctx context.Context, clusterFlavor cnstypes.CnsClusterFl } volumeManager, err := volumes.GetManager(ctx, vCenter, nil, false, true, - multivCenterTopologyDeployment, metadataSyncer.clusterFlavor) + multivCenterTopologyDeployment, metadataSyncer.clusterFlavor, configInfo.Cfg.Global.ClusterID, + configInfo.Cfg.Global.ClusterDistribution) if err != nil { return logger.LogNewErrorf(log, "failed to create an instance of volume manager. err=%v", err) } @@ -2407,7 +2409,7 @@ func ReloadConfiguration(metadataSyncer *metadataSyncInformer, reconnectToVCFrom return logger.LogNewErrorf(log, "failed to reset volume manager. err=%v", err) } volumeManager, err := volumes.GetManager(ctx, vcenter, nil, false, false, false, - metadataSyncer.clusterFlavor) + metadataSyncer.clusterFlavor, cfg.Global.SupervisorID, cfg.Global.ClusterDistribution) if err != nil { return logger.LogNewErrorf(log, "failed to create an instance of volume manager. err=%v", err) } diff --git a/pkg/syncer/storagepool/diskDecommissionController.go b/pkg/syncer/storagepool/diskDecommissionController.go index 6a8c303ad8..cd11c8e3ce 100644 --- a/pkg/syncer/storagepool/diskDecommissionController.go +++ b/pkg/syncer/storagepool/diskDecommissionController.go @@ -103,7 +103,13 @@ func (w *DiskDecommController) detachVolumes(ctx context.Context, storagePoolNam if err != nil { return logger.LogNewErrorf(log, "failed to get cluster flavor. Error: %v", err) } - volManager, err := volume.GetManager(ctx, &vc, nil, false, false, false, clusterFlavor) + cfg, err := config.GetConfig(ctx) + if err != nil { + return logger.LogNewErrorf(log, "failed to get config. 
Error: %v", err) + } + volManager, err := volume.GetManager(ctx, &vc, nil, false, + false, false, + clusterFlavor, cfg.Global.SupervisorID, cfg.Global.ClusterDistribution) if err != nil { return logger.LogNewErrorf(log, "failed to create an instance of volume manager. err=%v", err) } diff --git a/pkg/syncer/storagepool/migrationController.go b/pkg/syncer/storagepool/migrationController.go index bc15f408c8..6ec24561eb 100644 --- a/pkg/syncer/storagepool/migrationController.go +++ b/pkg/syncer/storagepool/migrationController.go @@ -89,7 +89,12 @@ func (m *migrationController) relocateCNSVolume(ctx context.Context, volumeID st if err != nil { return logger.LogNewErrorf(log, "failed to get cluster flavor. Error: %v", err) } - volManager, err := volume.GetManager(ctx, m.vc, nil, false, false, false, clusterFlavor) + cfg, err := config.GetConfig(ctx) + if err != nil { + return logger.LogNewErrorf(log, "failed to get config. Error: %v", err) + } + volManager, err := volume.GetManager(ctx, m.vc, nil, false, false, false, + clusterFlavor, cfg.Global.SupervisorID, cfg.Global.ClusterDistribution) if err != nil { return logger.LogNewErrorf(log, "failed to create an instance of volume manager. err=%v", err) } diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go index d052b2d06d..94f9923830 100644 --- a/pkg/syncer/syncer_test.go +++ b/pkg/syncer/syncer_test.go @@ -144,7 +144,7 @@ func TestSyncerWorkflows(t *testing.T) { } }() - volumeManager, err = cnsvolumes.GetManager(ctx, virtualCenter, nil, false, false, false, "") + volumeManager, err = cnsvolumes.GetManager(ctx, virtualCenter, nil, false, false, false, "", "", "") if err != nil { t.Fatalf("failed to create an instance of volume manager. err=%v", err) }