Skip to content

Commit

Permalink
Operator should detect configOverrides updates (#314)
Browse files Browse the repository at this point in the history
* #291 Repro test

* fix lint

* Implement configOverrides reconciliation

* Fix test
  • Loading branch information
l0kix2 authored Jul 23, 2024
1 parent 577c4da commit b360ffd
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 0 deletions.
49 changes: 49 additions & 0 deletions controllers/ytsaurus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1"
)
Expand All @@ -37,13 +43,16 @@ type YtsaurusReconciler struct {
Recorder record.EventRecorder
}

const configOverridesField = ".spec.configOverrides"

// +kubebuilder:rbac:groups=cluster.ytsaurus.tech,resources=ytsaurus,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cluster.ytsaurus.tech,resources=ytsaurus/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cluster.ytsaurus.tech,resources=ytsaurus/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=statefulset,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=statefulset/status,verbs=get
// +kubebuilder:rbac:groups=core,resources=pod,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pod/status,verbs=get
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch
func (r *YtsaurusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

Expand All @@ -62,12 +71,52 @@ func (r *YtsaurusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

// SetupWithManager sets up the controller with the Manager.
func (r *YtsaurusReconciler) SetupWithManager(mgr ctrl.Manager) error {
// See https://book.kubebuilder.io/reference/watching-resources/externally-managed for the reference implementation
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &ytv1.Ytsaurus{}, configOverridesField, func(rawObj client.Object) []string {
ytsaurusResource := rawObj.(*ytv1.Ytsaurus)
if ytsaurusResource.Spec.ConfigOverrides == nil {
return nil
}
return []string{ytsaurusResource.Spec.ConfigOverrides.Name}
}); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&ytv1.Ytsaurus{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.ConfigMap{}).
Owns(&corev1.Service{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Secret{}).
Watches(
&corev1.ConfigMap{},
handler.EnqueueRequestsFromMapFunc(r.findObjectsForConfigMap),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
Complete(r)
}

func (r *YtsaurusReconciler) findObjectsForConfigMap(ctx context.Context, configMap client.Object) []reconcile.Request {
// See https://book.kubebuilder.io/reference/watching-resources/externally-managed for the reference implementation
attachedYtsauruses := &ytv1.YtsaurusList{}
listOps := &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(configOverridesField, configMap.GetName()),
Namespace: configMap.GetNamespace(),
}
err := r.List(ctx, attachedYtsauruses, listOps)
if err != nil {
return []reconcile.Request{}
}

requests := make([]reconcile.Request, len(attachedYtsauruses.Items))
for i, item := range attachedYtsauruses.Items {
requests[i] = reconcile.Request{
NamespacedName: types.NamespacedName{
Name: item.GetName(),
Namespace: item.GetNamespace(),
},
}
}
return requests
}
66 changes: 66 additions & 0 deletions test/e2e/ytsaurus_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.ytsaurus.tech/yt/go/mapreduce/spec"
"go.ytsaurus.tech/yt/go/yt/ytrpc"
"go.ytsaurus.tech/yt/go/yterrors"
apierrors "k8s.io/apimachinery/pkg/api/errors"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -249,6 +250,17 @@ func runImpossibleUpdateAndRollback(ytsaurus *ytv1.Ytsaurus, ytClient yt.Client)
Expect(ytClient.ListNode(ctx, ypath.Path("/"), &res, nil)).Should(Succeed())
}

func createConfigOverridesMap(namespace, name, key, value string) {
resource := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Data: map[string]string{key: value},
}
Expect(k8sClient.Create(ctx, &resource)).Should(Succeed())
}

type testRow struct {
A string `yson:"a"`
}
Expand Down Expand Up @@ -769,6 +781,60 @@ var _ = Describe("Basic test for Ytsaurus controller", func() {
},
)

It(
"ConfigOverrides update shout trigger reconciliation",
func(ctx context.Context) {
namespace := "test-overrides"
coName := "config-overrides"
ytsaurus := testutil.CreateMinimalYtsaurusResource(namespace)
ytsaurus.Spec.ConfigOverrides = &corev1.LocalObjectReference{Name: coName}
DeferCleanup(deleteYtsaurus, ytsaurus)

deployAndCheck(ytsaurus, namespace)
log.Info("sleep a little, because after cluster is running some reconciliations still may happen " +
"for some time (and for some reason) and we don't want to interfere before map creation")
time.Sleep(3 * time.Second)

dsPodName := "ds-0"
msPodName := "ms-0"

getPodByName := func(name string) (*corev1.Pod, error) {
ds0Name := types.NamespacedName{Name: name, Namespace: namespace}
dsPod := &corev1.Pod{}
err := k8sClient.Get(ctx, ds0Name, dsPod)
return dsPod, err
}
dsPod, err := getPodByName(dsPodName)
Expect(err).Should(Succeed())
msPod, err := getPodByName(msPodName)
Expect(err).Should(Succeed())
dsPodCreatedBefore := dsPod.CreationTimestamp.Time
log.Info("ds created before", "ts", dsPodCreatedBefore)
msPodCreatedBefore := msPod.CreationTimestamp.Time

discoveryOverride := "{resource_limits = {total_memory = 123456789;};}"
createConfigOverridesMap(namespace, coName, "ytserver-discovery.yson", discoveryOverride)

Eventually(ctx, func() bool {
pod, err := getPodByName(dsPodName)
if apierrors.IsNotFound(err) {
return false
}
dsPodCreatedAfter := pod.CreationTimestamp
log.Info("ds created after", "ts", dsPodCreatedAfter)
return dsPodCreatedAfter.After(dsPodCreatedBefore)
}).WithTimeout(30 * time.Second).
WithPolling(300 * time.Millisecond).
Should(BeTrueBecause("ds pod should be recreated on configOverrides creation"))

msPod, err = getPodByName(msPodName)
Expect(err).Should(Succeed())
Expect(msPod.CreationTimestamp.Time).Should(
Equal(msPodCreatedBefore), "ms pods shouldn't be recreated",
)
},
)

It("Sensors should be annotated with host", func(ctx context.Context) {
namespace := "testsolomon"
ytsaurus := testutil.CreateMinimalYtsaurusResource(namespace)
Expand Down

0 comments on commit b360ffd

Please sign in to comment.