diff --git a/controllers/zookeepercluster_controller.go b/controllers/zookeepercluster_controller.go index ea77e519..d84e3b43 100644 --- a/controllers/zookeepercluster_controller.go +++ b/controllers/zookeepercluster_controller.go @@ -238,6 +238,20 @@ func (r *ZookeeperClusterReconciler) reconcileStatefulSet(instance *zookeeperv1b } else if err != nil { return err } else { + // check whether orphans PVCs need to be deleted before updating the sts + if instance.Spec.Persistence != nil && + instance.Spec.Persistence.VolumeReclaimPolicy == zookeeperv1beta1.VolumeReclaimPolicyDelete { + pvcCount, err := r.getPVCCount(instance) + if err != nil { + return err + } + r.Log.Info("PVC count", "count", pvcCount, "replicas", foundSts.Status.Replicas, "cr replicas", instance.Spec.Replicas) + if pvcCount > int(foundSts.Status.Replicas) { + r.Log.Info("Deleting PVCs", "count", pvcCount, "replicas", instance.Status.Replicas) + return nil + } + } + // check whether zookeeperCluster is updated before updating the sts cmp := compareResourceVersion(instance, foundSts) if cmp < 0 { @@ -285,8 +299,6 @@ func (r *ZookeeperClusterReconciler) updateStatefulSet(instance *zookeeperv1beta if err != nil { return err } - instance.Status.Replicas = foundSts.Status.Replicas - instance.Status.ReadyReplicas = foundSts.Status.ReadyReplicas return nil } @@ -585,6 +597,15 @@ func (r *ZookeeperClusterReconciler) reconcileClusterStatus(instance *zookeeperv instance.Status.Members.Ready = readyMembers instance.Status.Members.Unready = unreadyMembers + foundSts := &appsv1.StatefulSet{} + err = r.Client.Get(context.TODO(), types.NamespacedName{ + Name: instance.GetName(), + Namespace: instance.Namespace, + }, foundSts) + + instance.Status.Replicas = foundSts.Status.Replicas + instance.Status.ReadyReplicas = foundSts.Status.ReadyReplicas + // If Cluster is in a ready state... if instance.Spec.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) { r.Log.Info("Cluster is Ready, Creating ZK Metadata...") @@ -734,21 +755,38 @@ func (r *ZookeeperClusterReconciler) getPVCCount(instance *zookeeperv1beta1.Zook } func (r *ZookeeperClusterReconciler) cleanupOrphanPVCs(instance *zookeeperv1beta1.ZookeeperCluster) (err error) { + // get the up to date STS + foundSts := &appsv1.StatefulSet{} + err = r.Client.Get(context.TODO(), types.NamespacedName{ + Name: instance.GetName(), + Namespace: instance.Namespace, + }, foundSts) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + // this check should make sure we do not delete the PVCs before the STS has scaled down - if instance.Status.ReadyReplicas == instance.Spec.Replicas { + if foundSts.Status.ReadyReplicas == foundSts.Status.Replicas { pvcCount, err := r.getPVCCount(instance) if err != nil { return err } - r.Log.Info("cleanupOrphanPVCs", "PVC Count", pvcCount, "ReadyReplicas Count", instance.Status.ReadyReplicas) - if pvcCount > int(instance.Spec.Replicas) { + + r.Log.Info("cleanupOrphanPVCs", + "PVC Count", pvcCount, + "Replicas Count", foundSts.Spec.Replicas) + if pvcCount > int(*foundSts.Spec.Replicas) { pvcList, err := r.getPVCList(instance) if err != nil { return err } for _, pvcItem := range pvcList.Items { // delete only Orphan PVCs - if utils.IsPVCOrphan(pvcItem.Name, instance.Spec.Replicas) { + if utils.IsPVCOrphan(pvcItem.Name, *foundSts.Spec.Replicas) { + r.Log.Info("cleanupOrphanPVCs", "Deleting Orphan PVC", pvcItem.Name) r.deletePVC(pvcItem) } } diff --git a/controllers/zookeepercluster_controller_test.go b/controllers/zookeepercluster_controller_test.go index 1b22e612..91e5e672 100644 --- a/controllers/zookeepercluster_controller_test.go +++ b/controllers/zookeepercluster_controller_test.go @@ -20,10 +20,12 @@ import ( corev1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/pravega/zookeeper-operator/api/v1beta1" @@ -811,5 +813,57 @@ var _ = Describe("ZookeeperCluster Controller", func() { Ω(oldRestartValue).NotTo(Equal(newRestartValue)) }) }) + + Context("orphaned PVCs and reclaim policy is delete", func() { + var ( + cl client.Client + err error + ) + + BeforeEach(func() { + z.WithDefaults() + z.UID = "test-uid" + z.Spec.Replicas = 1 + z.Spec.Persistence.VolumeReclaimPolicy = "Delete" + pvc_0 := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "data-zookeeper-0", + Namespace: Namespace, + Labels: map[string]string{ + "app": z.GetName(), + "uid": "test-uid", + }, + }, + } + pvc_1 := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "data-zookeeper-1", + Namespace: Namespace, + Labels: map[string]string{ + "app": z.GetName(), + "uid": "test-uid", + }, + }, + } + sts := zk.MakeStatefulSet(z) + sts.Status.ReadyReplicas = 1 + sts.Status.Replicas = 1 + cl = fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects([]runtime.Object{z, &pvc_0, &pvc_1, sts}...).Build() + r = &ZookeeperClusterReconciler{Client: cl, Scheme: s, ZkClient: mockZkClient, Log: logf.Log.WithName("Test")} + }) + + It("should requeue if there is still orphaned pvc", func() { + err = r.reconcileStatefulSet(z) + Ω(err).To(BeNil()) + }) + + It("should delete orphaned PVC", func() { + err = r.cleanupOrphanPVCs(z) + Ω(err).To(BeNil()) + pvcList, err := r.getPVCList(z) + Ω(err).To(BeNil()) + Ω(pvcList.Items).To(HaveLen(1)) + }) + }) }) }) diff --git a/test/e2e/scale_test.go b/test/e2e/scale_test.go index eb5d73ef..8de65dd4 100644 --- a/test/e2e/scale_test.go +++ b/test/e2e/scale_test.go @@ -68,4 +68,48 @@ var _ = Describe("Perform scale for cluster upgrade", func() { Expect(zk_e2eutil.WaitForClusterToTerminate(logger, k8sClient, zk)).NotTo(HaveOccurred()) }) }) + + Context("Scale down and up", func() { + It("should wait for orphan PVCs cleaned before scaling up", func() { + defaultCluster := zk_e2eutil.NewDefaultCluster(testNamespace) + defaultCluster.WithDefaults() + + defaultCluster.Status.Init() + defaultCluster.Spec.Persistence.VolumeReclaimPolicy = "Delete" + + zk, err := zk_e2eutil.CreateCluster(logger, k8sClient, defaultCluster) + + Expect(err).NotTo(HaveOccurred()) + + // A default zk cluster should have 3 pods + podSize := 3 + Expect(zk_e2eutil.WaitForClusterToBecomeReady(logger, k8sClient, zk, podSize)).NotTo(HaveOccurred()) + + // This is to get the latest zk cluster object + zk, err = zk_e2eutil.GetCluster(logger, k8sClient, zk) + Expect(err).NotTo(HaveOccurred()) + + // Scale down zk cluster, decrease replicas to 1 + zk.Spec.Replicas = 1 + podSize = 1 + Expect(zk_e2eutil.UpdateCluster(logger, k8sClient, zk)).NotTo(HaveOccurred()) + + Expect(zk_e2eutil.WaitForClusterToBecomeReady(logger, k8sClient, zk, podSize)).NotTo(HaveOccurred()) + + zk, err = zk_e2eutil.GetCluster(logger, k8sClient, zk) + Expect(err).NotTo(HaveOccurred()) + + // Scale up zk cluster to 3 again, before the PVCs are cleaned up + zk.Spec.Replicas = 3 + podSize = 3 + Expect(zk_e2eutil.UpdateCluster(logger, k8sClient, zk)).NotTo(HaveOccurred()) + + Expect(zk_e2eutil.WaitForClusterToBecomeReady(logger, k8sClient, zk, podSize)).NotTo(HaveOccurred()) + + // Delete cluster + Expect(zk_e2eutil.DeleteCluster(logger, k8sClient, zk)).NotTo(HaveOccurred()) + + Expect(zk_e2eutil.WaitForClusterToTerminate(logger, k8sClient, zk)).NotTo(HaveOccurred()) + }) + }) })