Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check orphan PVC before updating statefulSet #526

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions controllers/zookeepercluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,20 @@
} else if err != nil {
return err
} else {
// check whether orphans PVCs need to be deleted before updating the sts
if instance.Spec.Persistence != nil &&
instance.Spec.Persistence.VolumeReclaimPolicy == zookeeperv1beta1.VolumeReclaimPolicyDelete {
pvcCount, err := r.getPVCCount(instance)
if err != nil {
return err

Check warning on line 246 in controllers/zookeepercluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/zookeepercluster_controller.go#L246

Added line #L246 was not covered by tests
}
r.Log.Info("PVC count", "count", pvcCount, "replicas", foundSts.Status.Replicas, "cr replicas", instance.Spec.Replicas)
if pvcCount > int(foundSts.Status.Replicas) {
r.Log.Info("Deleting PVCs", "count", pvcCount, "replicas", instance.Status.Replicas)
return nil
}
}

// check whether zookeeperCluster is updated before updating the sts
cmp := compareResourceVersion(instance, foundSts)
if cmp < 0 {
Expand Down Expand Up @@ -285,8 +299,6 @@
if err != nil {
return err
}
instance.Status.Replicas = foundSts.Status.Replicas
instance.Status.ReadyReplicas = foundSts.Status.ReadyReplicas
return nil
}

Expand Down Expand Up @@ -585,6 +597,15 @@
instance.Status.Members.Ready = readyMembers
instance.Status.Members.Unready = unreadyMembers

foundSts := &appsv1.StatefulSet{}
err = r.Client.Get(context.TODO(), types.NamespacedName{
Name: instance.GetName(),
Namespace: instance.Namespace,
}, foundSts)

instance.Status.Replicas = foundSts.Status.Replicas
instance.Status.ReadyReplicas = foundSts.Status.ReadyReplicas

// If Cluster is in a ready state...
if instance.Spec.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) {
r.Log.Info("Cluster is Ready, Creating ZK Metadata...")
Expand Down Expand Up @@ -734,21 +755,38 @@
}

func (r *ZookeeperClusterReconciler) cleanupOrphanPVCs(instance *zookeeperv1beta1.ZookeeperCluster) (err error) {
// get the up to date STS
foundSts := &appsv1.StatefulSet{}
err = r.Client.Get(context.TODO(), types.NamespacedName{
Name: instance.GetName(),
Namespace: instance.Namespace,
}, foundSts)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err

Check warning on line 768 in controllers/zookeepercluster_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/zookeepercluster_controller.go#L768

Added line #L768 was not covered by tests
}

// this check should make sure we do not delete the PVCs before the STS has scaled down
if instance.Status.ReadyReplicas == instance.Spec.Replicas {
if foundSts.Status.ReadyReplicas == foundSts.Status.Replicas {
pvcCount, err := r.getPVCCount(instance)
if err != nil {
return err
}
r.Log.Info("cleanupOrphanPVCs", "PVC Count", pvcCount, "ReadyReplicas Count", instance.Status.ReadyReplicas)
if pvcCount > int(instance.Spec.Replicas) {

r.Log.Info("cleanupOrphanPVCs",
"PVC Count", pvcCount,
"Replicas Count", foundSts.Spec.Replicas)
if pvcCount > int(*foundSts.Spec.Replicas) {
pvcList, err := r.getPVCList(instance)
if err != nil {
return err
}
for _, pvcItem := range pvcList.Items {
// delete only Orphan PVCs
if utils.IsPVCOrphan(pvcItem.Name, instance.Spec.Replicas) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hoyhbx why are we deleting pvcs based on Sts replicas. Operator is looking for zookeeper cluster resource. Is there any issue are you seeing if we delete based on instance.Spec.Replicas?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to handle the race condition, where the operator has not yet deleted the orphan pvcs after scaling down, but the user scale the cluster back up. In that case, if instance.Spec.Replicas is used to delete old pvcs, the old pvcs will never get deleted.

Just as the added e2e test, when scaling down from 3 to 1, two pods are deleted, and statefulSet's replica gets down to 1. Then it takes sometime for the operator to delete the orphan pvcs. But before operator is able to delete the orphan pvcs, user scales up from 1 to 3, changing the instance.Spec.Replicas to 3. Then the old PVCs will never be deleted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @anishakj , does the above explanation make sense to you?
The problem is basically because there is a race condition when deleting the PVC and user upscaling

if utils.IsPVCOrphan(pvcItem.Name, *foundSts.Spec.Replicas) {
r.Log.Info("cleanupOrphanPVCs", "Deleting Orphan PVC", pvcItem.Name)
r.deletePVC(pvcItem)
}
}
Expand Down
54 changes: 54 additions & 0 deletions controllers/zookeepercluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/pravega/zookeeper-operator/api/v1beta1"
Expand Down Expand Up @@ -811,5 +813,57 @@ var _ = Describe("ZookeeperCluster Controller", func() {
Ω(oldRestartValue).NotTo(Equal(newRestartValue))
})
})

Context("orphaned PVCs and reclaim policy is delete", func() {
var (
cl client.Client
err error
)

BeforeEach(func() {
z.WithDefaults()
z.UID = "test-uid"
z.Spec.Replicas = 1
z.Spec.Persistence.VolumeReclaimPolicy = "Delete"
pvc_0 := corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "data-zookeeper-0",
Namespace: Namespace,
Labels: map[string]string{
"app": z.GetName(),
"uid": "test-uid",
},
},
}
pvc_1 := corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "data-zookeeper-1",
Namespace: Namespace,
Labels: map[string]string{
"app": z.GetName(),
"uid": "test-uid",
},
},
}
sts := zk.MakeStatefulSet(z)
sts.Status.ReadyReplicas = 1
sts.Status.Replicas = 1
cl = fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects([]runtime.Object{z, &pvc_0, &pvc_1, sts}...).Build()
r = &ZookeeperClusterReconciler{Client: cl, Scheme: s, ZkClient: mockZkClient, Log: logf.Log.WithName("Test")}
})

It("should requeue if there is still orphaned pvc", func() {
err = r.reconcileStatefulSet(z)
Ω(err).To(BeNil())
})

It("should delete orphaned PVC", func() {
err = r.cleanupOrphanPVCs(z)
Ω(err).To(BeNil())
pvcList, err := r.getPVCList(z)
Ω(err).To(BeNil())
Ω(pvcList.Items).To(HaveLen(1))
})
})
})
})
44 changes: 44 additions & 0 deletions test/e2e/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,48 @@ var _ = Describe("Perform scale for cluster upgrade", func() {
Expect(zk_e2eutil.WaitForClusterToTerminate(logger, k8sClient, zk)).NotTo(HaveOccurred())
})
})

Context("Scale down and up", func() {
It("should wait for orphan PVCs cleaned before scaling up", func() {
defaultCluster := zk_e2eutil.NewDefaultCluster(testNamespace)
defaultCluster.WithDefaults()

defaultCluster.Status.Init()
defaultCluster.Spec.Persistence.VolumeReclaimPolicy = "Delete"

zk, err := zk_e2eutil.CreateCluster(logger, k8sClient, defaultCluster)

Expect(err).NotTo(HaveOccurred())

// A default zk cluster should have 3 pods
podSize := 3
Expect(zk_e2eutil.WaitForClusterToBecomeReady(logger, k8sClient, zk, podSize)).NotTo(HaveOccurred())

// This is to get the latest zk cluster object
zk, err = zk_e2eutil.GetCluster(logger, k8sClient, zk)
Expect(err).NotTo(HaveOccurred())

// Scale down zk cluster, decrease replicas to 1
zk.Spec.Replicas = 1
podSize = 1
Expect(zk_e2eutil.UpdateCluster(logger, k8sClient, zk)).NotTo(HaveOccurred())

Expect(zk_e2eutil.WaitForClusterToBecomeReady(logger, k8sClient, zk, podSize)).NotTo(HaveOccurred())

zk, err = zk_e2eutil.GetCluster(logger, k8sClient, zk)
Expect(err).NotTo(HaveOccurred())

// Scale up zk cluster to 3 again, before the PVCs are cleaned up
zk.Spec.Replicas = 3
podSize = 3
Expect(zk_e2eutil.UpdateCluster(logger, k8sClient, zk)).NotTo(HaveOccurred())

Expect(zk_e2eutil.WaitForClusterToBecomeReady(logger, k8sClient, zk, podSize)).NotTo(HaveOccurred())

// Delete cluster
Expect(zk_e2eutil.DeleteCluster(logger, k8sClient, zk)).NotTo(HaveOccurred())

Expect(zk_e2eutil.WaitForClusterToTerminate(logger, k8sClient, zk)).NotTo(HaveOccurred())
})
})
})
Loading