Skip to content

Commit

Permalink
Fixes receive hashring handling for multiple hashrings (#174)
Browse files Browse the repository at this point in the history
* metrics: Adds additional metrics for receiver hashrings

* receive: Fixes hashring dynamic merge logic

Ensure we allow a single replica to be down.
Fix ordering of hashrings by name.
Add metrics for hashring state.

* test:e2e: Update test for receive to include second hashring

* receive: Return early for unseen hashrings

* test: Improve handling of configmap string comparisons
  • Loading branch information
philipgough authored Nov 22, 2024
1 parent e2530f4 commit ad2a1b2
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 23 deletions.
22 changes: 9 additions & 13 deletions internal/controller/thanosreceive_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (

"github.com/go-logr/logr"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
manifestcompact "github.com/thanos-community/thanos-operator/internal/pkg/manifests/compact"

monitoringthanosiov1alpha1 "github.com/thanos-community/thanos-operator/api/v1alpha1"
"github.com/thanos-community/thanos-operator/internal/pkg/handlers"
"github.com/thanos-community/thanos-operator/internal/pkg/manifests"
Expand Down Expand Up @@ -78,7 +76,7 @@ func NewThanosReceiveReconciler(conf Config, client client.Client, scheme *runti
Client: client,
Scheme: scheme,
logger: conf.InstrumentationConfig.Logger,
metrics: controllermetrics.NewThanosReceiveMetrics(conf.InstrumentationConfig.MetricsRegistry, conf.InstrumentationConfig.BaseMetrics),
metrics: controllermetrics.NewThanosReceiveMetrics(conf.InstrumentationConfig.MetricsRegistry),
recorder: conf.InstrumentationConfig.EventRecorder,
handler: handler,
}
Expand All @@ -89,19 +87,15 @@ func NewThanosReceiveReconciler(conf Config, client client.Client, scheme *runti
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *ThanosReceiveReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.metrics.ReconciliationsTotal.WithLabelValues(manifestreceive.Name).Inc()

// Fetch the ThanosReceive instance to validate it is applied on the cluster.
receiver := &monitoringthanosiov1alpha1.ThanosReceive{}
err := r.Get(ctx, req.NamespacedName, receiver)
if err != nil {
r.metrics.ClientErrorsTotal.WithLabelValues(manifestreceive.Name).Inc()
if apierrors.IsNotFound(err) {
r.logger.Info("thanos receive resource not found. ignoring since object may be deleted")
return ctrl.Result{}, nil
}
r.logger.Error(err, "failed to get ThanosReceive")
r.metrics.ReconciliationsFailedTotal.WithLabelValues(manifestreceive.Name).Inc()
r.recorder.Event(receiver, corev1.EventTypeWarning, "GetFailed", "Failed to get ThanosReceive resource")
return ctrl.Result{}, err
}
Expand All @@ -120,7 +114,6 @@ func (r *ThanosReceiveReconciler) Reconcile(ctx context.Context, req ctrl.Reques

err = r.syncResources(ctx, *receiver)
if err != nil {
r.metrics.ReconciliationsFailedTotal.WithLabelValues(manifestreceive.Name).Inc()
r.recorder.Event(receiver, corev1.EventTypeWarning, "SyncFailed", fmt.Sprintf("Failed to sync resources: %v", err))
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -187,14 +180,12 @@ func (r *ThanosReceiveReconciler) syncResources(ctx context.Context, receiver mo
}

if errCount > 0 {
r.metrics.ClientErrorsTotal.WithLabelValues(manifestreceive.Name).Add(float64(errCount))
return fmt.Errorf("failed to create or update %d resources for receive hashring(s)", errCount)
}

// prune the ingest resources that are no longer needed/have changed
errCount = r.pruneOrphanedResources(ctx, receiver.GetNamespace(), receiver.GetName(), expectIngesters)
if errCount > 0 {
r.metrics.ClientErrorsTotal.WithLabelValues(manifestreceive.Name).Add(float64(errCount))
return fmt.Errorf("failed to prune %d orphaned resources for receive ingester(s)", errCount)
}

Expand All @@ -205,7 +196,6 @@ func (r *ThanosReceiveReconciler) syncResources(ctx context.Context, receiver mo
routerOpts := r.specToRouterOptions(receiver, string(hashringConfig))

if errs := r.handler.CreateOrUpdate(ctx, receiver.GetNamespace(), &receiver, routerOpts.Build()); errs > 0 {
r.metrics.ClientErrorsTotal.WithLabelValues(manifestreceive.Name).Add(float64(errs))
return fmt.Errorf("failed to create or update %d resources for the receive router", errs)
}

Expand All @@ -215,7 +205,6 @@ func (r *ThanosReceiveReconciler) syncResources(ctx context.Context, receiver mo
smObjs[i] = &monitoringv1.ServiceMonitor{ObjectMeta: metav1.ObjectMeta{Name: resource, Namespace: receiver.GetNamespace()}}
}
if errCount = r.handler.DeleteResource(ctx, smObjs); errCount > 0 {
r.metrics.ClientErrorsTotal.WithLabelValues(manifestcompact.Name).Add(float64(errCount))
return fmt.Errorf("failed to delete %d ServiceMonitors for the receiver", errCount)
}
}
Expand Down Expand Up @@ -289,7 +278,14 @@ func (r *ThanosReceiveReconciler) buildHashringConfig(ctx context.Context, recei
return []byte(""), nil
}

return json.MarshalIndent(out, "", " ")
b, err := json.MarshalIndent(out, "", " ")
if err != nil {
return nil, fmt.Errorf("failed to marshal hashring config: %w", err)
}

r.metrics.HashringHash.WithLabelValues(receiver.GetName(), receiver.GetNamespace()).Set(receive.HashAsMetricValue(b))
r.metrics.HashringsConfigured.WithLabelValues(receiver.GetName(), receiver.GetNamespace()).Set(float64(len(out)))
return b, nil
}

func (r *ThanosReceiveReconciler) handleDeletionTimestamp(receiveHashring *monitoringthanosiov1alpha1.ThanosReceive) (ctrl.Result, error) {
Expand Down
11 changes: 7 additions & 4 deletions internal/pkg/metrics/controller_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ type ThanosQueryMetrics struct {
}

type ThanosReceiveMetrics struct {
*BaseMetrics
HashringsConfigured *prometheus.GaugeVec
EndpointWatchesReconciliationsTotal prometheus.Counter
HashringHash *prometheus.GaugeVec
}

type ThanosRulerMetrics struct {
Expand Down Expand Up @@ -75,13 +75,16 @@ func NewThanosQueryMetrics(reg prometheus.Registerer, baseMetrics *BaseMetrics)
}
}

func NewThanosReceiveMetrics(reg prometheus.Registerer, baseMetrics *BaseMetrics) ThanosReceiveMetrics {
func NewThanosReceiveMetrics(reg prometheus.Registerer) ThanosReceiveMetrics {
return ThanosReceiveMetrics{
BaseMetrics: baseMetrics,
HashringsConfigured: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_operator_receive_hashrings_configured",
Help: "Number of configured hashrings for ThanosReceive resources",
}, []string{"name", "namespace"}),
}, []string{"resource", "namespace"}),
HashringHash: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_operator_receive_hashring_hash",
Help: "Hash of the hashring for ThanosReceive resources",
}, []string{"resource", "namespace"}),
EndpointWatchesReconciliationsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_operator_receive_endpoint_event_reconciliations_total",
Help: "Total number of reconciliations for ThanosReceive resources due to EndpointSlice events",
Expand Down
19 changes: 16 additions & 3 deletions internal/pkg/receive/hashrings.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package receive

import (
"crypto/md5"
"encoding/binary"
"encoding/json"
"fmt"
"slices"
Expand Down Expand Up @@ -154,26 +156,28 @@ func EndpointSliceListToEndpoints(converter EndpointConverter, eps discoveryv1.E
func DynamicMerge(previousState Hashrings, desiredState HashringState, replicationFactor int) Hashrings {
var mergedState Hashrings
if isEmptyHashring(previousState) {
mergedState = handleUnseenHashrings(desiredState)
return handleUnseenHashrings(desiredState)
}
for k, v := range desiredState {
// First, we check that the hashring can meet the desired replication factor.
// Second, we tolerate a single missing member, which lets us account for
// voluntary disruptions to the hashring.
// TODO: allow for more than one missing member based on input from PDB settings etc.
if len(v.Config.Endpoints) >= replicationFactor && v.DesiredReplicas-1 >= len(v.Config.Endpoints) {
if len(v.Config.Endpoints) >= replicationFactor && len(v.Config.Endpoints) >= v.DesiredReplicas-1 {
mergedState = append(mergedState, metaToHashring(k, v))
continue
}
// otherwise we look for previous state and merge if it exists
// this means that if the hashring is having issues, we don't interfere with it
// since doing so could cause further disruptions and frequent reshuffling
for _, hr := range previousState {
if hr.Name == k {
mergedState = append(mergedState, hr)
}
}
}
sort.Slice(mergedState, func(i, j int) bool {
return mergedState[i].Name > mergedState[j].Name
return mergedState[i].Name < mergedState[j].Name
})

return mergedState
Expand Down Expand Up @@ -227,3 +231,12 @@ func (e *Endpoint) UnmarshalJSON(data []byte) error {
}
return err
}

// HashAsMetricValue reduces data to a float64 suitable for use as a metric value.
//
// The MD5 digest of data is computed and its first six bytes are interpreted
// as a little-endian unsigned integer; the two most significant bytes of the
// 8-byte word are left as zero. The resulting 48-bit integer is converted to
// float64, which can represent every such value exactly.
func HashAsMetricValue(data []byte) float64 {
	digest := md5.Sum(data)

	// Zero-initialised 8-byte word; only the low six bytes are filled in.
	var word [8]byte
	copy(word[:], digest[:6])

	return float64(binary.LittleEndian.Uint64(word[:]))
}
28 changes: 25 additions & 3 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ const (

prometheusPort = 9090

hashringName = "default"
hashringName = "default"
hashringTwoName = "two"
)

var _ = Describe("controller", Ordered, func() {
Expand Down Expand Up @@ -206,6 +207,7 @@ var _ = Describe("controller", Ordered, func() {
Describe("Thanos Receive", Ordered, func() {
routerName := controller.ReceiveRouterNameFromParent(receiveName)
ingesterName := controller.ReceiveIngesterNameFromParent(receiveName, hashringName)
ingesterTwoName := controller.ReceiveIngesterNameFromParent(receiveName, hashringTwoName)

Context("When ThanosReceive is created with hashrings", func() {
It("should bring up the ingest components", func() {
Expand All @@ -227,6 +229,13 @@ var _ = Describe("controller", Ordered, func() {
Name: hashringName,
StorageSize: "100Mi",
},
{
Name: hashringTwoName,
StorageSize: "100Mi",
Tenants: []string{
"tenant1",
},
},
},
},
},
Expand All @@ -248,7 +257,20 @@ var _ = Describe("controller", Ordered, func() {
//nolint:lll
expect := fmt.Sprintf(`[
{
"hashring": "default",
"hashring": "%s",
"tenant_matcher_type": "exact",
"endpoints": [
{
"address": "%s-0.%s.thanos-operator-system.svc.cluster.local:10901",
"az": ""
}
]
},
{
"hashring": "%s",
"tenants": [
"tenant1"
],
"tenant_matcher_type": "exact",
"endpoints": [
{
Expand All @@ -257,7 +279,7 @@ var _ = Describe("controller", Ordered, func() {
}
]
}
]`, ingesterName, ingesterName)
]`, hashringName, ingesterName, ingesterName, hashringTwoName, ingesterTwoName, ingesterTwoName)
Eventually(func() bool {
return utils.VerifyConfigMapContents(c, routerName, namespace, receive.HashringConfigKey, expect)
}, time.Minute*5, time.Second*10).Should(BeTrue())
Expand Down
13 changes: 13 additions & 0 deletions test/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,19 @@ func VerifyConfigMapContents(c client.Client, name, namespace, key, expect strin
}

data := cm.Data[key]

if json.Valid([]byte(data)) && json.Valid([]byte(expect)) {
var jData interface{}
if err := json.Unmarshal([]byte(data), &jData); err != nil {
return false
}
var jExpect interface{}
if err := json.Unmarshal([]byte(expect), &jExpect); err != nil {
return false
}
return equality.Semantic.DeepEqual(jData, jExpect)

}
return data == expect
}

Expand Down

0 comments on commit ad2a1b2

Please sign in to comment.