@@ -25,6 +25,8 @@ import (
2525 k8sapi "github.com/openebs/lib-csi/pkg/client/k8s"
2626 clientset "github.com/openebs/lvm-localpv/pkg/generated/clientset/internalclientset"
2727 informers "github.com/openebs/lvm-localpv/pkg/generated/informer/externalversions"
28+ "github.com/openebs/lvm-localpv/pkg/mgmt/csipv"
29+ corev1 "k8s.io/api/core/v1"
2830 kubeinformers "k8s.io/client-go/informers"
2931 "k8s.io/client-go/kubernetes"
3032 "sigs.k8s.io/controller-runtime/pkg/manager/signals"
@@ -67,6 +69,8 @@ type controller struct {
6769
6870 k8sNodeInformer cache.SharedIndexInformer
6971 lvmNodeInformer cache.SharedIndexInformer
72+
73+ leakProtection * csipv.LeakProtectionController
7074}
7175
7276// NewController returns a new instance
@@ -210,6 +214,22 @@ func (cs *controller) init() error {
210214 cs .k8sNodeInformer .HasSynced ,
211215 cs .lvmNodeInformer .HasSynced )
212216 klog .Info ("synced k8s & lvm node informer caches" )
217+
218+ klog .Infof ("initializing csi provisioning leak protection controller" )
219+ pvcInformer := kubeInformerFactory .Core ().V1 ().PersistentVolumeClaims ()
220+ go pvcInformer .Informer ().Run (stopCh )
221+ if cs .leakProtection , err = csipv .NewLeakProtectionController (kubeClient ,
222+ pvcInformer , cs .driver .config .DriverName ,
223+ func (pvc * corev1.PersistentVolumeClaim , volumeName string ) error {
224+ // use default timeout of 10s for deletion.
225+ ctx , cancelCtx := context .WithTimeout (context .Background (), 10 * time .Second )
226+ defer cancelCtx ()
227+ return cs .deleteVolume (ctx , volumeName )
228+ },
229+ ); err != nil {
230+ return errors .Wrap (err , "failed to init leak protection controller" )
231+ }
232+ go cs .leakProtection .Run (2 , stopCh )
213233 return nil
214234}
215235
@@ -312,6 +332,15 @@ func (cs *controller) CreateVolume(
312332 } else if contentSource != nil && contentSource .GetVolume () != nil {
313333 return nil , status .Error (codes .Unimplemented , "" )
314334 } else {
335+ // mark volume for leak protection if pvc gets deleted
336+ // before the creation of pv.
337+ var finishCreateVolume func ()
338+ if finishCreateVolume , err = cs .leakProtection .BeginCreateVolume (volName ,
339+ params .PVCNamespace , params .PVCName ); err != nil {
340+ return nil , err
341+ }
342+ defer finishCreateVolume ()
343+
315344 vol , err = CreateLVMVolume (ctx , req , params )
316345 }
317346
@@ -339,49 +368,41 @@ func (cs *controller) DeleteVolume(
339368 ctx context.Context ,
340369 req * csi.DeleteVolumeRequest ) (* csi.DeleteVolumeResponse , error ) {
341370
342- klog .Infof ("received request to delete volume {%s}" , req .VolumeId )
343-
344- var (
345- err error
346- )
347-
371+ var err error
348372 if err = cs .validateDeleteVolumeReq (req ); err != nil {
349373 return nil , err
350374 }
351-
352375 volumeID := strings .ToLower (req .GetVolumeId ())
353-
354- // verify if the volume has already been deleted
355- vol , err := lvm .GetLVMVolume (volumeID )
356- if vol != nil && vol .DeletionTimestamp != nil {
357- goto deleteResponse
376+ if err = cs .deleteVolume (ctx , volumeID ); err != nil {
377+ return nil , err
358378 }
379+ return csipayload .NewDeleteVolumeResponseBuilder ().Build (), nil
380+ }
359381
382+ func (cs * controller ) deleteVolume (ctx context.Context , volumeID string ) error {
383+ klog .Infof ("received request to delete volume %q" , volumeID )
384+ vol , err := lvm .GetLVMVolume (volumeID )
360385 if err != nil {
361386 if k8serror .IsNotFound (err ) {
362- goto deleteResponse
387+ return nil
363388 }
364- return nil , errors .Wrapf (
365- err ,
366- "failed to get volume for {%s}" ,
367- volumeID ,
368- )
389+ return errors .Wrapf (err ,
390+ "failed to get volume for {%s}" , volumeID )
369391 }
370392
371- // Delete the corresponding ZV CR
372- err = lvm .DeleteVolume (volumeID )
373- if err != nil {
374- return nil , errors .Wrapf (
375- err ,
376- "failed to handle delete volume request for {%s}" ,
377- volumeID ,
378- )
393+ // if volume is not already triggered for deletion, delete the volume.
394+ // otherwise, just wait for the existing deletion operation to complete.
395+ if vol .GetDeletionTimestamp () == nil {
396+ if err = lvm .DeleteVolume (volumeID ); err != nil {
397+ return errors .Wrapf (err ,
398+ "failed to handle delete volume request for {%s}" , volumeID )
399+ }
400+ }
401+ if err = lvm .WaitForLVMVolumeDestroy (ctx , volumeID ); err != nil {
402+ return err
379403 }
380-
381404 sendEventOrIgnore ("" , volumeID , vol .Spec .Capacity , "lvm-localpv" , analytics .VolumeDeprovision )
382-
383- deleteResponse:
384- return csipayload .NewDeleteVolumeResponseBuilder ().Build (), nil
405+ return nil
385406}
386407
387408func isValidVolumeCapabilities (volCaps []* csi.VolumeCapability ) bool {
0 commit comments