From 4fdfb99afbcab696621d2a5729b18303cdc7b296 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 23 Sep 2024 16:49:40 +0100 Subject: [PATCH] Make federate sharding possible in metrics-collector by using workers (#1628) * Remove crazy env var based testing in metrics-collector Signed-off-by: Saswata Mukherjee * Refactor forwarder, clean up code, make to client mTLS optional, remove unused recording-file and collect-file flags Signed-off-by: Saswata Mukherjee * Enable sharded metrics-collector with separate agents for recording and collectrules, also removed sort-of-defunct last metrics and reload handling Signed-off-by: Saswata Mukherjee * Remove the overly complex test for worker which now times out. The test only tests for reconfiguring, which again, is not a critical function of metrics-collector Signed-off-by: Saswata Mukherjee * Address feedback from review Signed-off-by: Saswata Mukherjee --------- Signed-off-by: Saswata Mukherjee --- collectors/metrics/Dockerfile.dev | 17 + .../metrics/cmd/metrics-collector/main.go | 520 ++++++++++++------ .../cmd/metrics-collector/main_test.go | 29 +- .../metrics/pkg/collectrule/evaluator.go | 23 +- collectors/metrics/pkg/forwarder/forwarder.go | 354 +++++++----- .../metrics/pkg/forwarder/forwarder_test.go | 176 +++--- .../metricfamily/hypershift_transformer.go | 42 +- .../hypershift_transformer_test.go | 13 +- .../pkg/metricsclient/metricsclient.go | 14 +- .../pkg/metricsclient/metricsclient_test.go | 4 +- collectors/metrics/pkg/status/status.go | 50 +- collectors/metrics/pkg/status/status_test.go | 19 +- ...servability-v1beta1-to-v1beta2-golden.yaml | 1 + .../v1beta2/custom-certs/observability.yaml | 1 + examples/mco/e2e/v1beta2/observability.yaml | 1 + .../pkg/collector/metrics_collector.go | 6 + .../multiclusterobservability_shared.go | 9 + ...bility-operator.clusterserviceversion.yaml | 2 +- ...gement.io_multiclusterobservabilities.yaml | 20 + ...ter-management.io_observabilityaddons.yaml | 10 + 
...gement.io_multiclusterobservabilities.yaml | 20 + ...ter-management.io_observabilityaddons.yaml | 10 + 22 files changed, 784 insertions(+), 557 deletions(-) create mode 100644 collectors/metrics/Dockerfile.dev diff --git a/collectors/metrics/Dockerfile.dev b/collectors/metrics/Dockerfile.dev new file mode 100644 index 000000000..95b0b9727 --- /dev/null +++ b/collectors/metrics/Dockerfile.dev @@ -0,0 +1,17 @@ +FROM golang:1.22-alpine3.18 AS builder + +WORKDIR /workspace +COPY go.sum go.mod ./ +COPY ./collectors/metrics ./collectors/metrics +COPY ./operators/pkg ./operators/pkg +COPY ./operators/multiclusterobservability/api ./operators/multiclusterobservability/api +RUN go build -v -o metrics-collector ./collectors/metrics/cmd/metrics-collector/main.go + +FROM alpine:3.18 AS runner + +USER 1001:1001 + +COPY --from=builder /workspace/metrics-collector /usr/bin/ + + +CMD ["/bin/bash", "-c", "/usr/bin/metrics-collector"] diff --git a/collectors/metrics/cmd/metrics-collector/main.go b/collectors/metrics/cmd/metrics-collector/main.go index 384dac2fe..cd80ace5b 100644 --- a/collectors/metrics/cmd/metrics-collector/main.go +++ b/collectors/metrics/cmd/metrics-collector/main.go @@ -12,40 +12,44 @@ import ( "net/http" "net/url" "os" - "os/signal" "regexp" "strings" - "syscall" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/run" + hyperv1 "github.com/openshift/hypershift/api/hypershift/v1alpha1" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/common/expfmt" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/collectrule" "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/forwarder" collectorhttp 
"github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/http" "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/logger" "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/metricfamily" + oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1" ) func main() { opt := &Options{ - From: "http://localhost:9090", - Listen: "localhost:9002", - LimitBytes: 200 * 1024, - Rules: []string{`{__name__="up"}`}, - Interval: 4*time.Minute + 30*time.Second, - EvaluateInterval: 30 * time.Second, - WorkerNum: 1, + From: "http://localhost:9090", + Listen: "localhost:9002", + LimitBytes: 200 * 1024, + Matchers: []string{`{__name__="up"}`}, + Interval: 4*time.Minute + 30*time.Second, + EvaluateInterval: 30 * time.Second, + WorkerNum: 1, + DisableHyperShift: false, + DisableStatusReporting: false, } cmd := &cobra.Command{ - Short: "Federate Prometheus via push", + Short: "Remote write federated metrics from prometheus", SilenceErrors: true, SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { @@ -57,7 +61,7 @@ func main() { &opt.WorkerNum, "worker-number", opt.WorkerNum, - "The number of client runs in the simulate environment.") + "The number of workers that will work in parallel to send metrics.") cmd.Flags().StringVar( &opt.Listen, "listen", @@ -126,37 +130,26 @@ func main() { opt.LimitBytes, "The maxiumum acceptable size of a response returned when scraping Prometheus.") - // TODO: more complex input definition, such as a JSON struct cmd.Flags().StringArrayVar( - &opt.Rules, + &opt.Matchers, "match", - opt.Rules, + opt.Matchers, "Match rules to federate.") cmd.Flags().StringVar( - &opt.RulesFile, + &opt.MatcherFile, "match-file", - opt.RulesFile, + opt.MatcherFile, "A file containing match rules to federate, one rule per line.") cmd.Flags().StringArrayVar( &opt.RecordingRules, "recordingrule", opt.RecordingRules, 
"Define recording rule is to generate new metrics based on specified query expression.") - cmd.Flags().StringVar( - &opt.RecordingRulesFile, - "recording-file", - opt.RulesFile, - "A file containing recording rules.") cmd.Flags().StringArrayVar( &opt.CollectRules, "collectrule", opt.CollectRules, "Define metrics collect rule is to collect additional metrics based on specified event.") - cmd.Flags().StringVar( - &opt.RecordingRulesFile, - "collect-file", - opt.RecordingRulesFile, - "A file containing collect rules.") cmd.Flags().StringSliceVar( &opt.LabelFlag, @@ -203,12 +196,17 @@ func main() { opt.LogLevel, "Log filtering level. e.g info, debug, warn, error") - // deprecated opt - cmd.Flags().StringVar( - &opt.Identifier, - "id", - opt.Identifier, - "The unique identifier for metrics sent with this client.") + cmd.Flags().BoolVar( + &opt.DisableStatusReporting, + "disable-status-reporting", + opt.DisableStatusReporting, + "Disable status reporting to hub cluster.") + + cmd.Flags().BoolVar( + &opt.DisableHyperShift, + "disable-hypershift", + opt.DisableHyperShift, + "Disable hypershift related metrics collection.") // simulation test cmd.Flags().StringVar( @@ -259,12 +257,10 @@ type Options struct { AnonymizeSalt string AnonymizeSaltFile string - Rules []string - RulesFile string - RecordingRules []string - RecordingRulesFile string - CollectRules []string - CollectRulesFile string + Matchers []string + MatcherFile string + RecordingRules []string + CollectRules []string LabelFlag []string Labels map[string]string @@ -275,17 +271,19 @@ type Options struct { LogLevel string Logger log.Logger - // deprecated - Identifier string - // simulation file SimulatedTimeseriesFile string // how many threads are running // for production, it is always 1 WorkerNum int64 + + DisableHyperShift bool + DisableStatusReporting bool } +// Run is the entry point of the metrics collector +// It is in charge of running forwarders, collectrule agent and recording rule agents. 
func (o *Options) Run() error { var g run.Group @@ -301,16 +299,36 @@ func (o *Options) Run() error { // Some packages still use default Register. Replace to have those metrics. prometheus.DefaultRegisterer = metricsReg - err, cfg := initConfig(o) + cfgRR, err := initShardedConfigs(o, AgentRecordingRule) + if err != nil { + return err + } + + shardCfgs, err := initShardedConfigs(o, AgentShardedForwarder) + if err != nil { + return err + } + + evalCfg, err := initShardedConfigs(o, AgentCollectRule) if err != nil { return err } metrics := forwarder.NewWorkerMetrics(metricsReg) - cfg.Metrics = metrics - worker, err := forwarder.New(*cfg) + evalCfg[0].Metrics = metrics + cfgRR[0].Metrics = metrics + recordingRuleWorker, err := forwarder.New(*cfgRR[0]) if err != nil { - return fmt.Errorf("failed to configure metrics collector: %w", err) + return fmt.Errorf("failed to configure recording rule worker: %w", err) + } + + shardWorkers := make([]*forwarder.Worker, len(shardCfgs)) + for i, shardCfg := range shardCfgs { + shardCfg.Metrics = metrics + shardWorkers[i], err = forwarder.New(*shardCfg) + if err != nil { + return fmt.Errorf("failed to configure shard worker %d: %w", i, err) + } } logger.Log( @@ -320,36 +338,31 @@ func (o *Options) Run() error { "to", o.ToUpload, "listen", o.Listen) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + { - // Execute the worker's `Run` func. - ctx, cancel := context.WithCancel(context.Background()) + // Execute the recording rule worker's `Run` func. g.Add(func() error { - worker.Run(ctx) + recordingRuleWorker.Run(ctx) return nil }, func(error) { cancel() }) } - { - // Notify and reload on SIGHUP. - hup := make(chan os.Signal, 1) - signal.Notify(hup, syscall.SIGHUP) - cancel := make(chan struct{}) + // Execute the shard workers' `Run` func. 
g.Add(func() error { - for { - select { - case <-hup: - if err := worker.Reconfigure(*cfg); err != nil { - logger.Log(o.Logger, logger.Error, "msg", "failed to reload config", "err", err) - return err - } - case <-cancel: - return nil - } + for i, shardWorker := range shardWorkers { + go func(i int, shardWorker *forwarder.Worker) { + logger.Log(o.Logger, logger.Info, "msg", "Starting shard worker", "worker", i) + shardWorker.Run(ctx) + }(i, shardWorker) } + <-ctx.Done() + return nil }, func(error) { - close(cancel) + cancel() }) } @@ -358,10 +371,6 @@ func (o *Options) Run() error { collectorhttp.DebugRoutes(handlers) collectorhttp.HealthRoutes(handlers) collectorhttp.MetricRoutes(handlers, metricsReg) - collectorhttp.ReloadRoutes(handlers, func() error { - return worker.Reconfigure(*cfg) - }) - handlers.Handle("/federate", serveLastMetrics(o.Logger, worker)) s := http.Server{ Addr: o.Listen, Handler: handlers, @@ -379,7 +388,7 @@ func (o *Options) Run() error { } return nil }, func(error) { - err := s.Shutdown(context.Background()) + err := s.Shutdown(ctx) if err != nil { logger.Log(o.Logger, logger.Error, "msg", "failed to close listener", "err", err) } @@ -387,17 +396,18 @@ func (o *Options) Run() error { } } - err = runMultiWorkers(o, cfg) + // Run the simulation agent. + err = runMultiWorkers(o, evalCfg[0]) if err != nil { return err } + // Run the Collectrules agent. 
if len(o.CollectRules) != 0 { - evaluator, err := collectrule.New(*cfg) + evaluator, err := collectrule.New(*evalCfg[0]) if err != nil { return fmt.Errorf("failed to configure collect rule evaluator: %w", err) } - ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { evaluator.Run(ctx) return nil @@ -409,68 +419,40 @@ func (o *Options) Run() error { return g.Run() } -func runMultiWorkers(o *Options, cfg *forwarder.Config) error { - for i := 1; i < int(o.WorkerNum); i++ { - opt := &Options{ - From: o.From, - FromQuery: o.FromQuery, - ToUpload: o.ToUpload, - FromCAFile: o.FromCAFile, - FromTokenFile: o.FromTokenFile, - ToUploadCA: o.ToUploadCA, - ToUploadCert: o.ToUploadCert, - ToUploadKey: o.ToUploadKey, - Rules: o.Rules, - RenameFlag: o.RenameFlag, - RecordingRules: o.RecordingRules, - Interval: o.Interval, - Labels: map[string]string{}, - SimulatedTimeseriesFile: o.SimulatedTimeseriesFile, - Logger: o.Logger, - } - for _, flag := range o.LabelFlag { - values := strings.SplitN(flag, "=", 2) - if len(values) != 2 { - return fmt.Errorf("--label must be of the form key=value: %s", flag) - } - if values[0] == "cluster" { - values[1] += "-" + fmt.Sprint(i) - } - if values[0] == "clusterID" { - values[1] = string(uuid.NewUUID()) - } - opt.Labels[values[0]] = values[1] - } - err, forwardCfg := initConfig(opt) - if err != nil { - return err - } - - forwardCfg.Metrics = cfg.Metrics - forwardWorker, err := forwarder.New(*forwardCfg) - if err != nil { - return fmt.Errorf("failed to configure metrics collector: %w", err) - } - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - forwardWorker.Run(ctx) - cancel() - }() +// splitMatchersIntoShards divides the matchers into approximately equal shards +func splitMatchersIntoShards(matchers []string, shardCount int) [][]string { + if shardCount <= 1 { + return [][]string{matchers} + } + shards := make([][]string, shardCount) + for i, matcher := range matchers { + shardIndex := i % 
shardCount + shards[shardIndex] = append(shards[shardIndex], matcher) } - return nil + + return shards } -func initConfig(o *Options) (error, *forwarder.Config) { +// Agent is the type of the worker agent that will be running. +// They are classified according to what they collect. +type Agent string + +const ( + AgentCollectRule Agent = "collectrule" + AgentRecordingRule Agent = "recordingrule" + AgentShardedForwarder Agent = "forwarder" +) + +func initShardedConfigs(o *Options, agent Agent) ([]*forwarder.Config, error) { if len(o.From) == 0 { - return errors.New("you must specify a Prometheus server to federate from (e.g. http://localhost:9090)"), nil + return nil, errors.New("you must specify a Prometheus server to federate from (e.g. http://localhost:9090)") } for _, flag := range o.LabelFlag { values := strings.SplitN(flag, "=", 2) if len(values) != 2 { - return fmt.Errorf("--label must be of the form key=value: %s", flag), nil + return nil, fmt.Errorf("--label must be of the form key=value: %s", flag) } if o.Labels == nil { o.Labels = make(map[string]string) @@ -484,7 +466,7 @@ func initConfig(o *Options) (error, *forwarder.Config) { } values := strings.SplitN(flag, "=", 2) if len(values) != 2 { - return fmt.Errorf("--rename must be of the form OLD_NAME=NEW_NAME: %s", flag), nil + return nil, fmt.Errorf("--rename must be of the form OLD_NAME=NEW_NAME: %s", flag) } if o.Renames == nil { o.Renames = make(map[string]string) @@ -494,7 +476,7 @@ func initConfig(o *Options) (error, *forwarder.Config) { from, err := url.Parse(o.From) if err != nil { - return fmt.Errorf("--from is not a valid URL: %w", err), nil + return nil, fmt.Errorf("--from is not a valid URL: %w", err) } from.Path = strings.TrimRight(from.Path, "/") if len(from.Path) == 0 { @@ -503,8 +485,9 @@ func initConfig(o *Options) (error, *forwarder.Config) { fromQuery, err := url.Parse(o.FromQuery) if err != nil { - return fmt.Errorf("--from-query is not a valid URL: %w", err), nil + return nil, 
fmt.Errorf("--from-query is not a valid URL: %w", err) } + fromQuery.Path = strings.TrimRight(fromQuery.Path, "/") if len(fromQuery.Path) == 0 { fromQuery.Path = "/api/v1/query" @@ -514,12 +497,12 @@ func initConfig(o *Options) (error, *forwarder.Config) { if len(o.ToUpload) > 0 { toUpload, err = url.Parse(o.ToUpload) if err != nil { - return fmt.Errorf("--to-upload is not a valid URL: %w", err), nil + return nil, fmt.Errorf("--to-upload is not a valid URL: %w", err) } } if toUpload == nil { - return errors.New("--to-upload must be specified"), nil + return nil, errors.New("--to-upload must be specified") } var transformer metricfamily.MultiTransformer @@ -554,68 +537,241 @@ func initConfig(o *Options) (error, *forwarder.Config) { transformer.With(metricfamily.TransformerFunc(metricfamily.PackMetrics)) transformer.With(metricfamily.TransformerFunc(metricfamily.SortMetrics)) - isHypershift, err := metricfamily.CheckCRDExist(o.Logger) - if err != nil { - return err, nil + // TODO(saswatamcode): Kill this feature. + // This is too messy of an approach, to get hypershift specific labels into metrics we send. + // There is much better way to do this, with relabel configs. + // A collection agent shouldn't be calling out to Kube API server just to add labels. 
+ if !o.DisableHyperShift { + isHypershift, err := metricfamily.CheckCRDExist(o.Logger) + if err != nil { + return nil, err + } + if isHypershift { + config, err := clientcmd.BuildConfigFromFlags("", "") + if err != nil { + return nil, errors.New("failed to create the kube config for hypershiftv1") + } + s := scheme.Scheme + if err := hyperv1.AddToScheme(s); err != nil { + return nil, errors.New("failed to add observabilityaddon into scheme") + } + hClient, err := client.New(config, client.Options{Scheme: s}) + if err != nil { + return nil, errors.New("failed to create the kube client") + } + + hyperTransformer, err := metricfamily.NewHypershiftTransformer(hClient, o.Logger, o.Labels) + if err != nil { + return nil, err + } + transformer.WithFunc(func() metricfamily.Transformer { + return hyperTransformer + }) + } } - if isHypershift { - hyperTransformer, err := metricfamily.NewHypershiftTransformer(o.Logger, nil, o.Labels) + + // Configure matchers. + matchers := o.Matchers + if len(o.MatcherFile) > 0 { + data, err := os.ReadFile(o.MatcherFile) if err != nil { - return err, nil + return nil, fmt.Errorf("unable to read match-file: %w", err) + } + matchers = append(matchers, strings.Split(string(data), "\n")...) + } + for i := 0; i < len(matchers); { + s := strings.TrimSpace(matchers[i]) + if len(s) == 0 { + matchers = append(matchers[:i], matchers[i+1:]...) + continue + } + matchers[i] = s + i++ + } + + var statusClient client.Client + + // TODO(saswatamcode): Evaluate better way for status reporting. + // Currently, it reports status for every 3 errors in forward request. This is too much of an overhead + // Every remote write request error does not need to be reported. 
+ // Instead we can try to do this with metrics-collector meta-monitoring + if !o.DisableStatusReporting { + config, err := clientcmd.BuildConfigFromFlags("", "") + if err != nil { + return nil, errors.New("failed to create the kube config for status") + } + s := scheme.Scheme + if err := oav1beta1.AddToScheme(s); err != nil { + return nil, errors.New("failed to add observabilityaddon into scheme") + } + + statusClient, err = client.New(config, client.Options{Scheme: s}) + if err != nil { + return nil, errors.New("failed to create the kube client") } - transformer.WithFunc(func() metricfamily.Transformer { - return hyperTransformer - }) } - return nil, &forwarder.Config{ - From: from, - FromQuery: fromQuery, - ToUpload: toUpload, - FromToken: o.FromToken, - FromTokenFile: o.FromTokenFile, - FromCAFile: o.FromCAFile, - ToUploadCA: o.ToUploadCA, - ToUploadCert: o.ToUploadCert, - ToUploadKey: o.ToUploadKey, - - AnonymizeLabels: o.AnonymizeLabels, - AnonymizeSalt: o.AnonymizeSalt, - AnonymizeSaltFile: o.AnonymizeSaltFile, - Debug: o.Verbose, - Interval: o.Interval, - EvaluateInterval: o.EvaluateInterval, - LimitBytes: o.LimitBytes, - Rules: o.Rules, - RulesFile: o.RulesFile, - RecordingRules: o.RecordingRules, - CollectRules: o.CollectRules, - Transformer: transformer, - - Logger: o.Logger, - SimulatedTimeseriesFile: o.SimulatedTimeseriesFile, + switch agent { + case AgentCollectRule: + f := forwarder.Config{ + FromClientConfig: forwarder.FromClientConfig{ + URL: from, + QueryURL: fromQuery, + Token: o.FromToken, + TokenFile: o.FromTokenFile, + CAFile: o.FromCAFile, + }, + ToClientConfig: forwarder.ToClientConfig{ + URL: toUpload, + CAFile: o.ToUploadCA, + CertFile: o.ToUploadCert, + KeyFile: o.ToUploadKey, + }, + + StatusClient: statusClient, + AnonymizeLabels: o.AnonymizeLabels, + AnonymizeSalt: o.AnonymizeSalt, + AnonymizeSaltFile: o.AnonymizeSaltFile, + Debug: o.Verbose, + Interval: o.Interval, + EvaluateInterval: o.EvaluateInterval, + LimitBytes: o.LimitBytes, + 
Matchers: matchers, + RecordingRules: o.RecordingRules, + CollectRules: o.CollectRules, + Transformer: transformer, + + Logger: o.Logger, + SimulatedTimeseriesFile: o.SimulatedTimeseriesFile, + } + return []*forwarder.Config{&f}, nil + + case AgentRecordingRule: + f := forwarder.Config{ + FromClientConfig: forwarder.FromClientConfig{ + URL: from, + QueryURL: fromQuery, + Token: o.FromToken, + TokenFile: o.FromTokenFile, + CAFile: o.FromCAFile, + }, + ToClientConfig: forwarder.ToClientConfig{ + URL: toUpload, + CAFile: o.ToUploadCA, + CertFile: o.ToUploadCert, + KeyFile: o.ToUploadKey, + }, + + StatusClient: statusClient, + AnonymizeLabels: o.AnonymizeLabels, + AnonymizeSalt: o.AnonymizeSalt, + AnonymizeSaltFile: o.AnonymizeSaltFile, + Debug: o.Verbose, + Interval: o.Interval, + EvaluateInterval: o.EvaluateInterval, + LimitBytes: o.LimitBytes, + RecordingRules: o.RecordingRules, + Transformer: transformer, + + Logger: o.Logger, + SimulatedTimeseriesFile: o.SimulatedTimeseriesFile, + } + return []*forwarder.Config{&f}, nil + + case AgentShardedForwarder: + if len(matchers) < int(o.WorkerNum) { + return nil, errors.New("number of shards is greater than the number of matchers") + } + + shards := splitMatchersIntoShards(matchers, int(o.WorkerNum)) + + shardCfgs := make([]*forwarder.Config, len(shards)) + for i, shard := range shards { + shardCfgs[i] = &forwarder.Config{ + FromClientConfig: forwarder.FromClientConfig{ + URL: from, + QueryURL: fromQuery, + Token: o.FromToken, + TokenFile: o.FromTokenFile, + CAFile: o.FromCAFile, + }, + StatusClient: statusClient, + ToClientConfig: forwarder.ToClientConfig{ + URL: toUpload, + CAFile: o.ToUploadCA, + CertFile: o.ToUploadCert, + KeyFile: o.ToUploadKey, + }, + AnonymizeLabels: o.AnonymizeLabels, + AnonymizeSalt: o.AnonymizeSalt, + AnonymizeSaltFile: o.AnonymizeSaltFile, + Debug: o.Verbose, + Interval: o.Interval, + EvaluateInterval: o.EvaluateInterval, + LimitBytes: o.LimitBytes, + Matchers: shard, + Transformer: transformer, 
+ Logger: log.With(o.Logger, "shard", i), + SimulatedTimeseriesFile: o.SimulatedTimeseriesFile, + } + } + return shardCfgs, nil + default: + return nil, errors.New("invalid agent type") } } -// serveLastMetrics retrieves the last set of metrics served. -func serveLastMetrics(l log.Logger, worker *forwarder.Worker) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if req.Method != "GET" { - w.WriteHeader(http.StatusMethodNotAllowed) - return +func runMultiWorkers(o *Options, cfg *forwarder.Config) error { + for i := 1; i < int(o.WorkerNum); i++ { + opt := &Options{ + From: o.From, + FromQuery: o.FromQuery, + ToUpload: o.ToUpload, + FromCAFile: o.FromCAFile, + FromTokenFile: o.FromTokenFile, + ToUploadCA: o.ToUploadCA, + ToUploadCert: o.ToUploadCert, + ToUploadKey: o.ToUploadKey, + Matchers: o.Matchers, + RecordingRules: o.RecordingRules, + Interval: o.Interval, + Labels: map[string]string{}, + SimulatedTimeseriesFile: o.SimulatedTimeseriesFile, + Logger: o.Logger, + DisableHyperShift: o.DisableHyperShift, + DisableStatusReporting: o.DisableStatusReporting, } - families := worker.LastMetrics() - protoTextFormat := expfmt.NewFormat(expfmt.TypeProtoText) - w.Header().Set("Content-Type", string(protoTextFormat)) - encoder := expfmt.NewEncoder(w, protoTextFormat) - for _, family := range families { - if family == nil { - continue + for _, flag := range o.LabelFlag { + values := strings.SplitN(flag, "=", 2) + if len(values) != 2 { + return fmt.Errorf("--label must be of the form key=value: %s", flag) } - if err := encoder.Encode(family); err != nil { - logger.Log(l, logger.Error, "msg", "unable to write metrics for family", "err", err) - break + if values[0] == "cluster" { + values[1] += "-" + fmt.Sprint(i) + } + if values[0] == "clusterID" { + values[1] = string(uuid.NewUUID()) } + opt.Labels[values[0]] = values[1] } - }) + + forwardCfg, err := initShardedConfigs(opt, AgentCollectRule) + if err != nil { + return err + } + + 
forwardCfg[0].Metrics = cfg.Metrics + forwardWorker, err := forwarder.New(*forwardCfg[0]) + if err != nil { + return fmt.Errorf("failed to configure metrics collector: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + forwardWorker.Run(ctx) + cancel() + }() + + } + return nil } diff --git a/collectors/metrics/cmd/metrics-collector/main_test.go b/collectors/metrics/cmd/metrics-collector/main_test.go index 3b28bdfa6..5ee34be01 100644 --- a/collectors/metrics/cmd/metrics-collector/main_test.go +++ b/collectors/metrics/cmd/metrics-collector/main_test.go @@ -15,18 +15,17 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/forwarder" "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/logger" -) + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" -func init() { - os.Setenv("UNIT_TEST", "true") -} + oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1" +) func TestMultiWorkers(t *testing.T) { - opt := &Options{ Listen: "localhost:9002", LimitBytes: 200 * 1024, - Rules: []string{`{__name__="instance:node_vmstat_pgmajfault:rate1m"}`}, + Matchers: []string{`{__name__="instance:node_vmstat_pgmajfault:rate1m"}`}, Interval: 4*time.Minute + 30*time.Second, WorkerNum: 2, SimulatedTimeseriesFile: "../../testdata/timeseries.txt", @@ -36,8 +35,8 @@ func TestMultiWorkers(t *testing.T) { "cluster=local-cluster", "clusterID=245c2253-7b0d-4080-8e33-f6f0d6c6ff73", }, - FromCAFile: "../../testdata/service-ca.crt", - FromTokenFile: "../../testdata/token", + DisableHyperShift: true, + DisableStatusReporting: true, } l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) @@ -47,7 +46,19 @@ func TestMultiWorkers(t *testing.T) { stdlog.SetOutput(log.NewStdlibAdapter(l)) opt.Logger = l - err := runMultiWorkers(opt, 
&forwarder.Config{Metrics: forwarder.NewWorkerMetrics(prometheus.NewRegistry())}) + sc := scheme.Scheme + if err := oav1beta1.AddToScheme(sc); err != nil { + t.Fatal("failed to add observabilityaddon into scheme") + } + kubeClient := fake.NewClientBuilder(). + WithScheme(sc). + WithStatusSubresource(&oav1beta1.ObservabilityAddon{}). + Build() + + err := runMultiWorkers(opt, &forwarder.Config{ + Metrics: forwarder.NewWorkerMetrics(prometheus.NewRegistry()), + StatusClient: kubeClient, + }) if err != nil { t.Fatal(err) } diff --git a/collectors/metrics/pkg/collectrule/evaluator.go b/collectors/metrics/pkg/collectrule/evaluator.go index 850379675..381cfe7f4 100644 --- a/collectors/metrics/pkg/collectrule/evaluator.go +++ b/collectors/metrics/pkg/collectrule/evaluator.go @@ -66,15 +66,8 @@ type Evaluator struct { func New(cfg forwarder.Config) (*Evaluator, error) { config = forwarder.Config{ - From: cfg.From, - FromToken: cfg.FromToken, - FromTokenFile: cfg.FromTokenFile, - FromCAFile: cfg.FromCAFile, - - ToUpload: cfg.ToUpload, - ToUploadCA: cfg.ToUploadCA, - ToUploadCert: cfg.ToUploadCert, - ToUploadKey: cfg.ToUploadKey, + FromClientConfig: cfg.FromClientConfig, + ToClientConfig: cfg.ToClientConfig, AnonymizeLabels: cfg.AnonymizeLabels, AnonymizeSalt: cfg.AnonymizeSalt, @@ -88,8 +81,8 @@ func New(cfg forwarder.Config) (*Evaluator, error) { Metrics: cfg.Metrics, } from := &url.URL{ - Scheme: cfg.From.Scheme, - Host: cfg.From.Host, + Scheme: cfg.FromClientConfig.URL.Scheme, + Host: cfg.FromClientConfig.URL.Host, Path: "/api/v1/query", } evaluator := Evaluator{ @@ -108,7 +101,7 @@ func New(cfg forwarder.Config) (*Evaluator, error) { evaluator.interval = 30 * time.Second } - fromClient, err := forwarder.CreateFromClient(cfg, cfg.Metrics, evaluator.interval, "evaluate_query", cfg.Logger) + fromClient, err := cfg.CreateFromClient(cfg.Metrics, evaluator.interval, "evaluate_query", cfg.Logger) if err != nil { return nil, err } @@ -336,7 +329,7 @@ func (e *Evaluator) 
evaluate(ctx context.Context) { from.RawQuery = v.Encode() req := &http.Request{Method: "GET", URL: from} - result, err := e.fromClient.RetrievRecordingMetrics(ctx, req, r.Name) + result, err := e.fromClient.RetrieveRecordingMetrics(ctx, req, r.Name) if err != nil { rlogger.Log(e.logger, rlogger.Error, "msg", "failed to evaluate collect rule", "err", err, "rule", r.Expr) continue @@ -347,9 +340,9 @@ func (e *Evaluator) evaluate(ctx context.Context) { } } if isUpdate { - config.Rules = getMatches() + config.Matchers = getMatches() - if len(config.Rules) == 0 { + if len(config.Matchers) == 0 { if forwardWorker != nil && cancel != nil { cancel() forwardWorker = nil diff --git a/collectors/metrics/pkg/forwarder/forwarder.go b/collectors/metrics/pkg/forwarder/forwarder.go index 679ac683a..fd5a41e9d 100644 --- a/collectors/metrics/pkg/forwarder/forwarder.go +++ b/collectors/metrics/pkg/forwarder/forwarder.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" clientmodel "github.com/prometheus/client_model/go" + "sigs.k8s.io/controller-runtime/pkg/client" metricshttp "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/http" rlogger "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/logger" @@ -35,95 +36,102 @@ import ( const ( failedStatusReportMsg = "Failed to report status" uwlPromURL = "https://prometheus-user-workload.openshift-user-workload-monitoring.svc:9092" -) -type RuleMatcher interface { - MatchRules() []string -} + matchParam = "match[]" + queryParam = "query" +) // Config defines the parameters that can be used to configure a worker. -// The only required field is `From`. 
type Config struct { - From *url.URL - FromQuery *url.URL - ToUpload *url.URL - FromToken string - FromTokenFile string - FromCAFile string - ToUploadCA string - ToUploadCert string - ToUploadKey string - - AnonymizeLabels []string - AnonymizeSalt string - AnonymizeSaltFile string - Debug bool - Interval time.Duration - EvaluateInterval time.Duration - LimitBytes int64 - Rules []string - RulesFile string - RecordingRules []string - RecordingRulesFile string - CollectRules []string - CollectRulesFile string - Transformer metricfamily.Transformer - - Logger log.Logger + // StatusClient is a kube client used to report status to the hub. + StatusClient client.Client + Logger log.Logger + Metrics *workerMetrics + + // FromClientConfig is the config for the client used in sending /federate requests to Prometheus. + FromClientConfig FromClientConfig + // ToClientConfig is the config for the client used in sending remote write requests to Thanos Receive. + ToClientConfig ToClientConfig + // Enable debug roundtrippers for from and to clients. + Debug bool + // LimitBytes limits the size of the requests made to from and to clients. + LimitBytes int64 + + // Interval is the interval at which workers will federate Prometheus and send remote write requests. + // 4m30s by default + Interval time.Duration + // EvaluateInterval is actually used to configure collectrule evaluator in collectrule/evaluator.go. + EvaluateInterval time.Duration + + // Fields for anonymizing metrics. + AnonymizeLabels []string + AnonymizeSalt string + AnonymizeSaltFile string + + // Matchers is the list of matchers to use for filtering metrics, they are appended to URL during /federate calls. + Matchers []string + // RecordingRules is the list of recording rules to evaluate and send as a new series in remote write. + // TODO(saswatamcode): Kill this feature. + RecordingRules []string + // CollectRules are unique rules, that basically add matchers, based on some PromQL rule. 
+ // They are used to collect additional metrics when things are going wrong. + // TODO(saswatamcode): Do this some place else or re-evaluate if we even need this. + CollectRules []string + // SimulatedTimeseriesFile provides configuration for sending simulated data. Used in perfscale tests? + // TODO(saswatamcode): Kill this feature, simulation testing logic should not be included in business logic. SimulatedTimeseriesFile string - Metrics *workerMetrics + // Transformer is used to transform metrics before sending them to Thanos Receive. + // We pass in transformers for eliding labels, hypershift etc. + Transformer metricfamily.Transformer } -// Worker represents a metrics forwarding agent. It collects metrics from a source URL and forwards them to a sink. -// A Worker should be configured with a `Config` and instantiated with the `New` func. -// Workers are thread safe; all access to shared fields are synchronized. -type Worker struct { - fromClient *metricsclient.Client - toClient *metricsclient.Client - from *url.URL - fromQuery *url.URL - to *url.URL - - interval time.Duration - transformer metricfamily.Transformer - rules []string - recordingRules []string - - lastMetrics []*clientmodel.MetricFamily - lock sync.Mutex - reconfigure chan struct{} - - logger log.Logger - - simulatedTimeseriesFile string - - status status.StatusReport +type FromClientConfig struct { + URL *url.URL + QueryURL *url.URL + CAFile string + Token string + TokenFile string +} - metrics *workerMetrics - forwardFailures int +type ToClientConfig struct { + URL *url.URL + CAFile string + CertFile string + KeyFile string } -func CreateFromClient(cfg Config, metrics *workerMetrics, interval time.Duration, name string, - logger log.Logger) (*metricsclient.Client, error) { - fromTransport := metricsclient.DefaultTransport(logger, false) - if len(cfg.FromCAFile) > 0 { +// CreateFromClient creates a new metrics client for the from URL. 
+// Needs to be exported here so that it can be used in collectrule evaluator. +func (cfg Config) CreateFromClient( + metrics *workerMetrics, + interval time.Duration, + name string, + logger log.Logger, +) (*metricsclient.Client, error) { + fromTransport := metricsclient.DefaultTransport(logger) + + if len(cfg.FromClientConfig.CAFile) > 0 { if fromTransport.TLSClientConfig == nil { fromTransport.TLSClientConfig = &tls.Config{ MinVersion: tls.VersionTLS12, } } + pool, err := x509.SystemCertPool() if err != nil { return nil, fmt.Errorf("failed to read system certificates: %w", err) } - data, err := os.ReadFile(cfg.FromCAFile) + + data, err := os.ReadFile(cfg.FromClientConfig.CAFile) if err != nil { return nil, fmt.Errorf("failed to read from-ca-file: %w", err) } + if !pool.AppendCertsFromPEM(data) { rlogger.Log(logger, rlogger.Warn, "msg", "no certs found in from-ca-file") } + fromTransport.TLSClientConfig.RootCAs = pool } else { if fromTransport.TLSClientConfig == nil { @@ -140,25 +148,60 @@ func CreateFromClient(cfg Config, metrics *workerMetrics, interval time.Duration if cfg.Debug { fromClient.Transport = metricshttp.NewDebugRoundTripper(logger, fromClient.Transport) } - if len(cfg.FromToken) == 0 && len(cfg.FromTokenFile) > 0 { - data, err := os.ReadFile(cfg.FromTokenFile) + + if len(cfg.FromClientConfig.Token) == 0 && len(cfg.FromClientConfig.TokenFile) > 0 { + data, err := os.ReadFile(cfg.FromClientConfig.TokenFile) if err != nil { return nil, fmt.Errorf("unable to read from-token-file: %w", err) } - cfg.FromToken = strings.TrimSpace(string(data)) - } - if len(cfg.FromToken) > 0 { - fromClient.Transport = metricshttp.NewBearerRoundTripper(cfg.FromToken, fromClient.Transport) + cfg.FromClientConfig.Token = strings.TrimSpace(string(data)) } - from := metricsclient.New(logger, metrics.clientMetrics, fromClient, cfg.LimitBytes, interval, "federate_from") + if len(cfg.FromClientConfig.Token) > 0 { + fromClient.Transport = 
metricshttp.NewBearerRoundTripper(cfg.FromClientConfig.Token, fromClient.Transport) + } - return from, nil + return metricsclient.New(logger, metrics.clientMetrics, fromClient, cfg.LimitBytes, interval, "federate_from"), nil } -func createClients(cfg Config, metrics *metricsclient.ClientMetrics, interval time.Duration, - logger log.Logger) (*metricsclient.Client, *metricsclient.Client, metricfamily.MultiTransformer, error) { +// CreateToClient creates a new metrics client for the to URL. +// Uses config for CA, Cert, Key for configuring mTLS transport. +// Skips if nothing is provided. +func (cfg Config) CreateToClient( + metrics *workerMetrics, + interval time.Duration, + name string, + logger log.Logger, +) (*metricsclient.Client, error) { + var err error + toTransport := metricsclient.DefaultTransport(logger) + + if len(cfg.ToClientConfig.CAFile) > 0 { + toTransport, err = metricsclient.MTLSTransport(logger, cfg.ToClientConfig.CAFile, cfg.ToClientConfig.CertFile, cfg.ToClientConfig.KeyFile) + if err != nil { + return nil, fmt.Errorf("failed to create TLS transport: %w", err) + } + } else { + if toTransport.TLSClientConfig == nil { + // #nosec G402 -- Only used if no TLS config is provided. + toTransport.TLSClientConfig = &tls.Config{ + MinVersion: tls.VersionTLS12, + InsecureSkipVerify: true, + } + } + } + toTransport.Proxy = http.ProxyFromEnvironment + toClient := &http.Client{Transport: toTransport} + if cfg.Debug { + toClient.Transport = metricshttp.NewDebugRoundTripper(logger, toClient.Transport) + } + + return metricsclient.New(logger, metrics.clientMetrics, toClient, cfg.LimitBytes, interval, name), nil +} + +// GetTransformer creates a new transformer based on the provided Config. +func (cfg Config) GetTransformer(logger log.Logger) (metricfamily.MultiTransformer, error) { var transformer metricfamily.MultiTransformer // Configure the anonymization. 
@@ -166,42 +209,56 @@ func createClients(cfg Config, metrics *metricsclient.ClientMetrics, interval ti if len(cfg.AnonymizeSalt) == 0 && len(cfg.AnonymizeSaltFile) > 0 { data, err := os.ReadFile(cfg.AnonymizeSaltFile) if err != nil { - return nil, nil, transformer, fmt.Errorf("failed to read anonymize-salt-file: %w", err) + return transformer, fmt.Errorf("failed to read anonymize-salt-file: %w", err) } anonymizeSalt = strings.TrimSpace(string(data)) } + if len(cfg.AnonymizeLabels) != 0 && len(anonymizeSalt) == 0 { - return nil, nil, transformer, errors.New("anonymize-salt must be specified if anonymize-labels is set") + return transformer, errors.New("anonymize-salt must be specified if anonymize-labels is set") } + if len(cfg.AnonymizeLabels) == 0 { rlogger.Log(logger, rlogger.Warn, "msg", "not anonymizing any labels") } - // Configure a transformer. + // Combine with config transformer if cfg.Transformer != nil { transformer.With(cfg.Transformer) } + if len(cfg.AnonymizeLabels) > 0 { transformer.With(metricfamily.NewMetricsAnonymizer(anonymizeSalt, cfg.AnonymizeLabels, nil)) } - from, err := CreateFromClient(cfg, cfg.Metrics, interval, "federate_from", logger) - if err != nil { - return nil, nil, transformer, err - } - // Create the `toClient`. + return transformer, nil +} + +// Worker represents a metrics forwarding agent. It collects metrics from a source URL and forwards them to a sink. +// A Worker should be configured with a `Config` and instantiated with the `New` func. +// Workers are thread safe; all access to shared fields are synchronized. 
+type Worker struct { + logger log.Logger + status status.Reporter + reconfigure chan struct{} + lock sync.Mutex + metrics *workerMetrics + forwardFailures int - toTransport, err := metricsclient.MTLSTransport(logger, cfg.ToUploadCA, cfg.ToUploadCert, cfg.ToUploadKey) - if err != nil { - return nil, nil, transformer, fmt.Errorf("failed to create TLS transport: %w", err) - } - toTransport.Proxy = http.ProxyFromEnvironment - toClient := &http.Client{Transport: toTransport} - if cfg.Debug { - toClient.Transport = metricshttp.NewDebugRoundTripper(logger, toClient.Transport) - } - to := metricsclient.New(logger, metrics, toClient, cfg.LimitBytes, interval, "federate_to") - return from, to, transformer, nil + fromClient *metricsclient.Client + toClient *metricsclient.Client + from *url.URL + fromQuery *url.URL + to *url.URL + + interval time.Duration + + transformer metricfamily.Transformer + matchers []string + recordingRules []string + simulatedTimeseriesFile string + + lastMetrics []*clientmodel.MetricFamily } type workerMetrics struct { @@ -236,20 +293,21 @@ func NewWorkerMetrics(reg *prometheus.Registry) *workerMetrics { } } -// New creates a new Worker based on the provided Config. If the Config contains invalid -// values, then an error is returned. +// New creates a new Worker based on the provided Config. 
func New(cfg Config) (*Worker, error) { - if cfg.From == nil { + if cfg.FromClientConfig.URL == nil { return nil, errors.New("a URL from which to scrape is required") } + logger := log.With(cfg.Logger, "component", "forwarder") - rlogger.Log(logger, rlogger.Warn, "msg", cfg.ToUpload) + rlogger.Log(logger, rlogger.Warn, "msg", cfg.ToClientConfig.URL) + w := Worker{ - from: cfg.From, - fromQuery: cfg.FromQuery, + from: cfg.FromClientConfig.URL, + fromQuery: cfg.FromClientConfig.QueryURL, interval: cfg.Interval, reconfigure: make(chan struct{}), - to: cfg.ToUpload, + to: cfg.ToClientConfig.URL, logger: log.With(cfg.Logger, "component", "forwarder/worker"), simulatedTimeseriesFile: cfg.SimulatedTimeseriesFile, metrics: cfg.Metrics, @@ -259,33 +317,26 @@ func New(cfg Config) (*Worker, error) { w.interval = 4*time.Minute + 30*time.Second } - fromClient, toClient, transformer, err := createClients(cfg, w.metrics.clientMetrics, w.interval, logger) + fromClient, err := cfg.CreateFromClient(w.metrics, w.interval, "federate_from", logger) + if err != nil { + return nil, err + } + + toClient, err := cfg.CreateToClient(w.metrics, w.interval, "federate_to", logger) + if err != nil { + return nil, err + } + + transformer, err := cfg.GetTransformer(logger) if err != nil { return nil, err } + w.fromClient = fromClient w.toClient = toClient w.transformer = transformer - // Configure the matching rules. - rules := cfg.Rules - if len(cfg.RulesFile) > 0 { - data, err := os.ReadFile(cfg.RulesFile) - if err != nil { - return nil, fmt.Errorf("unable to read match-file: %w", err) - } - rules = append(rules, strings.Split(string(data), "\n")...) - } - for i := 0; i < len(rules); { - s := strings.TrimSpace(rules[i]) - if len(s) == 0 { - rules = append(rules[:i], rules[i+1:]...) - continue - } - rules[i] = s - i++ - } - w.rules = rules + w.matchers = cfg.Matchers // Configure the recording rules. 
recordingRules := cfg.RecordingRules @@ -300,18 +351,25 @@ func New(cfg Config) (*Worker, error) { } w.recordingRules = recordingRules - standalone := os.Getenv("STANDALONE") == "true" - isUwl := strings.Contains(os.Getenv("FROM"), uwlPromURL) - s, err := status.New(logger, standalone, isUwl) - if err != nil { - return nil, fmt.Errorf("unable to create StatusReport: %w", err) + w.status = &status.NoopReporter{} + if cfg.StatusClient != nil { + standalone := os.Getenv("STANDALONE") == "true" + isUwl := strings.Contains(os.Getenv("FROM"), uwlPromURL) + s, err := status.New(cfg.StatusClient, logger, standalone, isUwl) + if err != nil { + return nil, fmt.Errorf("unable to create StatusReport: %w", err) + } + w.status = s } - w.status = *s return &w, nil } -// Reconfigure temporarily stops a worker and reconfigures is with the provided Config. +// TODO(saswatamcode): This is a relic of telemeter code, but with how our workers are configured, there will often +// be no meaningful reload semantics, as most values are often kept as flags which need restarts. +// There is an option to explore this later, by instead "watching" matcherfile. +// Keeping this method for now, but it is effectively unused. +// Reconfigure temporarily stops a worker and reconfigures it with the provided Config. +// Is thread safe and can run concurrently with `LastMetrics` and `Run`. func (w *Worker) Reconfigure(cfg Config) error { worker, err := New(cfg) @@ -328,7 +386,7 @@ func (w *Worker) Reconfigure(cfg Config) error { w.from = worker.from w.to = worker.to w.transformer = worker.transformer - w.rules = worker.rules + w.matchers = worker.matchers w.recordingRules = worker.recordingRules // Signal a restart to Run func. @@ -337,6 +395,9 @@ func (w *Worker) Reconfigure(cfg Config) error { return nil } +// TODO(saswatamcode): This is a relic of telemeter code, remove this. 
+// There is no such utility to exposing this information as to what the last value of metrics sent was +// Rarely would this be used as a tool for debugging, when you already have remote write metrics. func (w *Worker) LastMetrics() []*clientmodel.MetricFamily { w.lock.Lock() defer w.lock.Unlock() @@ -344,25 +405,24 @@ func (w *Worker) LastMetrics() []*clientmodel.MetricFamily { } func (w *Worker) Run(ctx context.Context) { - for { - // Ensure that the Worker does not access critical configuration during a reconfiguration. - w.lock.Lock() - wait := w.interval - // The critical section ends here. - w.lock.Unlock() - - if err := w.forward(ctx); err != nil { - rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err) - wait = time.Minute - } + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + for { select { // If the context is canceled, then we're done. case <-ctx.Done(): + ticker.Stop() return - case <-time.After(wait): + case <-ticker.C: + if err := w.forward(ctx); err != nil { + rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err) + } // We want to be able to interrupt a sleep to immediately apply a new configuration. 
case <-w.reconfigure: + w.lock.Lock() + ticker.Reset(w.interval) + w.lock.Unlock() } } } @@ -460,8 +520,12 @@ func (w *Worker) getFederateMetrics(ctx context.Context) ([]*clientmodel.MetricF from := w.from from.RawQuery = "" v := from.Query() - for _, rule := range w.rules { - v.Add("match[]", rule) + if len(w.matchers) == 0 { + return families, nil + } + + for _, matcher := range w.matchers { + v.Add(matchParam, matcher) } from.RawQuery = v.Encode() @@ -481,6 +545,10 @@ func (w *Worker) getRecordingMetrics(ctx context.Context) ([]*clientmodel.Metric from := w.fromQuery + if len(w.recordingRules) == 0 { + return families, nil + } + for _, rule := range w.recordingRules { var r map[string]string err := json.Unmarshal(([]byte)(rule), &r) @@ -495,11 +563,11 @@ func (w *Worker) getRecordingMetrics(ctx context.Context) ([]*clientmodel.Metric // reset query from last invocation, otherwise match rules will be appended from.RawQuery = "" v := w.fromQuery.Query() - v.Add("query", rquery) + v.Add(queryParam, rquery) from.RawQuery = v.Encode() req := &http.Request{Method: "GET", URL: from} - rfamilies, err := w.fromClient.RetrievRecordingMetrics(ctx, req, rname) + rfamilies, err := w.fromClient.RetrieveRecordingMetrics(ctx, req, rname) if err != nil { rlogger.Log(w.logger, rlogger.Warn, "msg", "Failed to retrieve recording metrics", "err", err, "url", from) e = err diff --git a/collectors/metrics/pkg/forwarder/forwarder_test.go b/collectors/metrics/pkg/forwarder/forwarder_test.go index 2b224b6ff..292874ab2 100644 --- a/collectors/metrics/pkg/forwarder/forwarder_test.go +++ b/collectors/metrics/pkg/forwarder/forwarder_test.go @@ -5,13 +5,8 @@ package forwarder import ( - "context" - stdlog "log" - "net/http" - "net/http/httptest" "net/url" "os" - "sync" "testing" "github.com/go-kit/log" @@ -21,10 +16,6 @@ import ( // Base64 encoded CA cert string var customCA = 
"LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURXVENDQWtHZ0F3SUJBZ0lVWTRHWjZPWk5uTnZySjFjNUk1RjNYZzQrRTFjd0RRWUpLb1pJaHZjTkFRRUwKQlFBd1BERUxNQWtHQTFVRUJoTUNSRVV4RHpBTkJnTlZCQWdNQm1KbGNteHBiakVQTUEwR0ExVUVCd3dHWW1WeQpiR2x1TVFzd0NRWURWUVFLREFKeWFEQWVGdzB5TXpFeU1URXhNelF6TURaYUZ3MHpNekV5TURneE16UXpNRFphCk1Ed3hDekFKQmdOVkJBWVRBa1JGTVE4d0RRWURWUVFJREFaaVpYSnNhVzR4RHpBTkJnTlZCQWNNQm1KbGNteHAKYmpFTE1Ba0dBMVVFQ2d3Q2NtZ3dnZ0VpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCRHdBd2dnRUtBb0lCQVFDdwprNEhLV3VBOFptN0JQR2IvZEJjaGtNUFZhWGw0dzJlVHhxRG14OVhYaGVCRFZva0lKZkFGTGZ6a3YwYUd0NWV4ClprenQxc0tQVHk0NEY5ckRKSEg2dWpEODA4U1FPV0p3WFJCakI4Tk1zSjhTTVRCUm5KUE5YNTJ0akdQNjc3UEUKNWpINnc2OW9hMG9tcGVvRDk2eUM2RTZmWU9pbFl0cVF5UFdsT0MzNEQ3TnNXU1gxdnN4cmx3VTBsQXJCbWdQYQpuZURFMnQ1cU1aK1F5TXBhQi80SFh4L2NLYU5XYXJWN3FzV3ZwSE9mOGN2OUNKd1c3VkhWdjJvNUVReVI1MkcrCitOYXE4bTduSVBzaFJSMjBHMjRsR01sVUFaTjFaMkl6VjN3UExUUmZNTXRYdGtIMFVKT3pnZTQvaExSWVJBSzMKTnhZU0xJYmFscWJsa2lUTWxFbEpBZ01CQUFHalV6QlJNQjBHQTFVZERnUVdCQlNJVFZVY2s2Wmg2WTZkY2RxZwo0VHVYRjMxcjFqQWZCZ05WSFNNRUdEQVdnQlNJVFZVY2s2Wmg2WTZkY2RxZzRUdVhGMzFyMWpBUEJnTlZIUk1CCkFmOEVCVEFEQVFIL01BMEdDU3FHU0liM0RRRUJDd1VBQTRJQkFRQ0FKUWFKM2RkYVkvNVMydHU0TnNVeXNiVG8KY3BrL3YyZkxpUkthdmtiZk1kTjBFdkV6K2gwd3FqOUpQdGJjUm5Md2tlQWdmQ3Uzb29zSG4rOXc4SkFaRjJNcwpEM1FucVovaVNNVjVHSDdQTjlIK0h0M1lVQTIwWWh3QkY0RFVXYm5wS0lnL2p4NWdmVTFYZEljK2JpUWJhdHk3CmxUL0hVOVhPRmlqM3VwbWRFakgrQVlJT2QxSFh4M3dsZlFhNHFrdWhHeUMwWXNkeldidWFxaE1tdnJkQksrSDAKUUxPcnAzN3l2OHVwUFVlMXhwTzZTeUg5QjVEeXhEWkVjMXN6WVpSVXdNVzZxc3NkWEZvWGZ0SjYxZmo3S05XagoyamcwZkQ1ZEhFT1RObDFDT3p3Q1lvR1k5ejVWOHNhYy9sSDg3UkxYWXdBcXdvcEdpanM4QXBCeklURm8KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=" -func init() { - os.Setenv("UNIT_TEST", "true") -} - func TestNew(t *testing.T) { from, err := url.Parse("https://redhat.com") if err != nil { @@ -41,13 +32,17 @@ func TestNew(t *testing.T) { }{ { // Empty configuration should error. 
- c: Config{Logger: log.NewNopLogger()}, + c: Config{ + Logger: log.NewNopLogger(), + }, err: true, }, { // Only providing a `From` should not error. c: Config{ - From: from, + FromClientConfig: FromClientConfig{ + URL: from, + }, Logger: log.NewNopLogger(), }, err: false, @@ -55,25 +50,33 @@ func TestNew(t *testing.T) { { // Providing `From` and `ToUpload` should not error. c: Config{ - From: from, - ToUpload: toUpload, - Logger: log.NewNopLogger(), + FromClientConfig: FromClientConfig{ + URL: from, + }, + ToClientConfig: ToClientConfig{ + URL: toUpload, + }, + Logger: log.NewNopLogger(), }, err: false, }, { // Providing an invalid `FromTokenFile` file should error. c: Config{ - From: from, - FromTokenFile: "/this/path/does/not/exist", - Logger: log.NewNopLogger(), + FromClientConfig: FromClientConfig{ + URL: from, + TokenFile: "/this/path/does/not/exist", + }, + Logger: log.NewNopLogger(), }, err: true, }, { // Providing only `AnonymizeSalt` should not error. c: Config{ - From: from, + FromClientConfig: FromClientConfig{ + URL: from, + }, AnonymizeSalt: "1", Logger: log.NewNopLogger(), }, @@ -82,7 +85,9 @@ func TestNew(t *testing.T) { { // Providing only `AnonymizeLabels` should error. c: Config{ - From: from, + FromClientConfig: FromClientConfig{ + URL: from, + }, AnonymizeLabels: []string{"foo"}, Logger: log.NewNopLogger(), }, @@ -91,7 +96,9 @@ func TestNew(t *testing.T) { { // Providing only `AnonymizeSalt` and `AnonymizeLabels should not error. c: Config{ - From: from, + FromClientConfig: FromClientConfig{ + URL: from, + }, AnonymizeLabels: []string{"foo"}, AnonymizeSalt: "1", Logger: log.NewNopLogger(), @@ -101,7 +108,9 @@ func TestNew(t *testing.T) { { // Providing an invalid `AnonymizeSaltFile` should error. 
c: Config{ - From: from, + FromClientConfig: FromClientConfig{ + URL: from, + }, AnonymizeLabels: []string{"foo"}, AnonymizeSaltFile: "/this/path/does/not/exist", Logger: log.NewNopLogger(), @@ -111,30 +120,45 @@ func TestNew(t *testing.T) { { // Providing `AnonymizeSalt` takes preference over an invalid `AnonymizeSaltFile` and should not error. c: Config{ - From: from, + FromClientConfig: FromClientConfig{ + URL: from, + }, AnonymizeLabels: []string{"foo"}, AnonymizeSalt: "1", AnonymizeSaltFile: "/this/path/does/not/exist", Logger: log.NewNopLogger(), + ToClientConfig: ToClientConfig{ + CAFile: "../../testdata/tls/ca.crt", + CertFile: "../../testdata/tls/tls.crt", + KeyFile: "../../testdata/tls/tls.key", + }, }, err: false, }, { // Providing an invalid `FromCAFile` should error. c: Config{ - From: from, - FromCAFile: "/this/path/does/not/exist", - Logger: log.NewNopLogger(), + FromClientConfig: FromClientConfig{ + URL: from, + CAFile: "/this/path/does/not/exist", + }, + Logger: log.NewNopLogger(), }, err: true, }, { // Providing CustomCA should not error. 
c: Config{ - From: from, - ToUpload: toUpload, - ToUploadCA: customCA, - Logger: log.NewNopLogger(), + FromClientConfig: FromClientConfig{ + URL: from, + }, + ToClientConfig: ToClientConfig{ + URL: toUpload, + CAFile: "../../testdata/tls/ca.crt", + CertFile: "../../testdata/tls/tls.crt", + KeyFile: "../../testdata/tls/tls.key", + }, + Logger: log.NewNopLogger(), }, err: false, }, @@ -142,7 +166,7 @@ func TestNew(t *testing.T) { for i := range tc { tc[i].c.Metrics = NewWorkerMetrics(prometheus.NewRegistry()) - if tc[i].c.ToUploadCA == customCA { + if i == 10 { os.Setenv("HTTPS_PROXY_CA_BUNDLE", customCA) } if _, err := New(tc[i].c); (err != nil) != tc[i].err { @@ -161,7 +185,9 @@ func TestReconfigure(t *testing.T) { t.Fatalf("failed to parse `from` URL: %v", err) } c := Config{ - From: from, + FromClientConfig: FromClientConfig{ + URL: from, + }, Logger: log.NewNopLogger(), Metrics: NewWorkerMetrics(prometheus.NewRegistry()), } @@ -181,13 +207,17 @@ func TestReconfigure(t *testing.T) { }{ { // Empty configuration should error. - c: Config{Logger: log.NewNopLogger()}, + c: Config{ + Logger: log.NewNopLogger(), + }, err: true, }, { // Configuration with new `From` should not error. c: Config{ - From: from2, + FromClientConfig: FromClientConfig{ + URL: from2, + }, Logger: log.NewNopLogger(), }, err: false, @@ -195,9 +225,11 @@ func TestReconfigure(t *testing.T) { { // Configuration with new invalid field should error. c: Config{ - From: from, - FromTokenFile: "/this/path/does/not/exist", - Logger: log.NewNopLogger(), + FromClientConfig: FromClientConfig{ + URL: from, + TokenFile: "/this/path/does/not/exist", + }, + Logger: log.NewNopLogger(), }, err: true, }, @@ -214,75 +246,3 @@ func TestReconfigure(t *testing.T) { } } } - -// TestRun tests the Run method of the Worker type. 
-// This test will: -// * instantiate a worker -// * configure the worker to make requests against a test server -// * in that test server, reconfigure the worker to make requests against a second test server -// * in the second test server, cancel the worker's context. -// This test will only succeed if the worker is able to be correctly reconfigured and canceled -// such that the Run method returns. -func TestRun(t *testing.T) { - c := Config{ - // Use a dummy URL. - From: &url.URL{}, - FromQuery: &url.URL{}, - Logger: log.NewNopLogger(), - Metrics: NewWorkerMetrics(prometheus.NewRegistry()), - } - w, err := New(c) - if err != nil { - t.Fatalf("failed to create new worker: %v", err) - } - - ctx, cancel := context.WithCancel(context.Background()) - var once sync.Once - var wg sync.WaitGroup - - wg.Add(1) - // This is the second test server. We need to define it early so we can use its URL in the - // handler for the first test server. - // In this handler, we decrement the wait group and cancel the worker's context. - ts2 := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { - cancel() - once.Do(wg.Done) - })) - defer ts2.Close() - - // This is the first test server. - // In this handler, we test the Reconfigure method of the worker and point it to the second - // test server. 
- ts1 := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { - go func() { - from, err := url.Parse(ts2.URL) - if err != nil { - stdlog.Fatalf("failed to parse second test server URL: %v", err) - } - if err := w.Reconfigure(Config{From: from, FromQuery: from, Logger: log.NewNopLogger(), Metrics: NewWorkerMetrics(prometheus.NewRegistry())}); err != nil { - stdlog.Fatalf("failed to reconfigure worker with second test server url: %v", err) - } - }() - })) - defer ts1.Close() - - from, err := url.Parse(ts1.URL) - if err != nil { - t.Fatalf("failed to parse first test server URL: %v", err) - } - if err := w.Reconfigure(Config{From: from, FromQuery: from, - RecordingRules: []string{"{\"name\":\"test\",\"query\":\"test\"}"}, - Logger: log.NewNopLogger(), Metrics: NewWorkerMetrics(prometheus.NewRegistry())}); err != nil { - t.Fatalf("failed to reconfigure worker with first test server url: %v", err) - } - - wg.Add(1) - // In this goroutine we run the worker and only decrement - // the wait group when the worker finishes running. 
- go func() { - w.Run(ctx) - wg.Done() - }() - - wg.Wait() -} diff --git a/collectors/metrics/pkg/metricfamily/hypershift_transformer.go b/collectors/metrics/pkg/metricfamily/hypershift_transformer.go index b3baf84a8..4486cb89d 100644 --- a/collectors/metrics/pkg/metricfamily/hypershift_transformer.go +++ b/collectors/metrics/pkg/metricfamily/hypershift_transformer.go @@ -7,17 +7,11 @@ package metricfamily import ( "context" "fmt" - "os" - - "errors" "github.com/go-kit/log" hyperv1 "github.com/openshift/hypershift/api/hypershift/v1alpha1" prom "github.com/prometheus/client_model/go" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/clientcmd" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/stolostron/multicluster-observability-operator/collectors/metrics/pkg/logger" "github.com/stolostron/multicluster-observability-operator/operators/pkg/util" @@ -39,41 +33,14 @@ type hypershiftTransformer struct { managementClusterID string } -func NewHypershiftTransformer(l log.Logger, c client.Client, labels map[string]string) (Transformer, error) { - - //clusters := map[string]string{} - hClient := c - if hClient == nil { - if _, ok := os.LookupEnv("UNIT_TEST"); !ok { - config, err := clientcmd.BuildConfigFromFlags("", "") - if err != nil { - return nil, errors.New("failed to create the kube config") - } - s := scheme.Scheme - if err := hyperv1.AddToScheme(s); err != nil { - return nil, errors.New("failed to add observabilityaddon into scheme") - } - hClient, err = client.New(config, client.Options{Scheme: s}) - if err != nil { - return nil, errors.New("failed to create the kube client") - } - } else { - s := scheme.Scheme - err := hyperv1.AddToScheme(s) - if err != nil { - return nil, err - } - hClient = fake.NewClientBuilder().Build() - } - } - - clusters, err := getHostedClusters(hClient, l) +func NewHypershiftTransformer(c client.Client, l log.Logger, labels map[string]string) (Transformer, error) { + 
clusters, err := getHostedClusters(c, l) if err != nil { return nil, err } return &hypershiftTransformer{ - kubeClient: hClient, + kubeClient: c, logger: l, hostedClusters: clusters, managementCluster: labels[CLUSTER_LABEL], @@ -141,9 +108,6 @@ func getClusterName(h *hypershiftTransformer, id string) (string, error) { } func CheckCRDExist(l log.Logger) (bool, error) { - if os.Getenv("UNIT_TEST") == "true" { - return true, nil - } c, err := util.GetOrCreateCRDClient() if err != nil { return false, nil diff --git a/collectors/metrics/pkg/metricfamily/hypershift_transformer_test.go b/collectors/metrics/pkg/metricfamily/hypershift_transformer_test.go index 65be56069..2a1ad4a7f 100644 --- a/collectors/metrics/pkg/metricfamily/hypershift_transformer_test.go +++ b/collectors/metrics/pkg/metricfamily/hypershift_transformer_test.go @@ -49,13 +49,12 @@ var ( } ) -func init() { - os.Setenv("UNIT_TEST", "true") +func TestTransform(t *testing.T) { s := scheme.Scheme - hyperv1.AddToScheme(s) -} + if err := hyperv1.AddToScheme(s); err != nil { + t.Fatal("couldn't add hyperv1 to scheme") + } -func TestTransform(t *testing.T) { c := fake.NewClientBuilder().WithRuntimeObjects(hCluster).Build() l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) @@ -63,10 +62,12 @@ func TestTransform(t *testing.T) { "cluster": "test-cluster", "clusterID": "test-clusterID", } - h, err := NewHypershiftTransformer(l, c, labels) + + h, err := NewHypershiftTransformer(c, l, labels) if err != nil { t.Fatal("Failed to new HyperShiftTransformer", err) } + family := &prom.MetricFamily{ Name: &metricsName, Metric: []*prom.Metric{ diff --git a/collectors/metrics/pkg/metricsclient/metricsclient.go b/collectors/metrics/pkg/metricsclient/metricsclient.go index 12a700b5a..d74f4c872 100644 --- a/collectors/metrics/pkg/metricsclient/metricsclient.go +++ b/collectors/metrics/pkg/metricsclient/metricsclient.go @@ -88,7 +88,7 @@ type MetricsResult struct { Value []interface{} `json:"value"` } -func (c *Client) 
RetrievRecordingMetrics( +func (c *Client) RetrieveRecordingMetrics( ctx context.Context, req *http.Request, name string) ([]*clientmodel.MetricFamily, error) { @@ -319,12 +319,6 @@ func withCancel(ctx context.Context, client *http.Client, req *http.Request, fn } func MTLSTransport(logger log.Logger, caCertFile, tlsCrtFile, tlsKeyFile string) (*http.Transport, error) { - testMode := os.Getenv("UNIT_TEST") != "" - if testMode { - caCertFile = "../../testdata/tls/ca.crt" - tlsKeyFile = "../../testdata/tls/tls.key" - tlsCrtFile = "../../testdata/tls/tls.crt" - } // Load Server CA cert var caCert []byte var err error @@ -369,12 +363,12 @@ func MTLSTransport(logger log.Logger, caCertFile, tlsCrtFile, tlsKeyFile string) } -func DefaultTransport(logger log.Logger, isTLS bool) *http.Transport { +func DefaultTransport(logger log.Logger) *http.Transport { return &http.Transport{ - Dial: (&net.Dialer{ + DialContext: (&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, - }).Dial, + }).DialContext, TLSHandshakeTimeout: 10 * time.Second, DisableKeepAlives: true, } diff --git a/collectors/metrics/pkg/metricsclient/metricsclient_test.go b/collectors/metrics/pkg/metricsclient/metricsclient_test.go index 65776185f..0c3e3aece 100644 --- a/collectors/metrics/pkg/metricsclient/metricsclient_test.go +++ b/collectors/metrics/pkg/metricsclient/metricsclient_test.go @@ -22,8 +22,8 @@ func TestDefaultTransport(t *testing.T) { TLSHandshakeTimeout: 10 * time.Second, DisableKeepAlives: true, } - http := DefaultTransport(logger, true) - if http.Dial == nil || reflect.TypeOf(http) != reflect.TypeOf(want) { + http := DefaultTransport(logger) + if http.DialContext == nil || reflect.TypeOf(http) != reflect.TypeOf(want) { t.Errorf("Default transport doesn't match expected format") } diff --git a/collectors/metrics/pkg/status/status.go b/collectors/metrics/pkg/status/status.go index 0009bde8b..c81817bae 100644 --- a/collectors/metrics/pkg/status/status.go +++ 
b/collectors/metrics/pkg/status/status.go @@ -6,18 +6,13 @@ package status import ( "context" - "errors" "log/slog" "os" "github.com/go-kit/log" "github.com/go-logr/logr" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/clientcmd" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1" "github.com/stolostron/multicluster-observability-operator/operators/pkg/status" ) @@ -26,6 +21,13 @@ const ( addonNamespace = "open-cluster-management-addon-observability" ) +type Reporter interface { + UpdateStatus(ctx context.Context, reason status.Reason, message string) error +} + +var _ Reporter = &StatusReport{} +var _ Reporter = &NoopReporter{} + type StatusReport struct { statusClient client.Client standalone bool @@ -34,41 +36,15 @@ type StatusReport struct { logger log.Logger } -func New(logger log.Logger, standalone, isUwl bool) (*StatusReport, error) { - testMode := os.Getenv("UNIT_TEST") != "" - var kubeClient client.Client - if testMode { - s := scheme.Scheme - if err := oav1beta1.AddToScheme(s); err != nil { - return nil, errors.New("failed to add observabilityaddon into scheme") - } - kubeClient = fake.NewClientBuilder(). - WithScheme(s). - WithStatusSubresource(&oav1beta1.ObservabilityAddon{}). 
- Build() - } else { - config, err := clientcmd.BuildConfigFromFlags("", "") - if err != nil { - return nil, errors.New("failed to create the kube config") - } - s := scheme.Scheme - if err := oav1beta1.AddToScheme(s); err != nil { - return nil, errors.New("failed to add observabilityaddon into scheme") - } - kubeClient, err = client.New(config, client.Options{Scheme: s}) - if err != nil { - return nil, errors.New("failed to create the kube client") - } - } - +func New(c client.Client, logger log.Logger, standalone, isUwl bool) (*StatusReport, error) { logger.Log("msg", "Creating status client", "standalone", standalone, "isUwl", isUwl) statusLogger := logr.FromSlogHandler(slog.New(slog.NewTextHandler(os.Stdout, nil)).With("component", "statusclient").Handler()) return &StatusReport{ - statusClient: kubeClient, + statusClient: c, standalone: standalone, isUwl: isUwl, - statusReporter: status.NewStatus(kubeClient, addonName, addonNamespace, statusLogger), + statusReporter: status.NewStatus(c, addonName, addonNamespace, statusLogger), logger: logger, }, nil } @@ -93,3 +69,9 @@ func (s *StatusReport) UpdateStatus(ctx context.Context, reason status.Reason, m return nil } + +type NoopReporter struct{} + +func (s *NoopReporter) UpdateStatus(_ context.Context, _ status.Reason, _ string) error { + return nil +} diff --git a/collectors/metrics/pkg/status/status_test.go b/collectors/metrics/pkg/status/status_test.go index d4ee78622..1f3497812 100644 --- a/collectors/metrics/pkg/status/status_test.go +++ b/collectors/metrics/pkg/status/status_test.go @@ -15,19 +15,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" oav1beta1 "github.com/stolostron/multicluster-observability-operator/operators/multiclusterobservability/api/v1beta1" "github.com/stolostron/multicluster-observability-operator/operators/pkg/status" ) -func init() { - 
os.Setenv("UNIT_TEST", "true") - s := scheme.Scheme - _ = oav1beta1.AddToScheme(s) -} - func TestUpdateStatus(t *testing.T) { - testCases := map[string]struct { reason status.Reason message string @@ -121,7 +115,16 @@ func TestUpdateStatus(t *testing.T) { }, } - s, err := New(log.NewLogfmtLogger(os.Stdout), false, tc.isUwl) + sc := scheme.Scheme + if err := oav1beta1.AddToScheme(sc); err != nil { + t.Fatal("failed to add observabilityaddon into scheme") + } + kubeClient := fake.NewClientBuilder(). + WithScheme(sc). + WithStatusSubresource(&oav1beta1.ObservabilityAddon{}). + Build() + + s, err := New(kubeClient, log.NewLogfmtLogger(os.Stdout), false, tc.isUwl) if err != nil { t.Fatalf("Failed to create new Status struct: (%v)", err) } diff --git a/examples/mco/e2e/v1beta1/observability-v1beta1-to-v1beta2-golden.yaml b/examples/mco/e2e/v1beta1/observability-v1beta1-to-v1beta2-golden.yaml index 706e8f4bf..1f8d7813e 100644 --- a/examples/mco/e2e/v1beta1/observability-v1beta1-to-v1beta2-golden.yaml +++ b/examples/mco/e2e/v1beta1/observability-v1beta1-to-v1beta2-golden.yaml @@ -17,6 +17,7 @@ spec: enableMetrics: true interval: 300 scrapeSizeLimitBytes: 1073741824 + workers: 1 storageConfig: alertmanagerStorageSize: 1Gi compactStorageSize: 1Gi diff --git a/examples/mco/e2e/v1beta2/custom-certs/observability.yaml b/examples/mco/e2e/v1beta2/custom-certs/observability.yaml index d1dca89a9..3267a7845 100644 --- a/examples/mco/e2e/v1beta2/custom-certs/observability.yaml +++ b/examples/mco/e2e/v1beta2/custom-certs/observability.yaml @@ -104,6 +104,7 @@ spec: observabilityAddonSpec: scrapeSizeLimitBytes: 1073741824 enableMetrics: true + workers: 1 interval: 30 resources: limits: diff --git a/examples/mco/e2e/v1beta2/observability.yaml b/examples/mco/e2e/v1beta2/observability.yaml index 53c4b35f9..37a076a5d 100644 --- a/examples/mco/e2e/v1beta2/observability.yaml +++ b/examples/mco/e2e/v1beta2/observability.yaml @@ -103,6 +103,7 @@ spec: kubernetes.io/os: linux 
observabilityAddonSpec: scrapeSizeLimitBytes: 1073741824 + workers: 1 enableMetrics: true interval: 30 resources: diff --git a/operators/endpointmetrics/pkg/collector/metrics_collector.go b/operators/endpointmetrics/pkg/collector/metrics_collector.go index 8c2243874..d9ec1f75d 100644 --- a/operators/endpointmetrics/pkg/collector/metrics_collector.go +++ b/operators/endpointmetrics/pkg/collector/metrics_collector.go @@ -792,6 +792,11 @@ func (m *MetricsCollector) getCommands(isUSW bool, deployParams *deploymentParam interval = fmt.Sprintf("%ds", m.ObsAddon.Spec.Interval) } + workers := 1 + if m.ObsAddon.Spec.Workers != 0 { + workers = int(m.ObsAddon.Spec.Workers) + } + evaluateInterval := "30s" if m.ObsAddon.Spec.Interval < 30 { evaluateInterval = interval @@ -819,6 +824,7 @@ func (m *MetricsCollector) getCommands(isUSW bool, deployParams *deploymentParam "/usr/bin/metrics-collector", "--listen=:8080", "--from=$(FROM)", + "--worker-number=" + strconv.Itoa(workers), "--from-query=$(FROM_QUERY)", "--to-upload=$(TO)", "--to-upload-ca=/tlscerts/ca/ca.crt", diff --git a/operators/multiclusterobservability/api/shared/multiclusterobservability_shared.go b/operators/multiclusterobservability/api/shared/multiclusterobservability_shared.go index 8d3ff6207..25447f00c 100644 --- a/operators/multiclusterobservability/api/shared/multiclusterobservability_shared.go +++ b/operators/multiclusterobservability/api/shared/multiclusterobservability_shared.go @@ -49,6 +49,15 @@ type ObservabilityAddonSpec struct { // +kubebuilder:default:=1073741824 ScrapeSizeLimitBytes int `json:"scrapeSizeLimitBytes,omitempty"` + // Workers is the number of workers in metrics-collector that work in parallel to + // push metrics to hub server. If set to > 1, metrics-collector will shard + // /federate calls to Prometheus, based on matcher rules provided by allowlist. + // Ensure that number of matchers exceeds number of workers. 
+ // +optional + // +kubebuilder:default:=1 + // +kubebuilder:validation:Minimum=1 + Workers int32 `json:"workers,omitempty"` + // Resource requirement for metrics-collector // +optional Resources *corev1.ResourceRequirements `json:"resources,omitempty"` diff --git a/operators/multiclusterobservability/bundle/manifests/multicluster-observability-operator.clusterserviceversion.yaml b/operators/multiclusterobservability/bundle/manifests/multicluster-observability-operator.clusterserviceversion.yaml index 742e305ce..7a3c5347c 100644 --- a/operators/multiclusterobservability/bundle/manifests/multicluster-observability-operator.clusterserviceversion.yaml +++ b/operators/multiclusterobservability/bundle/manifests/multicluster-observability-operator.clusterserviceversion.yaml @@ -49,7 +49,7 @@ metadata: } ] capabilities: Basic Install - createdAt: "2024-09-09T14:04:08Z" + createdAt: "2024-09-23T12:07:01Z" operators.operatorframework.io/builder: operator-sdk-v1.34.2 operators.operatorframework.io/project_layout: go.kubebuilder.io/v3 name: multicluster-observability-operator.v0.1.0 diff --git a/operators/multiclusterobservability/bundle/manifests/observability.open-cluster-management.io_multiclusterobservabilities.yaml b/operators/multiclusterobservability/bundle/manifests/observability.open-cluster-management.io_multiclusterobservabilities.yaml index 83ed7eebe..3e0adaed7 100644 --- a/operators/multiclusterobservability/bundle/manifests/observability.open-cluster-management.io_multiclusterobservabilities.yaml +++ b/operators/multiclusterobservability/bundle/manifests/observability.open-cluster-management.io_multiclusterobservabilities.yaml @@ -169,6 +169,16 @@ spec: ScrapeSizeLimitBytes is the max size in bytes for a single metrics scrape from in-cluster Prometheus. Default is 1 GiB. type: integer + workers: + default: 1 + description: |- + Workers is the number of workers in metrics-collector that work in parallel to + push metrics to hub server. 
If set to > 1, metrics-collector will shard + /federate calls to Prometheus, based on matcher rules provided by allowlist. + Ensure that number of matchers exceeds number of workers. + format: int32 + minimum: 1 + type: integer type: object retentionResolution1h: default: 30d @@ -10198,6 +10208,16 @@ spec: ScrapeSizeLimitBytes is the max size in bytes for a single metrics scrape from in-cluster Prometheus. Default is 1 GiB. type: integer + workers: + default: 1 + description: |- + Workers is the number of workers in metrics-collector that work in parallel to + push metrics to hub server. If set to > 1, metrics-collector will shard + /federate calls to Prometheus, based on matcher rules provided by allowlist. + Ensure that number of matchers exceeds number of workers. + format: int32 + minimum: 1 + type: integer type: object storageConfig: description: Specifies the storage to be used by Observability diff --git a/operators/multiclusterobservability/bundle/manifests/observability.open-cluster-management.io_observabilityaddons.yaml b/operators/multiclusterobservability/bundle/manifests/observability.open-cluster-management.io_observabilityaddons.yaml index 152931a85..cd52f4a9f 100644 --- a/operators/multiclusterobservability/bundle/manifests/observability.open-cluster-management.io_observabilityaddons.yaml +++ b/operators/multiclusterobservability/bundle/manifests/observability.open-cluster-management.io_observabilityaddons.yaml @@ -115,6 +115,16 @@ spec: ScrapeSizeLimitBytes is the max size in bytes for a single metrics scrape from in-cluster Prometheus. Default is 1 GiB. type: integer + workers: + default: 1 + description: |- + Workers is the number of workers in metrics-collector that work in parallel to + push metrics to hub server. If set to > 1, metrics-collector will shard + /federate calls to Prometheus, based on matcher rules provided by allowlist. + Ensure that number of matchers exceeds number of workers. 
+ format: int32 + minimum: 1 + type: integer type: object status: description: ObservabilityAddonStatus defines the observed state of ObservabilityAddon diff --git a/operators/multiclusterobservability/config/crd/bases/observability.open-cluster-management.io_multiclusterobservabilities.yaml b/operators/multiclusterobservability/config/crd/bases/observability.open-cluster-management.io_multiclusterobservabilities.yaml index 9798a643b..19b8365dc 100644 --- a/operators/multiclusterobservability/config/crd/bases/observability.open-cluster-management.io_multiclusterobservabilities.yaml +++ b/operators/multiclusterobservability/config/crd/bases/observability.open-cluster-management.io_multiclusterobservabilities.yaml @@ -151,6 +151,16 @@ spec: ScrapeSizeLimitBytes is the max size in bytes for a single metrics scrape from in-cluster Prometheus. Default is 1 GiB. type: integer + workers: + default: 1 + description: |- + Workers is the number of workers in metrics-collector that work in parallel to + push metrics to hub server. If set to > 1, metrics-collector will shard + /federate calls to Prometheus, based on matcher rules provided by allowlist. + Ensure that number of matchers exceeds number of workers. + format: int32 + minimum: 1 + type: integer type: object retentionResolution1h: default: 30d @@ -10182,6 +10192,16 @@ spec: ScrapeSizeLimitBytes is the max size in bytes for a single metrics scrape from in-cluster Prometheus. Default is 1 GiB. type: integer + workers: + default: 1 + description: |- + Workers is the number of workers in metrics-collector that work in parallel to + push metrics to hub server. If set to > 1, metrics-collector will shard + /federate calls to Prometheus, based on matcher rules provided by allowlist. + Ensure that number of matchers exceeds number of workers. 
+ format: int32 + minimum: 1 + type: integer type: object storageConfig: description: Specifies the storage to be used by Observability diff --git a/operators/multiclusterobservability/config/crd/bases/observability.open-cluster-management.io_observabilityaddons.yaml b/operators/multiclusterobservability/config/crd/bases/observability.open-cluster-management.io_observabilityaddons.yaml index 5c87a6ef1..c55ce05c6 100644 --- a/operators/multiclusterobservability/config/crd/bases/observability.open-cluster-management.io_observabilityaddons.yaml +++ b/operators/multiclusterobservability/config/crd/bases/observability.open-cluster-management.io_observabilityaddons.yaml @@ -115,6 +115,16 @@ spec: ScrapeSizeLimitBytes is the max size in bytes for a single metrics scrape from in-cluster Prometheus. Default is 1 GiB. type: integer + workers: + default: 1 + description: |- + Workers is the number of workers in metrics-collector that work in parallel to + push metrics to hub server. If set to > 1, metrics-collector will shard + /federate calls to Prometheus, based on matcher rules provided by allowlist. + Ensure that number of matchers exceeds number of workers. + format: int32 + minimum: 1 + type: integer type: object status: description: ObservabilityAddonStatus defines the observed state of ObservabilityAddon