diff --git a/controller/stats_reporter.go b/controller/stats_reporter.go
index d9a7dc64987..244d65728b2 100644
--- a/controller/stats_reporter.go
+++ b/controller/stats_reporter.go
@@ -25,6 +25,7 @@ import (
 	"go.opencensus.io/stats/view"
 	"go.opencensus.io/tag"
 	"go.uber.org/zap"
+	"k8s.io/client-go/util/workqueue"
 	"knative.dev/pkg/metrics"
 )
 
@@ -48,28 +49,60 @@ var (
 )
 
 func init() {
+	// Install a metrics.WorkqueueProvider so that client-go workqueues
+	// report their metrics through the OpenCensus measures declared here.
+	wp := &metrics.WorkqueueProvider{
+		Adds: stats.Int64(
+			"workqueue_adds_total",
+			"Total number of adds handled by workqueue",
+			stats.UnitNone,
+		),
+		Depth: stats.Int64(
+			"workqueue_depth",
+			"Current depth of workqueue",
+			stats.UnitNone,
+		),
+		Latency: stats.Float64(
+			"workqueue_queue_latency_seconds",
+			"How long in seconds an item stays in workqueue before being requested.",
+			"s",
+		),
+		Retries: stats.Int64(
+			"workqueue_retries_total",
+			"Total number of retries handled by workqueue",
+			stats.UnitNone, // a count of retries, not a duration
+		),
+		WorkDuration: stats.Float64(
+			"workqueue_work_duration_seconds",
+			"How long in seconds processing an item from workqueue takes.",
+			"s",
+		),
+	}
+	workqueue.SetProvider(wp)
+
 	// Create views to see our measurements. This can return an error if
 	// a previously-registered view has the same name with a different value.
 	// View name defaults to the measure name if unspecified.
 	err := view.Register(
-		&view.View{
-			Description: "Depth of the work queue",
-			Measure:     workQueueDepthStat,
-			Aggregation: view.LastValue(),
-			TagKeys:     []tag.Key{reconcilerTagKey},
-		},
-		&view.View{
-			Description: "Number of reconcile operations",
-			Measure:     reconcileCountStat,
-			Aggregation: view.Count(),
-			TagKeys:     []tag.Key{reconcilerTagKey, keyTagKey, successTagKey},
-		},
-		&view.View{
-			Description: "Latency of reconcile operations",
-			Measure:     reconcileLatencyStat,
-			Aggregation: reconcileDistribution,
-			TagKeys:     []tag.Key{reconcilerTagKey, keyTagKey, successTagKey},
-		},
+		append(wp.DefaultViews(),
+			&view.View{
+				Description: "Depth of the work queue",
+				Measure:     workQueueDepthStat,
+				Aggregation: view.LastValue(),
+				TagKeys:     []tag.Key{reconcilerTagKey},
+			},
+			&view.View{
+				Description: "Number of reconcile operations",
+				Measure:     reconcileCountStat,
+				Aggregation: view.Count(),
+				TagKeys:     []tag.Key{reconcilerTagKey, keyTagKey, successTagKey},
+			},
+			&view.View{
+				Description: "Latency of reconcile operations",
+				Measure:     reconcileLatencyStat,
+				Aggregation: reconcileDistribution,
+				TagKeys:     []tag.Key{reconcilerTagKey, keyTagKey, successTagKey},
+			})...,
 	)
 	if err != nil {
 		panic(err)
diff --git a/metrics/metrics.go b/metrics/metrics.go
new file mode 100644
index 00000000000..a923bd5e8e8
--- /dev/null
+++ b/metrics/metrics.go
@@ -0,0 +1,76 @@
+/*
+Copyright 2019 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"context"
+	"sync/atomic"
+
+	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
+	"k8s.io/client-go/util/workqueue"
+)
+
+type counterMetric struct {
+	mutators []tag.Mutator       // tags applied to every recorded measurement
+	measure  *stats.Int64Measure // measure that each Inc is recorded against
+}
+
+var (
+	_ workqueue.CounterMetric = (*counterMetric)(nil)
+)
+
+// Inc implements workqueue.CounterMetric by recording a value of 1 against the measure.
+func (m counterMetric) Inc() {
+	Record(context.Background(), m.measure.M(1), stats.WithTags(m.mutators...))
+}
+
+type gaugeMetric struct {
+	mutators []tag.Mutator       // tags applied to every recorded measurement
+	measure  *stats.Int64Measure // measure that the running total is recorded against
+	total    int64               // current gauge value; updated with atomic.AddInt64
+}
+
+var (
+	_ workqueue.GaugeMetric = (*gaugeMetric)(nil)
+)
+
+// Inc implements workqueue.GaugeMetric: it raises the running total by one and records the new total.
+func (m *gaugeMetric) Inc() {
+	total := atomic.AddInt64(&m.total, 1)
+	Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...))
+}
+
+// Dec implements workqueue.GaugeMetric: it lowers the running total by one and records the new total.
+func (m *gaugeMetric) Dec() {
+	total := atomic.AddInt64(&m.total, -1)
+	Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...))
+}
+
+type floatMetric struct {
+	mutators []tag.Mutator         // tags applied to every recorded measurement
+	measure  *stats.Float64Measure // measure that each observation is recorded against
+}
+
+var (
+	_ workqueue.SummaryMetric = (*floatMetric)(nil)
+)
+
+// Observe implements workqueue.SummaryMetric by recording v against the measure.
+func (m floatMetric) Observe(v float64) {
+	Record(context.Background(), m.measure.M(v), stats.WithTags(m.mutators...))
+}
diff --git a/metrics/metricstest/metricstest.go b/metrics/metricstest/metricstest.go
index 91416da26c6..3d81726d31a 100644
--- a/metrics/metricstest/metricstest.go
+++ b/metrics/metricstest/metricstest.go
@@ -58,7 +58,7 @@ func CheckCountData(t *testing.T, name string, wantTags map[string]string, wantV
 	checkRowTags(t, row, name, wantTags)
 
 	if s, ok := row.Data.(*view.CountData); !ok {
-		t.Errorf("For metric %s: Reporter expected a CountData type", name)
+		t.Errorf("%s: got %T, want CountData", name, row.Data)
 	} else if s.Value != wantValue {
 		t.Errorf("For metric %s: value = %v, want: %d", name, s.Value, wantValue)
 	}
@@ -74,7 +74,7 @@ func CheckDistributionData(t *testing.T, name
string, wantTags map[string]string
 	checkRowTags(t, row, name, wantTags)
 
 	if s, ok := row.Data.(*view.DistributionData); !ok {
-		t.Errorf("For metric %s: Reporter expected a DistributionData type", name)
+		t.Errorf("%s: got %T, want DistributionData", name, row.Data)
 	} else {
 		if s.Count != expectedCount {
 			t.Errorf("For metric %s: reporter count = %d, want = %d", name, s.Count, expectedCount)
@@ -97,7 +97,7 @@ func CheckLastValueData(t *testing.T, name string, wantTags map[string]string, w
 	checkRowTags(t, row, name, wantTags)
 
 	if s, ok := row.Data.(*view.LastValueData); !ok {
-		t.Errorf("For metric %s: Reporter.Report() expected a LastValueData type", name)
+		t.Errorf("%s: got %T, want LastValueData", name, row.Data)
 	} else if s.Value != wantValue {
 		t.Errorf("For metric %s: Reporter.Report() expected %v got %v", name, s.Value, wantValue)
 	}
@@ -112,7 +112,7 @@ func CheckSumData(t *testing.T, name string, wantTags map[string]string, wantVal
 	checkRowTags(t, row, name, wantTags)
 
 	if s, ok := row.Data.(*view.SumData); !ok {
-		t.Errorf("For metric %s: Reporter expected a SumData type", name)
+		t.Errorf("%s: got %T, want SumData", name, row.Data)
 	} else if s.Value != wantValue {
 		t.Errorf("For metric %s: value = %v, want: %v", name, s.Value, wantValue)
 	}
diff --git a/metrics/record.go b/metrics/record.go
index c342fe7eae3..a8756837a83 100644
--- a/metrics/record.go
+++ b/metrics/record.go
@@ -37,18 +37,20 @@ import (
 // 3) The backend is Stackdriver and it is allowed to use custom metrics.
 // 4) The backend is Stackdriver and the metric is one of the built-in metrics: "knative_revision", "knative_broker",
 // "knative_trigger", "knative_source".
-func Record(ctx context.Context, ms stats.Measurement) {
+func Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) {
 	mc := getCurMetricsConfig()
 
+	ros = append(ros, stats.WithMeasurements(ms)) // ms is passed on as one more record option
+
 	// Condition 1)
 	if mc == nil {
-		stats.Record(ctx, ms)
+		stats.RecordWithOptions(ctx, ros...)
 		return
 	}
 
 	// Condition 2) and 3)
 	if !mc.isStackdriverBackend || mc.allowStackdriverCustomMetrics {
-		stats.Record(ctx, ms)
+		stats.RecordWithOptions(ctx, ros...)
 		return
 	}
 
@@ -60,7 +62,7 @@ func Record(ctx context.Context, ms stats.Measurement) {
 		metricskey.KnativeSourceMetrics.Has(metricType)
 
 	if isServingBuiltIn || isEventingBuiltIn {
-		stats.Record(ctx, ms)
+		stats.RecordWithOptions(ctx, ros...)
 	}
 }
 
@@ -74,3 +76,13 @@ func Buckets125(low, high float64) []float64 {
 	}
 	return buckets
 }
+
+// BucketsNBy10 generates n bucket boundaries: low, 10*low, 100*low, ...
+// The result always contains low, even when n is less than 1.
+func BucketsNBy10(low float64, n int) []float64 {
+	buckets := []float64{low}
+	for last, i := low, len(buckets); i < n; last, i = 10*last, i+1 {
+		buckets = append(buckets, 10*last)
+	}
+	return buckets
+}
diff --git a/metrics/record_test.go b/metrics/record_test.go
index 5adab472d51..5d91972e658 100644
--- a/metrics/record_test.go
+++ b/metrics/record_test.go
@@ -18,10 +18,12 @@ package metrics
 
 import (
 	"context"
+	"fmt"
 	"testing"
 
 	"knative.dev/pkg/metrics/metricstest"
 
+	"github.com/google/go-cmp/cmp"
 	"go.opencensus.io/stats"
 	"go.opencensus.io/stats/view"
 )
@@ -133,3 +135,32 @@ func testRecord(t *testing.T, measure *stats.Int64Measure, shouldReportCases []c
 		metricstest.CheckLastValueData(t, test.measurement.Measure().Name(), map[string]string{}, 4) // The value is still the last one of shouldReportCases
 	}
 }
+
+func TestBucketsNBy10(t *testing.T) {
+	tests := []struct {
+		base float64
+		n    int
+		want []float64
+	}{{
+		base: 0.001,
+		n:    5,
+		want: []float64{0.001, 0.01, 0.1, 1, 10},
+	}, {
+		base: 1,
+		n:    2,
+		want: []float64{1, 10},
+	}, {
+		base: 0.5,
+		n:    4,
+		want: []float64{0.5, 5, 50, 500},
+	}}
+
+	for _, test := range tests {
+		t.Run(fmt.Sprintf("base=%f,n=%d", test.base, test.n), func(t *testing.T) {
+			got := BucketsNBy10(test.base, test.n)
+			if diff := cmp.Diff(test.want, got); diff != "" {
+				t.Errorf("BucketsNBy10 (-want, +got) = %s", diff)
+			}
+		})
+	}
+}
diff --git
a/metrics/workqueue.go b/metrics/workqueue.go
new file mode 100644
index 00000000000..91e00b8cc35
--- /dev/null
+++ b/metrics/workqueue.go
@@ -0,0 +1,131 @@
+/*
+Copyright 2019 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"go.opencensus.io/stats"
+	"go.opencensus.io/stats/view"
+	"go.opencensus.io/tag"
+	"k8s.io/client-go/util/workqueue"
+)
+
+// tagName is used to associate the provided name with each metric created
+// through the WorkqueueProvider's methods to implement workqueue.MetricsProvider.
+// For the kubernetes workqueue implementations this is the queue name provided
+// to the workqueue constructor.
+var tagName = tag.MustNewKey("name")
+
+// WorkqueueProvider implements workqueue.MetricsProvider and may be used with
+// workqueue.SetProvider to have workqueue metrics recorded against the measures below.
+type WorkqueueProvider struct {
+	Adds         *stats.Int64Measure   // measure behind NewAddsMetric and AddsView
+	Depth        *stats.Int64Measure   // measure behind NewDepthMetric and DepthView
+	Latency      *stats.Float64Measure // measure behind NewLatencyMetric and LatencyView
+	Retries      *stats.Int64Measure   // measure behind NewRetriesMetric and RetriesView
+	WorkDuration *stats.Float64Measure // measure behind NewWorkDurationMetric and WorkDurationView
+}
+
+var (
+	_ workqueue.MetricsProvider = (*WorkqueueProvider)(nil)
+)
+
+// NewAddsMetric implements workqueue.MetricsProvider; the counter records Adds tagged with name.
+func (wp *WorkqueueProvider) NewAddsMetric(name string) workqueue.CounterMetric {
+	return counterMetric{
+		mutators: []tag.Mutator{tag.Insert(tagName, name)},
+		measure:  wp.Adds,
+	}
+}
+
+// AddsView returns a view of the Adds measure aggregated with view.Count.
+func (wp *WorkqueueProvider) AddsView() *view.View {
+	return measureView(wp.Adds, view.Count())
+}
+
+// NewDepthMetric implements workqueue.MetricsProvider; the gauge records Depth tagged with name.
+func (wp *WorkqueueProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
+	return &gaugeMetric{
+		mutators: []tag.Mutator{tag.Insert(tagName, name)},
+		measure:  wp.Depth,
+	}
+}
+
+// DepthView returns a view of the Depth measure aggregated with view.LastValue.
+func (wp *WorkqueueProvider) DepthView() *view.View {
+	return measureView(wp.Depth, view.LastValue())
+}
+
+// NewLatencyMetric implements workqueue.MetricsProvider; the summary records Latency tagged with name.
+func (wp *WorkqueueProvider) NewLatencyMetric(name string) workqueue.SummaryMetric {
+	return floatMetric{
+		mutators: []tag.Mutator{tag.Insert(tagName, name)},
+		measure:  wp.Latency,
+	}
+}
+
+// LatencyView returns a distribution view of the Latency measure bounded by BucketsNBy10(1e-08, 10).
+func (wp *WorkqueueProvider) LatencyView() *view.View {
+	return measureView(wp.Latency, view.Distribution(BucketsNBy10(1e-08, 10)...))
+}
+
+// NewRetriesMetric implements workqueue.MetricsProvider; the counter records Retries tagged with name.
+func (wp *WorkqueueProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
+	return counterMetric{
+		mutators: []tag.Mutator{tag.Insert(tagName, name)},
+		measure:  wp.Retries,
+	}
+}
+
+// RetriesView returns a view of the Retries measure aggregated with view.Count.
+func (wp *WorkqueueProvider) RetriesView() *view.View {
+	return measureView(wp.Retries, view.Count())
+}
+
+// NewWorkDurationMetric implements workqueue.MetricsProvider; the summary records WorkDuration tagged with name.
+func (wp *WorkqueueProvider) NewWorkDurationMetric(name string) workqueue.SummaryMetric {
+	return floatMetric{
+		mutators: []tag.Mutator{tag.Insert(tagName, name)},
+		measure:  wp.WorkDuration,
+	}
+}
+
+// WorkDurationView returns a distribution view of the WorkDuration measure bounded by BucketsNBy10(1e-08, 10).
+func (wp *WorkqueueProvider) WorkDurationView() *view.View {
+	return measureView(wp.WorkDuration, view.Distribution(BucketsNBy10(1e-08, 10)...))
+}
+
+// DefaultViews returns the Adds, Depth, Latency, Retries and WorkDuration views, suitable for passing to view.Register
+func (wp *WorkqueueProvider) DefaultViews() []*view.View {
+	return []*view.View{
+		wp.AddsView(),
+		wp.DepthView(),
+		wp.LatencyView(),
+		wp.RetriesView(),
+		wp.WorkDurationView(),
+	}
+}
+
+// measureView builds a view named and described after m, aggregated by agg and keyed by tagName.
+func measureView(m stats.Measure, agg *view.Aggregation) *view.View {
+	return &view.View{
+		Name:        m.Name(),
+		Description: m.Description(),
+		Measure:     m,
+		Aggregation: agg,
+		TagKeys:     []tag.Key{tagName},
+	}
+}
diff --git a/metrics/workqueue_test.go b/metrics/workqueue_test.go
new file mode 100644
index 00000000000..f2707b9c81e
--- /dev/null
+++ b/metrics/workqueue_test.go
@@ -0,0 +1,124 @@
+/*
+Copyright 2019 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"testing"
+	"time"
+
+	"go.opencensus.io/stats"
+	"go.opencensus.io/stats/view"
+	"k8s.io/client-go/util/workqueue"
+
+	"knative.dev/pkg/metrics/metricstest"
+)
+
+func newInt64(name string) *stats.Int64Measure {
+	return stats.Int64(name, "bar", "wtfs/s")
+}
+
+func newFloat64(name string) *stats.Float64Measure {
+	return stats.Float64(name, "bar", "wtfs/s")
+}
+
+func TestWorkqueueMetrics(t *testing.T) {
+	wp := &WorkqueueProvider{
+		Adds:         newInt64("adds"),
+		Depth:        newInt64("depth"),
+		Latency:      newFloat64("latency"),
+		Retries:      newInt64("retries"),
+		WorkDuration: newFloat64("work_duration"),
+	}
+	workqueue.SetProvider(wp)
+
+	// Reset the metrics configuration to avoid leaked state from other tests.
+	setCurMetricsConfig(nil)
+
+	views := wp.DefaultViews()
+	if got, want := len(views), 5; got != want {
+		t.Errorf("len(DefaultViews()) = %d, want %d", got, want)
+	}
+	if err := view.Register(views...); err != nil {
+		t.Errorf("view.Register() = %v", err)
+	}
+	defer view.Unregister(views...)
+
+	queueName := t.Name()
+	wq := workqueue.NewNamedRateLimitingQueue(
+		workqueue.DefaultControllerRateLimiter(),
+		queueName,
+	)
+
+	metricstest.CheckStatsNotReported(t, "adds", "depth", "latency", "retries", "work_duration")
+
+	wq.Add("foo")
+
+	metricstest.CheckStatsReported(t, "adds", "depth")
+	metricstest.CheckStatsNotReported(t, "latency", "retries", "work_duration")
+	metricstest.CheckCountData(t, "adds", map[string]string{"name": queueName}, 1)
+	metricstest.CheckLastValueData(t, "depth", map[string]string{"name": queueName}, 1)
+
+	wq.Add("bar")
+
+	metricstest.CheckStatsNotReported(t, "latency", "retries", "work_duration")
+	metricstest.CheckCountData(t, "adds", map[string]string{"name": queueName}, 2)
+	metricstest.CheckLastValueData(t, "depth", map[string]string{"name": queueName}, 2)
+
+	if got, shutdown := wq.Get(); shutdown {
+		t.Errorf("Get() = %v, true; want false", got)
+	} else if want := "foo"; got != want {
+		t.Errorf("Get() = %s, false; want %s", got, want)
+	} else {
+		wq.Forget(got)
+		wq.Done(got)
+	}
+
+	metricstest.CheckStatsReported(t, "latency", "work_duration")
+	metricstest.CheckStatsNotReported(t, "retries")
+	metricstest.CheckCountData(t, "adds", map[string]string{"name": queueName}, 2)
+
+	if got, shutdown := wq.Get(); shutdown {
+		t.Errorf("Get() = %v, true; want false", got)
+	} else if want := "bar"; got != want {
+		t.Errorf("Get() = %s, false; want %s", got, want)
+	} else {
+		wq.AddRateLimited(got)
+		wq.Done(got)
+	}
+
+	// It should show up as a retry now.
+	metricstest.CheckStatsReported(t, "retries")
+	metricstest.CheckCountData(t, "retries", map[string]string{"name": queueName}, 1)
+	// It is not added right away.
+	metricstest.CheckCountData(t, "adds", map[string]string{"name": queueName}, 2)
+
+	// It doesn't show up as an "add" until the rate limit has elapsed.
+	time.Sleep(1 * time.Second)
+	metricstest.CheckCountData(t, "adds", map[string]string{"name": queueName}, 3)
+
+	wq.ShutDown()
+
+	if got, shutdown := wq.Get(); shutdown {
+		t.Errorf("Get() = %v, true; want false", got)
+	} else if want := "bar"; got != want {
+		t.Errorf("Get() = %s, false; want %s", got, want)
+	}
+
+	if got, shutdown := wq.Get(); !shutdown {
+		t.Errorf("Get() = %v, false; want true", got)
+	}
+}