From ad2a1b2b1906fa5a21675b2f2db37d23f05c4f85 Mon Sep 17 00:00:00 2001 From: Philip Gough Date: Fri, 22 Nov 2024 16:06:05 +0000 Subject: [PATCH] Fixes receive hashring handling for multiple hashrings (#174) * metrics: Adds additional metrics for receiver hashrings * receive: Fixes hashring dynamic merge logic Ensure we are allowing a single replica to be down. Fix ordering of hashring by name. Adds metrics for hashring state * test:e2e: Update test for receive to include second hashring * receive: Return early for unseen hashrings * test: Improve handling of configmap string comparisons --- .../controller/thanosreceive_controller.go | 22 ++++++--------- internal/pkg/metrics/controller_metrics.go | 11 +++++--- internal/pkg/receive/hashrings.go | 19 +++++++++++-- test/e2e/e2e_test.go | 28 +++++++++++++++++-- test/utils/utils.go | 13 +++++++++ 5 files changed, 70 insertions(+), 23 deletions(-) diff --git a/internal/controller/thanosreceive_controller.go b/internal/controller/thanosreceive_controller.go index 281aee52..4d7c9378 100644 --- a/internal/controller/thanosreceive_controller.go +++ b/internal/controller/thanosreceive_controller.go @@ -23,8 +23,6 @@ import ( "github.com/go-logr/logr" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - manifestcompact "github.com/thanos-community/thanos-operator/internal/pkg/manifests/compact" - monitoringthanosiov1alpha1 "github.com/thanos-community/thanos-operator/api/v1alpha1" "github.com/thanos-community/thanos-operator/internal/pkg/handlers" "github.com/thanos-community/thanos-operator/internal/pkg/manifests" @@ -78,7 +76,7 @@ func NewThanosReceiveReconciler(conf Config, client client.Client, scheme *runti Client: client, Scheme: scheme, logger: conf.InstrumentationConfig.Logger, - metrics: controllermetrics.NewThanosReceiveMetrics(conf.InstrumentationConfig.MetricsRegistry, conf.InstrumentationConfig.BaseMetrics), + metrics: controllermetrics.NewThanosReceiveMetrics(conf.InstrumentationConfig.MetricsRegistry), recorder: conf.InstrumentationConfig.EventRecorder, handler: handler, } @@ -89,19 +87,15 @@ func NewThanosReceiveReconciler(conf Config, client client.Client, scheme *runti // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile func (r *ThanosReceiveReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - r.metrics.ReconciliationsTotal.WithLabelValues(manifestreceive.Name).Inc() - // Fetch the ThanosReceive instance to validate it is applied on the cluster. receiver := &monitoringthanosiov1alpha1.ThanosReceive{} err := r.Get(ctx, req.NamespacedName, receiver) if err != nil { - r.metrics.ClientErrorsTotal.WithLabelValues(manifestreceive.Name).Inc() if apierrors.IsNotFound(err) { r.logger.Info("thanos receive resource not found. ignoring since object may be deleted") return ctrl.Result{}, nil } r.logger.Error(err, "failed to get ThanosReceive") - r.metrics.ReconciliationsFailedTotal.WithLabelValues(manifestreceive.Name).Inc() r.recorder.Event(receiver, corev1.EventTypeWarning, "GetFailed", "Failed to get ThanosReceive resource") return ctrl.Result{}, err } @@ -120,7 +114,6 @@ func (r *ThanosReceiveReconciler) Reconcile(ctx context.Context, req ctrl.Reques err = r.syncResources(ctx, *receiver) if err != nil { - r.metrics.ReconciliationsFailedTotal.WithLabelValues(manifestreceive.Name).Inc() r.recorder.Event(receiver, corev1.EventTypeWarning, "SyncFailed", fmt.Sprintf("Failed to sync resources: %v", err)) return ctrl.Result{}, err } @@ -187,14 +180,12 @@ func (r *ThanosReceiveReconciler) syncResources(ctx context.Context, receiver mo } if errCount > 0 { - r.metrics.ClientErrorsTotal.WithLabelValues(manifestreceive.Name).Add(float64(errCount)) return fmt.Errorf("failed to create or update %d resources for receive hashring(s)", errCount) } // prune the ingest resources that are no longer needed/have changed errCount = r.pruneOrphanedResources(ctx, receiver.GetNamespace(), receiver.GetName(), expectIngesters) if errCount > 0 { - r.metrics.ClientErrorsTotal.WithLabelValues(manifestreceive.Name).Add(float64(errCount)) return fmt.Errorf("failed to prune %d orphaned resources for receive ingester(s)", errCount) } @@ -205,7 +196,6 @@ func (r *ThanosReceiveReconciler) syncResources(ctx context.Context, receiver mo routerOpts := r.specToRouterOptions(receiver, string(hashringConfig)) if errs := r.handler.CreateOrUpdate(ctx, receiver.GetNamespace(), &receiver, routerOpts.Build()); errs > 0 { - r.metrics.ClientErrorsTotal.WithLabelValues(manifestreceive.Name).Add(float64(errs)) return fmt.Errorf("failed to create or update %d resources for the receive router", errs) } @@ -215,7 +205,6 @@ func (r *ThanosReceiveReconciler) syncResources(ctx context.Context, receiver mo smObjs[i] = &monitoringv1.ServiceMonitor{ObjectMeta: metav1.ObjectMeta{Name: resource, Namespace: receiver.GetNamespace()}} } if errCount = r.handler.DeleteResource(ctx, smObjs); errCount > 0 { - r.metrics.ClientErrorsTotal.WithLabelValues(manifestcompact.Name).Add(float64(errCount)) return fmt.Errorf("failed to delete %d ServiceMonitors for the receiver", errCount) } } @@ -289,7 +278,14 @@ func (r *ThanosReceiveReconciler) buildHashringConfig(ctx context.Context, recei return []byte(""), nil } - return json.MarshalIndent(out, "", " ") + b, err := json.MarshalIndent(out, "", " ") + if err != nil { + return nil, fmt.Errorf("failed to marshal hashring config: %w", err) + } + + r.metrics.HashringHash.WithLabelValues(receiver.GetName(), receiver.GetNamespace()).Set(receive.HashAsMetricValue(b)) + r.metrics.HashringsConfigured.WithLabelValues(receiver.GetName(), receiver.GetNamespace()).Set(float64(len(out))) + return b, nil } func (r *ThanosReceiveReconciler) handleDeletionTimestamp(receiveHashring *monitoringthanosiov1alpha1.ThanosReceive) (ctrl.Result, error) { diff --git a/internal/pkg/metrics/controller_metrics.go b/internal/pkg/metrics/controller_metrics.go index 5f2c5485..6e818c0e 100644 --- a/internal/pkg/metrics/controller_metrics.go +++ b/internal/pkg/metrics/controller_metrics.go @@ -19,9 +19,9 @@ type ThanosQueryMetrics struct { } type ThanosReceiveMetrics struct { - *BaseMetrics HashringsConfigured *prometheus.GaugeVec EndpointWatchesReconciliationsTotal prometheus.Counter + HashringHash *prometheus.GaugeVec } type ThanosRulerMetrics struct { @@ -75,13 +75,16 @@ func NewThanosQueryMetrics(reg prometheus.Registerer, baseMetrics *BaseMetrics) } } -func NewThanosReceiveMetrics(reg prometheus.Registerer, baseMetrics *BaseMetrics) ThanosReceiveMetrics { +func NewThanosReceiveMetrics(reg prometheus.Registerer) ThanosReceiveMetrics { return ThanosReceiveMetrics{ - BaseMetrics: baseMetrics, HashringsConfigured: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_operator_receive_hashrings_configured", Help: "Number of configured hashrings for ThanosReceive resources", - }, []string{"name", "namespace"}), + }, []string{"resource", "namespace"}), + HashringHash: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "thanos_operator_receive_hashring_hash", + Help: "Hash of the hashring for ThanosReceive resources", + }, []string{"resource", "namespace"}), EndpointWatchesReconciliationsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_operator_receive_endpoint_event_reconciliations_total", Help: "Total number of reconciliations for ThanosReceive resources due to EndpointSlice events", diff --git a/internal/pkg/receive/hashrings.go b/internal/pkg/receive/hashrings.go index 09b25e00..934b927d 100644 --- a/internal/pkg/receive/hashrings.go +++ b/internal/pkg/receive/hashrings.go @@ -1,6 +1,8 @@ package receive import ( + "crypto/md5" + "encoding/binary" "encoding/json" "fmt" "slices" @@ -154,18 +156,20 @@ func EndpointSliceListToEndpoints(converter EndpointConverter, eps discoveryv1.E func DynamicMerge(previousState Hashrings, desiredState HashringState, replicationFactor int) Hashrings { var mergedState Hashrings if isEmptyHashring(previousState) { - mergedState = handleUnseenHashrings(desiredState) + return handleUnseenHashrings(desiredState) } for k, v := range desiredState { // we first check that the hashring can meet the desired replication factor // secondly, we allow to tolerate a single missing member. this allows us to account for // voluntary disruptions to the hashring. // todo - allow for more than one missing member based on input from PDB settings etc - if len(v.Config.Endpoints) >= replicationFactor && v.DesiredReplicas-1 >= len(v.Config.Endpoints) { + if len(v.Config.Endpoints) >= replicationFactor && len(v.Config.Endpoints) >= v.DesiredReplicas-1 { mergedState = append(mergedState, metaToHashring(k, v)) continue } // otherwise we look for previous state and merge if it exists + // this means that if the hashring is having issues, we don't interfere with it + // since doing so could cause further disruptions and frequent reshuffling for _, hr := range previousState { if hr.Name == k { mergedState = append(mergedState, hr) @@ -173,7 +177,7 @@ func DynamicMerge(previousState Hashrings, desiredState HashringState, replicati } } sort.Slice(mergedState, func(i, j int) bool { - return mergedState[i].Name > mergedState[j].Name + return mergedState[i].Name < mergedState[j].Name }) return mergedState @@ -227,3 +231,12 @@ func (e *Endpoint) UnmarshalJSON(data []byte) error { } return err } + +// HashAsMetricValue hashes the given data and returns a float64 value. +func HashAsMetricValue(data []byte) float64 { + sum := md5.Sum(data) + smallSum := sum[0:6] + var bytes = make([]byte, 8) + copy(bytes, smallSum) + return float64(binary.LittleEndian.Uint64(bytes)) +} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 5cfb58d3..c3dac25d 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -62,7 +62,8 @@ const ( prometheusPort = 9090 - hashringName = "default" + hashringName = "default" + hashringTwoName = "two" ) var _ = Describe("controller", Ordered, func() { @@ -206,6 +207,7 @@ var _ = Describe("controller", Ordered, func() { Describe("Thanos Receive", Ordered, func() { routerName := controller.ReceiveRouterNameFromParent(receiveName) ingesterName := controller.ReceiveIngesterNameFromParent(receiveName, hashringName) + ingesterTwoName := controller.ReceiveIngesterNameFromParent(receiveName, hashringTwoName) Context("When ThanosReceive is created with hashrings", func() { It("should bring up the ingest components", func() { @@ -227,6 +229,13 @@ var _ = Describe("controller", Ordered, func() { Name: hashringName, StorageSize: "100Mi", }, + { + Name: hashringTwoName, + StorageSize: "100Mi", + Tenants: []string{ + "tenant1", + }, + }, }, }, }, @@ -248,7 +257,20 @@ var _ = Describe("controller", Ordered, func() { //nolint:lll expect := fmt.Sprintf(`[ { - "hashring": "default", + "hashring": "%s", + "tenant_matcher_type": "exact", + "endpoints": [ + { + "address": "%s-0.%s.thanos-operator-system.svc.cluster.local:10901", + "az": "" + } + ] + }, + { + "hashring": "%s", + "tenants": [ + "tenant1" + ], "tenant_matcher_type": "exact", "endpoints": [ { @@ -257,7 +279,7 @@ var _ = Describe("controller", Ordered, func() { } ] } -]`, ingesterName, ingesterName) +]`, hashringName, ingesterName, ingesterName, hashringTwoName, ingesterTwoName, ingesterTwoName) Eventually(func() bool { return utils.VerifyConfigMapContents(c, routerName, namespace, receive.HashringConfigKey, expect) }, time.Minute*5, time.Second*10).Should(BeTrue()) diff --git a/test/utils/utils.go b/test/utils/utils.go index b0ad0d44..61c31fa0 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -423,6 +423,19 @@ func VerifyConfigMapContents(c client.Client, name, namespace, key, expect strin } data := cm.Data[key] + + if json.Valid([]byte(data)) && json.Valid([]byte(expect)) { + var jData interface{} + if err := json.Unmarshal([]byte(data), &jData); err != nil { + return false + } + var jExpect interface{} + if err := json.Unmarshal([]byte(expect), &jExpect); err != nil { + return false + } + return equality.Semantic.DeepEqual(jData, jExpect) + + } return data == expect }