diff --git a/pkg/provider/hpa.go b/pkg/provider/hpa.go index 832894de..06e7fac6 100644 --- a/pkg/provider/hpa.go +++ b/pkg/provider/hpa.go @@ -62,6 +62,7 @@ type HPAProvider struct { recorder kube_record.EventRecorder logger *log.Entry disregardIncompatibleHPAs bool + gcInterval time.Duration } // metricCollection is a container for sending collected metrics across a @@ -72,7 +73,7 @@ type metricCollection struct { } // NewHPAProvider initializes a new HPAProvider. -func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider { +func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool, metricsTTL time.Duration, gcInterval time.Duration) *HPAProvider { metricsc := make(chan metricCollection) return &HPAProvider{ @@ -81,12 +82,13 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim collectorInterval: collectorInterval, metricSink: metricsc, metricStore: NewMetricStore(func() time.Time { - return time.Now().UTC().Add(15 * time.Minute) + return time.Now().UTC().Add(metricsTTL) }), collectorFactory: collectorFactory, recorder: recorder.CreateEventRecorder(client), logger: log.WithFields(log.Fields{"provider": "hpa"}), disregardIncompatibleHPAs: disregardIncompatibleHPAs, + gcInterval: gcInterval, } } @@ -218,7 +220,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) { go func(ctx context.Context) { for { select { - case <-time.After(10 * time.Minute): + case <-time.After(p.gcInterval): p.metricStore.RemoveExpired() case <-ctx.Done(): p.logger.Info("Stopped metrics store garbage collection.") diff --git a/pkg/provider/hpa_test.go b/pkg/provider/hpa_test.go index 3cbfe452..68d9488c 100644 --- a/pkg/provider/hpa_test.go +++ b/pkg/provider/hpa_test.go @@ -106,7 +106,7 @@ func TestUpdateHPAs(t *testing.T) { err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{}) require.NoError(t, err) - provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false) + provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false, 1*time.Second, 1*time.Second) provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink) err = provider.updateHPAs() @@ -171,7 +171,7 @@ func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) { require.NoError(t, err) eventRecorder := &mockEventRecorder{} - provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true) + provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true, 1*time.Second, 1*time.Second) provider.recorder = eventRecorder provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink) @@ -183,7 +183,7 @@ func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) { // check for events when disregardIncompatibleHPAs=false eventRecorder = &mockEventRecorder{} - provider = NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false) + provider = NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false, 1*time.Second, 1*time.Second) provider.recorder = eventRecorder provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink) diff --git a/pkg/server/start.go b/pkg/server/start.go index 6aed9d70..326a9927 100644 --- a/pkg/server/start.go +++ b/pkg/server/start.go @@ -116,6 +116,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command { flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics") flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+ "disregard failing to create collectors for incompatible HPAs") + flags.DurationVar(&o.MetricsTTL, "metrics-ttl", 15*time.Minute, "TTL for metrics that are stored in in-memory cache.") + flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.") return cmd } @@ -243,7 +245,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions)) } - hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs) + hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs, o.MetricsTTL, o.GCInterval) go hpaProvider.Run(ctx) @@ -350,4 +352,8 @@ type AdapterServerOptions struct { // Whether to disregard failing to create collectors for incompatible HPAs - such as when using // kube-metrics-adapter beside another Metrics Provider DisregardIncompatibleHPAs bool + // TTL for metrics that are stored in in-memory cache + MetricsTTL time.Duration + // Interval to clean up metrics that are stored in in-memory cache + GCInterval time.Duration }