Skip to content

Commit

Permalink
Merge branch 'pravega-master' into zk-backup-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
ibumarskov committed Apr 13, 2022
2 parents a53eebc + a62cbf1 commit 06853d8
Show file tree
Hide file tree
Showing 12 changed files with 5,931 additions and 3,407 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ jobs:
sudo go version
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: get go version
run: go version
- name: Gofmt and License checks
run: make check
- name: unit tests
Expand All @@ -37,7 +39,7 @@ jobs:
uses: codecov/codecov-action@v1
- name: Set env
run: |
echo "KUBERNETES_VERSION=v1.20.13" >> $GITHUB_ENV
echo "KUBERNETES_VERSION=v1.23.1" >> $GITHUB_ENV
echo "OPERATOR_SDK_VERSION=v0.19.4" >> $GITHUB_ENV
echo "MINIKUBE_VERSION=v1.24.0" >> $GITHUB_ENV
echo "KUBERNETES_CONFIG_FILE=$HOME/.kube/config" >> $GITHUB_ENV
Expand Down
15 changes: 15 additions & 0 deletions api/v1beta1/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* Copyright (c) 2018 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

// Package v1beta1 contains API Schema definitions for the zookeeper v1beta1 API
// group
// +k8s:deepcopy-gen=package,register
// +groupName=zookeeper.pravega.io
package v1beta1
8,690 changes: 5,552 additions & 3,138 deletions config/crd/bases/zookeeper.pravega.io_zookeeperclusters.yaml

Large diffs are not rendered by default.

69 changes: 34 additions & 35 deletions controllers/zookeeperbackup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,26 @@ import (
"github.com/mitchellh/hashstructure/v2"
)


var logBk = logf.Log.WithName("controller_zookeeperbackup")

// ZookeeperBackupReconciler reconciles a ZookeeperBackup object
type ZookeeperBackupReconciler struct {
client client.Client
scheme *runtime.Scheme
log logr.Logger
Client client.Client
Scheme *runtime.Scheme
Log logr.Logger
}

//+kubebuilder:rbac:groups=zookeeper.pravega.io.zookeeper.pravega.io,resources=zookeeperbackups,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=zookeeper.pravega.io.zookeeper.pravega.io,resources=zookeeperbackups/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=zookeeper.pravega.io.zookeeper.pravega.io,resources=zookeeperbackups/finalizers,verbs=update

func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
r.log = logBk.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
r.log.Info("Reconciling ZookeeperBackup")
func (r *ZookeeperBackupReconciler) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
r.Log = logBk.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
r.Log.Info("Reconciling ZookeeperBackup")

// Fetch the ZookeeperBackup instance
zookeeperBackup := &zookeeperv1beta1.ZookeeperBackup{}
err := r.client.Get(context.TODO(), request.NamespacedName, zookeeperBackup)
err := r.Client.Get(context.TODO(), request.NamespacedName, zookeeperBackup)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
Expand All @@ -74,16 +73,16 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
pvc := newPVCForZookeeperBackup(zookeeperBackup)

// Set ZookeeperBackup instance as the owner and controller
if err := controllerutil.SetControllerReference(zookeeperBackup, pvc, r.scheme); err != nil {
if err := controllerutil.SetControllerReference(zookeeperBackup, pvc, r.Scheme); err != nil {
return reconcile.Result{}, err
}

// Check if PVC already created
foundPVC := &corev1.PersistentVolumeClaim{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}, foundPVC)
err = r.Client.Get(context.TODO(), types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}, foundPVC)
if err != nil && errors.IsNotFound(err) {
r.log.Info("Creating a new PersistenVolumeClaim")
err = r.client.Create(context.TODO(), pvc)
r.Log.Info("Creating a new PersistenVolumeClaim")
err = r.Client.Create(context.TODO(), pvc)
if err != nil {
return reconcile.Result{}, err
}
Expand All @@ -95,20 +94,20 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
cronJob := newCronJobForCR(zookeeperBackup)

// Set ZookeeperBackup instance as the owner and controller
if err := controllerutil.SetControllerReference(zookeeperBackup, cronJob, r.scheme); err != nil {
if err := controllerutil.SetControllerReference(zookeeperBackup, cronJob, r.Scheme); err != nil {
return reconcile.Result{}, err
}

// Check if zookeeper cluster exists
foundZookeeperCluster := &zookeeperv1beta1.ZookeeperCluster{}
zkCluster := zookeeperBackup.Spec.ZookeeperCluster
err = r.client.Get(context.TODO(), types.NamespacedName{Name: zkCluster, Namespace: zookeeperBackup.Namespace}, foundZookeeperCluster)
err = r.Client.Get(context.TODO(), types.NamespacedName{Name: zkCluster, Namespace: zookeeperBackup.Namespace}, foundZookeeperCluster)
if err != nil && errors.IsNotFound(err) {
r.log.Error(err, fmt.Sprintf("Zookeeper cluster '%s' not found", zkCluster))
r.Log.Error(err, fmt.Sprintf("Zookeeper cluster '%s' not found", zkCluster))
return reconcile.Result{}, err
}
if foundZookeeperCluster.Status.Replicas != foundZookeeperCluster.Status.ReadyReplicas {
r.log.Info(fmt.Sprintf("Not all cluster replicas are ready: %d/%d. Suspend CronJob",
r.Log.Info(fmt.Sprintf("Not all cluster replicas are ready: %d/%d. Suspend CronJob",
foundZookeeperCluster.Status.ReadyReplicas, foundZookeeperCluster.Status.Replicas))
*cronJob.Spec.Suspend = true
} else {
Expand All @@ -120,7 +119,7 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
if err != nil && errors.IsNotFound(err) {
return reconcile.Result{}, err
}
r.log.Info(fmt.Sprintf("Leader IP (hostname): %s", leaderIp))
r.Log.Info(fmt.Sprintf("Leader IP (hostname): %s", leaderIp))
leaderHostname := strings.Split(leaderIp, ".")[0]

// Landing backup pod on the same node with leader
Expand All @@ -129,11 +128,11 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
client.InNamespace(request.NamespacedName.Namespace),
client.MatchingLabels{"app": zkCluster},
}
err = r.client.List(context.TODO(), podList, opts...)
err = r.Client.List(context.TODO(), podList, opts...)
if err != nil {
if errors.IsNotFound(err) {
msg := fmt.Sprintf("Pods cannot be found by label app:%s", zookeeperBackup.Name)
r.log.Error(err, msg)
r.Log.Error(err, msg)
}
return reconcile.Result{}, err
}
Expand All @@ -142,7 +141,7 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
for _, pod := range podList.Items {
if pod.Spec.Hostname == leaderHostname {
leaderFound = true
r.log.Info(fmt.Sprintf("Leader was found. Pod: %s (node: %s)", pod.Name, pod.Spec.NodeName))
r.Log.Info(fmt.Sprintf("Leader was found. Pod: %s (node: %s)", pod.Name, pod.Spec.NodeName))
// Set appropriate NodeSelector and PVC ClaimName
cronJob.Spec.JobTemplate.Spec.Template.Spec.NodeSelector =
map[string]string{"kubernetes.io/hostname": pod.Spec.NodeName}
Expand All @@ -152,7 +151,7 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc
}
}
if !leaderFound {
r.log.Info("Pod with leader role wasn't found. Suspend CronJob")
r.Log.Info("Pod with leader role wasn't found. Suspend CronJob")
*cronJob.Spec.Suspend = true
}

Expand All @@ -169,50 +168,50 @@ func (r *ZookeeperBackupReconciler) Reconcile(request reconcile.Request) (reconc

// Check if this CronJob already exists
foundCJ := &batchv1beta1.CronJob{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: cronJob.Name, Namespace: cronJob.Namespace}, foundCJ)
err = r.Client.Get(context.TODO(), types.NamespacedName{Name: cronJob.Name, Namespace: cronJob.Namespace}, foundCJ)
if err != nil && errors.IsNotFound(err) {
r.log.Info("Creating a new CronJob", "CronJob.Namespace", cronJob.Namespace, "CronJob.Name", cronJob.Name)
r.Log.Info("Creating a new CronJob", "CronJob.Namespace", cronJob.Namespace, "CronJob.Name", cronJob.Name)
cronJob.Annotations["last-applied-hash"] = hashStr
err = r.client.Create(context.TODO(), cronJob)
err = r.Client.Create(context.TODO(), cronJob)
if err != nil {
return reconcile.Result{}, err
}

// CronJob created successfully
r.log.Info("CronJob created successfully.", "RequeueAfter", ReconcileTime)
r.Log.Info("CronJob created successfully.", "RequeueAfter", ReconcileTime)
return reconcile.Result{RequeueAfter: ReconcileTime}, nil
} else if err != nil {
return reconcile.Result{}, err
}

if foundCJ.Annotations["last-applied-hash"] == hashStr {
r.log.Info("CronJob already exists and looks updated", "CronJob.Namespace", foundCJ.Namespace, "CronJob.Name", foundCJ.Name)
r.Log.Info("CronJob already exists and looks updated", "CronJob.Namespace", foundCJ.Namespace, "CronJob.Name", foundCJ.Name)
} else {
cronJob.Annotations["last-applied-hash"] = hashStr
r.log.Info("Update CronJob", "Namespace", cronJob.Namespace, "Name", cronJob.Name)
r.Log.Info("Update CronJob", "Namespace", cronJob.Namespace, "Name", cronJob.Name)
//cronJob.ObjectMeta.ResourceVersion = foundCJ.ObjectMeta.ResourceVersion
err = r.client.Update(context.TODO(), cronJob)
err = r.Client.Update(context.TODO(), cronJob)
if err != nil {
r.log.Error(err, "CronJob cannot be updated")
r.Log.Error(err, "CronJob cannot be updated")
return reconcile.Result{}, err
}
}

// Requeue
r.log.Info(fmt.Sprintf("Rerun reconclie after %s sec.", ReconcileTime))
r.Log.Info(fmt.Sprintf("Rerun reconclie after %s sec.", ReconcileTime))
return reconcile.Result{RequeueAfter: ReconcileTime}, nil
}

func (r *ZookeeperBackupReconciler) GetLeaderIP(zkCluster *zookeeperv1beta1.ZookeeperCluster) (string, error) {
// Get zookeeper leader via zookeeper admin server
svcAdminName := zkCluster.GetAdminServerServiceName()
foundSvcAdmin := &corev1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{
err := r.Client.Get(context.TODO(), types.NamespacedName{
Name: svcAdminName,
Namespace: zkCluster.Namespace,
}, foundSvcAdmin)
if err != nil && errors.IsNotFound(err) {
r.log.Error(err, fmt.Sprintf("Zookeeper admin service '%s' not found", svcAdminName))
r.Log.Error(err, fmt.Sprintf("Zookeeper admin service '%s' not found", svcAdminName))
return "", err
}

Expand All @@ -221,19 +220,19 @@ func (r *ZookeeperBackupReconciler) GetLeaderIP(zkCluster *zookeeperv1beta1.Zook

resp, err := http.Get(fmt.Sprintf("http://%s:%d/commands/leader", adminIp, svcPort.Port))
if err != nil {
r.log.Error(err, "Admin service error response")
r.Log.Error(err, "Admin service error response")
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
r.log.Error(err, "Can't read response body")
r.Log.Error(err, "Can't read response body")
return "", err
}
var result map[string]interface{}
err = json.Unmarshal(body, &result)
if err != nil {
r.log.Error(err, "Can't unmarshal json")
r.Log.Error(err, "Can't unmarshal json")
return "", err
}
leaderIp := result["leader_ip"].(string)
Expand Down
23 changes: 12 additions & 11 deletions controllers/zookeepercluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ package controllers
import (
"context"
"fmt"
"strconv"
"time"

"github.com/pravega/zookeeper-operator/pkg/controller/config"
"github.com/pravega/zookeeper-operator/pkg/utils"
"github.com/pravega/zookeeper-operator/pkg/yamlexporter"
"github.com/pravega/zookeeper-operator/pkg/zk"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"strconv"
"time"

"github.com/go-logr/logr"
zookeeperv1beta1 "github.com/pravega/zookeeper-operator/api/v1beta1"
Expand Down Expand Up @@ -59,7 +60,7 @@ type reconcileFun func(cluster *zookeeperv1beta1.ZookeeperCluster) error
// +kubebuilder:rbac:groups=zookeeper.pravega.io.zookeeper.pravega.io,resources=zookeeperclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=zookeeper.pravega.io.zookeeper.pravega.io,resources=zookeeperclusters/status,verbs=get;update;patch

func (r *ZookeeperClusterReconciler) Reconcile(request ctrl.Request) (ctrl.Result, error) {
func (r *ZookeeperClusterReconciler) Reconcile(_ context.Context, request ctrl.Request) (ctrl.Result, error) {
r.Log = log.WithValues(
"Request.Namespace", request.Namespace,
"Request.Name", request.Name)
Expand Down Expand Up @@ -262,7 +263,7 @@ func (r *ZookeeperClusterReconciler) updateStatefulSet(instance *zookeeperv1beta

func (r *ZookeeperClusterReconciler) upgradeStatefulSet(instance *zookeeperv1beta1.ZookeeperCluster, foundSts *appsv1.StatefulSet) (err error) {

//Getting the upgradeCondition from the zk clustercondition
// Getting the upgradeCondition from the zk clustercondition
_, upgradeCondition := instance.Status.GetClusterCondition(zookeeperv1beta1.ClusterConditionUpgrading)

if upgradeCondition == nil {
Expand All @@ -271,8 +272,8 @@ func (r *ZookeeperClusterReconciler) upgradeStatefulSet(instance *zookeeperv1bet
return nil
}

//Setting the upgrade condition to true to trigger the upgrade
//When the zk cluster is upgrading Statefulset CurrentRevision and UpdateRevision are not equal and zk cluster image tag is not equal to CurrentVersion
// Setting the upgrade condition to true to trigger the upgrade
// When the zk cluster is upgrading Statefulset CurrentRevision and UpdateRevision are not equal and zk cluster image tag is not equal to CurrentVersion
if upgradeCondition.Status == corev1.ConditionFalse {
if instance.Status.IsClusterInReadyState() && foundSts.Status.CurrentRevision != foundSts.Status.UpdateRevision && instance.Spec.Image.Tag != instance.Status.CurrentVersion {
instance.Status.TargetVersion = instance.Spec.Image.Tag
Expand All @@ -281,20 +282,20 @@ func (r *ZookeeperClusterReconciler) upgradeStatefulSet(instance *zookeeperv1bet
}
}

//checking if the upgrade is in progress
// checking if the upgrade is in progress
if upgradeCondition.Status == corev1.ConditionTrue {
//checking when the targetversion is empty
// checking when the targetversion is empty
if instance.Status.TargetVersion == "" {
r.Log.Info("upgrading to an unknown version: cancelling upgrade process")
return r.clearUpgradeStatus(instance)
}
//Checking for upgrade completion
// Checking for upgrade completion
if foundSts.Status.CurrentRevision == foundSts.Status.UpdateRevision {
instance.Status.CurrentVersion = instance.Status.TargetVersion
r.Log.Info("upgrade completed")
return r.clearUpgradeStatus(instance)
}
//updating the upgradecondition if upgrade is in progress
// updating the upgradecondition if upgrade is in progress
if foundSts.Status.CurrentRevision != foundSts.Status.UpdateRevision {
r.Log.Info("upgrade in progress")
if fmt.Sprint(foundSts.Status.UpdatedReplicas) != upgradeCondition.Message {
Expand Down Expand Up @@ -555,7 +556,7 @@ func (r *ZookeeperClusterReconciler) reconcileClusterStatus(instance *zookeeperv
instance.Status.Members.Ready = readyMembers
instance.Status.Members.Unready = unreadyMembers

//If Cluster is in a ready state...
// If Cluster is in a ready state...
if instance.Spec.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) {
r.Log.Info("Cluster is Ready, Creating ZK Metadata...")
zkUri := utils.GetZkServiceUri(instance)
Expand Down
Loading

0 comments on commit 06853d8

Please sign in to comment.