@@ -98,10 +98,23 @@ var (
9898 vmMoidToHostMoid , volumeIDToVMMap map [string ]string
9999)
100100
101+ // volumeLock represents a lock for a specific volume with reference counting
102+ type volumeLock struct {
103+ mutex sync.Mutex
104+ refCount int
105+ }
106+
107+ // snapshotLockManager manages per-volume locks for snapshot operations
108+ type snapshotLockManager struct {
109+ locks map [string ]* volumeLock
110+ mapMutex sync.RWMutex
111+ }
112+
101113type controller struct {
102- manager * common.Manager
103- authMgr common.AuthorizationService
104- topologyMgr commoncotypes.ControllerTopologyService
114+ manager * common.Manager
115+ authMgr common.AuthorizationService
116+ topologyMgr commoncotypes.ControllerTopologyService
117+ snapshotLockMgr * snapshotLockManager
105118 csi.UnimplementedControllerServer
106119}
107120
@@ -216,6 +229,12 @@ func (c *controller) Init(config *cnsconfig.Config, version string) error {
216229 CryptoClient : cryptoClient ,
217230 }
218231
232+ // Initialize snapshot lock manager
233+ c .snapshotLockMgr = & snapshotLockManager {
234+ locks : make (map [string ]* volumeLock ),
235+ }
236+ log .Info ("Initialized snapshot lock manager for per-volume serialization" )
237+
219238 vc , err := common .GetVCenter (ctx , c .manager )
220239 if err != nil {
221240 log .Errorf ("failed to get vcenter. err=%v" , err )
@@ -452,6 +471,53 @@ func (c *controller) ReloadConfiguration(reconnectToVCFromNewConfig bool) error
452471 return nil
453472}
454473
474+ // acquireSnapshotLock acquires a lock for the given volume ID.
475+ // It creates a new lock if one doesn't exist and increments the reference count.
476+ // The caller must call releaseSnapshotLock when done.
477+ func (c * controller ) acquireSnapshotLock (ctx context.Context , volumeID string ) {
478+ log := logger .GetLogger (ctx )
479+ c .snapshotLockMgr .mapMutex .Lock ()
480+ defer c .snapshotLockMgr .mapMutex .Unlock ()
481+
482+ vLock , exists := c .snapshotLockMgr .locks [volumeID ]
483+ if ! exists {
484+ vLock = & volumeLock {}
485+ c .snapshotLockMgr .locks [volumeID ] = vLock
486+ log .Debugf ("Created new lock for volume %q" , volumeID )
487+ }
488+ vLock .refCount ++
489+ log .Debugf ("Acquired lock for volume %q, refCount: %d" , volumeID , vLock .refCount )
490+
491+ // Unlock the map before acquiring the volume lock to avoid deadlock
492+ c .snapshotLockMgr .mapMutex .Unlock ()
493+ vLock .mutex .Lock ()
494+ c .snapshotLockMgr .mapMutex .Lock ()
495+ }
496+
497+ // releaseSnapshotLock releases the lock for the given volume ID.
498+ // It decrements the reference count and removes the lock if count reaches zero.
499+ func (c * controller ) releaseSnapshotLock (ctx context.Context , volumeID string ) {
500+ log := logger .GetLogger (ctx )
501+ c .snapshotLockMgr .mapMutex .Lock ()
502+ defer c .snapshotLockMgr .mapMutex .Unlock ()
503+
504+ vLock , exists := c .snapshotLockMgr .locks [volumeID ]
505+ if ! exists {
506+ log .Warnf ("Attempted to release non-existent lock for volume %q" , volumeID )
507+ return
508+ }
509+
510+ vLock .mutex .Unlock ()
511+ vLock .refCount --
512+ log .Debugf ("Released lock for volume %q, refCount: %d" , volumeID , vLock .refCount )
513+
514+ // Clean up the lock if reference count reaches zero
515+ if vLock .refCount == 0 {
516+ delete (c .snapshotLockMgr .locks , volumeID )
517+ log .Debugf ("Cleaned up lock for volume %q" , volumeID )
518+ }
519+ }
520+
455521// createBlockVolume creates a block volume based on the CreateVolumeRequest.
456522func (c * controller ) createBlockVolume (ctx context.Context , req * csi.CreateVolumeRequest ,
457523 isWorkloadDomainIsolationEnabled bool , clusterMoIds []string ) (
@@ -2451,8 +2517,43 @@ func (c *controller) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshot
24512517 "Queried VolumeType: %v" , volumeType , cnsVolumeDetailsMap [volumeID ].VolumeType )
24522518 }
24532519
2454- // TODO: We may need to add logic to check the limit of max number of snapshots by using
2455- // GlobalMaxSnapshotsPerBlockVolume etc. variables in the future.
2520+ // Extract namespace from request parameters
2521+ volumeSnapshotNamespace := req .Parameters [common .VolumeSnapshotNamespaceKey ]
2522+ if volumeSnapshotNamespace == "" {
2523+ return nil , logger .LogNewErrorCodef (log , codes .Internal ,
2524+ "volumesnapshot namespace is not set in the request parameters" )
2525+ }
2526+
2527+ // Get snapshot limit from namespace ConfigMap
2528+ snapshotLimit , err := getSnapshotLimitForNamespace (ctx , volumeSnapshotNamespace )
2529+ if err != nil {
2530+ return nil , logger .LogNewErrorCodef (log , codes .Internal ,
2531+ "failed to get snapshot limit for namespace %q: %v" , volumeSnapshotNamespace , err )
2532+ }
2533+ log .Infof ("Snapshot limit for namespace %q is set to %d" , volumeSnapshotNamespace , snapshotLimit )
2534+
2535+ // Acquire lock for this volume to serialize snapshot operations
2536+ c .acquireSnapshotLock (ctx , volumeID )
2537+ defer c .releaseSnapshotLock (ctx , volumeID )
2538+
2539+ // Query existing snapshots for this volume
2540+ snapshotList , _ , err := common .QueryVolumeSnapshotsByVolumeID (ctx , c .manager .VolumeManager , volumeID ,
2541+ common .QuerySnapshotLimit )
2542+ if err != nil {
2543+ return nil , logger .LogNewErrorCodef (log , codes .Internal ,
2544+ "failed to query snapshots for volume %q: %v" , volumeID , err )
2545+ }
2546+
2547+ // Check if the limit is exceeded
2548+ currentSnapshotCount := len (snapshotList )
2549+ if currentSnapshotCount >= snapshotLimit {
2550+ return nil , logger .LogNewErrorCodef (log , codes .FailedPrecondition ,
2551+ "the number of snapshots (%d) on the source volume %s has reached or exceeded " +
2552+ "the configured maximum (%d) for namespace %s" ,
2553+ currentSnapshotCount , volumeID , snapshotLimit , volumeSnapshotNamespace )
2554+ }
2555+ log .Infof ("Current snapshot count for volume %q is %d, within limit of %d" ,
2556+ volumeID , currentSnapshotCount , snapshotLimit )
24562557
24572558 // the returned snapshotID below is a combination of CNS VolumeID and CNS SnapshotID concatenated by the "+"
24582559 // sign. That is, a string of "<UUID>+<UUID>". Because, all other CNS snapshot APIs still require both
@@ -2530,7 +2631,6 @@ func (c *controller) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshot
25302631 cnsSnapshotInfo .SnapshotLatestOperationCompleteTime , createSnapshotResponse )
25312632
25322633 volumeSnapshotName := req .Parameters [common .VolumeSnapshotNameKey ]
2533- volumeSnapshotNamespace := req .Parameters [common .VolumeSnapshotNamespaceKey ]
25342634 log .Infof ("Attempting to annotate volumesnapshot %s/%s with annotation %s:%s" ,
25352635 volumeSnapshotNamespace , volumeSnapshotName , common .VolumeSnapshotInfoKey , snapshotID )
25362636 annotated , err := commonco .ContainerOrchestratorUtility .AnnotateVolumeSnapshot (ctx , volumeSnapshotName ,
0 commit comments