Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add commandline flags to set GC interval and metrics TTL #309

Merged
merged 1 commit into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/provider/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type HPAProvider struct {
recorder kube_record.EventRecorder
logger *log.Entry
disregardIncompatibleHPAs bool
gcInterval time.Duration
}

// metricCollection is a container for sending collected metrics across a
Expand All @@ -72,7 +73,7 @@ type metricCollection struct {
}

// NewHPAProvider initializes a new HPAProvider.
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool) *HPAProvider {
func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval time.Duration, collectorFactory *collector.CollectorFactory, disregardIncompatibleHPAs bool, metricsTTL time.Duration, gcInterval time.Duration) *HPAProvider {
metricsc := make(chan metricCollection)

return &HPAProvider{
Expand All @@ -81,12 +82,13 @@ func NewHPAProvider(client kubernetes.Interface, interval, collectorInterval tim
collectorInterval: collectorInterval,
metricSink: metricsc,
metricStore: NewMetricStore(func() time.Time {
return time.Now().UTC().Add(15 * time.Minute)
return time.Now().UTC().Add(metricsTTL)
}),
collectorFactory: collectorFactory,
recorder: recorder.CreateEventRecorder(client),
logger: log.WithFields(log.Fields{"provider": "hpa"}),
disregardIncompatibleHPAs: disregardIncompatibleHPAs,
gcInterval: gcInterval,
}
}

Expand Down Expand Up @@ -218,7 +220,7 @@ func (p *HPAProvider) collectMetrics(ctx context.Context) {
go func(ctx context.Context) {
for {
select {
case <-time.After(10 * time.Minute):
case <-time.After(p.gcInterval):
p.metricStore.RemoveExpired()
case <-ctx.Done():
p.logger.Info("Stopped metrics store garbage collection.")
Expand Down
6 changes: 3 additions & 3 deletions pkg/provider/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestUpdateHPAs(t *testing.T) {
err = collectorFactory.RegisterPodsCollector("", mockCollectorPlugin{})
require.NoError(t, err)

provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false, 1*time.Second, 1*time.Second)
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)

err = provider.updateHPAs()
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {
require.NoError(t, err)

eventRecorder := &mockEventRecorder{}
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true)
provider := NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, true, 1*time.Second, 1*time.Second)
provider.recorder = eventRecorder
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)

Expand All @@ -183,7 +183,7 @@ func TestUpdateHPAsDisregardingIncompatibleHPA(t *testing.T) {

// check for events when disregardIncompatibleHPAs=false
eventRecorder = &mockEventRecorder{}
provider = NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false)
provider = NewHPAProvider(fakeClient, 1*time.Second, 1*time.Second, collectorFactory, false, 1*time.Second, 1*time.Second)
provider.recorder = eventRecorder
provider.collectorScheduler = NewCollectorScheduler(context.Background(), provider.metricSink)

Expand Down
8 changes: 7 additions & 1 deletion pkg/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
flags.StringVar(&o.MetricsAddress, "metrics-address", o.MetricsAddress, "The address where to serve prometheus metrics")
flags.BoolVar(&o.DisregardIncompatibleHPAs, "disregard-incompatible-hpas", o.DisregardIncompatibleHPAs, ""+
"disregard failing to create collectors for incompatible HPAs")
flags.DurationVar(&o.MetricsTTL, "metrics-ttl", 15*time.Minute, "TTL for metrics that are stored in in-memory cache.")
flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
return cmd
}

Expand Down Expand Up @@ -243,7 +245,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
collectorFactory.RegisterExternalCollector([]string{collector.AWSSQSQueueLengthMetric}, collector.NewAWSCollectorPlugin(awsSessions))
}

hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs)
hpaProvider := provider.NewHPAProvider(client, 30*time.Second, 1*time.Minute, collectorFactory, o.DisregardIncompatibleHPAs, o.MetricsTTL, o.GCInterval)

go hpaProvider.Run(ctx)

Expand Down Expand Up @@ -350,4 +352,8 @@ type AdapterServerOptions struct {
// Whether to disregard failing to create collectors for incompatible HPAs - such as when using
// kube-metrics-adapter beside another Metrics Provider
DisregardIncompatibleHPAs bool
// TTL for metrics that are stored in in-memory cache
MetricsTTL time.Duration
// Interval to clean up metrics that are stored in in-memory cache
GCInterval time.Duration
}