diff --git a/controllers/ytsaurus_controller.go b/controllers/ytsaurus_controller.go index 0f509674..071ac0bd 100644 --- a/controllers/ytsaurus_controller.go +++ b/controllers/ytsaurus_controller.go @@ -21,11 +21,17 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" ) @@ -37,6 +43,8 @@ type YtsaurusReconciler struct { Recorder record.EventRecorder } +const configOverridesField = ".spec.configOverrides" + // +kubebuilder:rbac:groups=cluster.ytsaurus.tech,resources=ytsaurus,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=cluster.ytsaurus.tech,resources=ytsaurus/status,verbs=get;update;patch // +kubebuilder:rbac:groups=cluster.ytsaurus.tech,resources=ytsaurus/finalizers,verbs=update @@ -44,6 +52,7 @@ type YtsaurusReconciler struct { // +kubebuilder:rbac:groups=apps,resources=statefulset/status,verbs=get // +kubebuilder:rbac:groups=core,resources=pod,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pod/status,verbs=get +// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch func (r *YtsaurusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) @@ -62,6 +71,17 @@ func (r *YtsaurusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // SetupWithManager sets up the controller with the Manager. func (r *YtsaurusReconciler) SetupWithManager(mgr ctrl.Manager) error { + // See https://book.kubebuilder.io/reference/watching-resources/externally-managed for the reference implementation + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &ytv1.Ytsaurus{}, configOverridesField, func(rawObj client.Object) []string { + ytsaurusResource := rawObj.(*ytv1.Ytsaurus) + if ytsaurusResource.Spec.ConfigOverrides == nil { + return nil + } + return []string{ytsaurusResource.Spec.ConfigOverrides.Name} + }); err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&ytv1.Ytsaurus{}). Owns(&appsv1.StatefulSet{}). @@ -69,5 +89,34 @@ func (r *YtsaurusReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&corev1.Service{}). Owns(&appsv1.Deployment{}). Owns(&corev1.Secret{}). + Watches( + &corev1.ConfigMap{}, + handler.EnqueueRequestsFromMapFunc(r.findObjectsForConfigMap), + builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), + ). Complete(r) } + +func (r *YtsaurusReconciler) findObjectsForConfigMap(ctx context.Context, configMap client.Object) []reconcile.Request { + // See https://book.kubebuilder.io/reference/watching-resources/externally-managed for the reference implementation + attachedYtsauruses := &ytv1.YtsaurusList{} + listOps := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(configOverridesField, configMap.GetName()), + Namespace: configMap.GetNamespace(), + } + err := r.List(ctx, attachedYtsauruses, listOps) + if err != nil { + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, len(attachedYtsauruses.Items)) + for i, item := range attachedYtsauruses.Items { + requests[i] = reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: item.GetName(), + Namespace: item.GetNamespace(), + }, + } + } + return requests +} diff --git a/test/e2e/ytsaurus_controller_test.go b/test/e2e/ytsaurus_controller_test.go index 68cf708c..6e71487e 100644 --- a/test/e2e/ytsaurus_controller_test.go +++ b/test/e2e/ytsaurus_controller_test.go @@ -17,6 +17,7 @@ import ( "go.ytsaurus.tech/yt/go/mapreduce/spec" "go.ytsaurus.tech/yt/go/yt/ytrpc" "go.ytsaurus.tech/yt/go/yterrors" + apierrors "k8s.io/apimachinery/pkg/api/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -249,6 +250,17 @@ func runImpossibleUpdateAndRollback(ytsaurus *ytv1.Ytsaurus, ytClient yt.Client) Expect(ytClient.ListNode(ctx, ypath.Path("/"), &res, nil)).Should(Succeed()) } +func createConfigOverridesMap(namespace, name, key, value string) { + resource := corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Data: map[string]string{key: value}, + } + Expect(k8sClient.Create(ctx, &resource)).Should(Succeed()) +} + type testRow struct { A string `yson:"a"` } @@ -769,6 +781,60 @@ var _ = Describe("Basic test for Ytsaurus controller", func() { }, ) + It( + "ConfigOverrides update shout trigger reconciliation", + func(ctx context.Context) { + namespace := "test-overrides" + coName := "config-overrides" + ytsaurus := testutil.CreateMinimalYtsaurusResource(namespace) + ytsaurus.Spec.ConfigOverrides = &corev1.LocalObjectReference{Name: coName} + DeferCleanup(deleteYtsaurus, ytsaurus) + + deployAndCheck(ytsaurus, namespace) + log.Info("sleep a little, because after cluster is running some reconciliations still may happen " + + "for some time (and for some reason) and we don't want to interfere before map creation") + time.Sleep(3 * time.Second) + + dsPodName := "ds-0" + msPodName := "ms-0" + + getPodByName := func(name string) (*corev1.Pod, error) { + ds0Name := types.NamespacedName{Name: name, Namespace: namespace} + dsPod := &corev1.Pod{} + err := k8sClient.Get(ctx, ds0Name, dsPod) + return dsPod, err + } + dsPod, err := getPodByName(dsPodName) + Expect(err).Should(Succeed()) + msPod, err := getPodByName(msPodName) + Expect(err).Should(Succeed()) + dsPodCreatedBefore := dsPod.CreationTimestamp.Time + log.Info("ds created before", "ts", dsPodCreatedBefore) + msPodCreatedBefore := msPod.CreationTimestamp.Time + + discoveryOverride := "{resource_limits = {total_memory = 123456789;};}" + createConfigOverridesMap(namespace, coName, "ytserver-discovery.yson", discoveryOverride) + + Eventually(ctx, func() bool { + pod, err := getPodByName(dsPodName) + if apierrors.IsNotFound(err) { + return false + } + dsPodCreatedAfter := pod.CreationTimestamp + log.Info("ds created after", "ts", dsPodCreatedAfter) + return dsPodCreatedAfter.After(dsPodCreatedBefore) + }).WithTimeout(30 * time.Second). + WithPolling(300 * time.Millisecond). + Should(BeTrueBecause("ds pod should be recreated on configOverrides creation")) + + msPod, err = getPodByName(msPodName) + Expect(err).Should(Succeed()) + Expect(msPod.CreationTimestamp.Time).Should( + Equal(msPodCreatedBefore), "ms pods shouldn't be recreated", + ) + }, + ) + It("Sensors should be annotated with host", func(ctx context.Context) { namespace := "testsolomon" ytsaurus := testutil.CreateMinimalYtsaurusResource(namespace)