Skip to content

Commit

Permalink
Implement workqueue.MetricsProvider (#678)
Browse files Browse the repository at this point in the history
The kubernetes workqueue provides a facility for collecting metrics by registering a workqueue.MetricsProvider
via workqueue.SetProvider.

This change implements that interface to expose the workqueue metrics into opencensus.

This is loosely based on some work started by @grantr [here](kubernetes-sigs/controller-runtime@master...grantr:opencensus-replace#diff-bb94124aff8d568cb4e82854c7d44fd1)

Fixes: knative/pkg#522
  • Loading branch information
mattmoor authored and knative-prow-robot committed Sep 16, 2019
1 parent ed1a121 commit ecb9800
Show file tree
Hide file tree
Showing 7 changed files with 431 additions and 26 deletions.
67 changes: 49 additions & 18 deletions controller/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/metrics"
)

Expand All @@ -48,28 +49,58 @@ var (
)

func init() {
wp := &metrics.WorkqueueProvider{
Adds: stats.Int64(
"workqueue_adds_total",
"Total number of adds handled by workqueue",
stats.UnitNone,
),
Depth: stats.Int64(
"workqueue_depth",
"Current depth of workqueue",
stats.UnitNone,
),
Latency: stats.Float64(
"workqueue_queue_latency_seconds",
"How long in seconds an item stays in workqueue before being requested.",
"s",
),
Retries: stats.Int64(
"workqueue_retries_total",
"Total number of retries handled by workqueue",
"s",
),
WorkDuration: stats.Float64(
"workqueue_work_duration_seconds",
"How long in seconds processing an item from workqueue takes.",
"s",
),
}
workqueue.SetProvider(wp)

// Create views to see our measurements. This can return an error if
// a previously-registered view has the same name with a different value.
// View name defaults to the measure name if unspecified.
err := view.Register(
&view.View{
Description: "Depth of the work queue",
Measure: workQueueDepthStat,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{reconcilerTagKey},
},
&view.View{
Description: "Number of reconcile operations",
Measure: reconcileCountStat,
Aggregation: view.Count(),
TagKeys: []tag.Key{reconcilerTagKey, keyTagKey, successTagKey},
},
&view.View{
Description: "Latency of reconcile operations",
Measure: reconcileLatencyStat,
Aggregation: reconcileDistribution,
TagKeys: []tag.Key{reconcilerTagKey, keyTagKey, successTagKey},
},
append(wp.DefaultViews(),
&view.View{
Description: "Depth of the work queue",
Measure: workQueueDepthStat,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{reconcilerTagKey},
},
&view.View{
Description: "Number of reconcile operations",
Measure: reconcileCountStat,
Aggregation: view.Count(),
TagKeys: []tag.Key{reconcilerTagKey, keyTagKey, successTagKey},
},
&view.View{
Description: "Latency of reconcile operations",
Measure: reconcileLatencyStat,
Aggregation: reconcileDistribution,
TagKeys: []tag.Key{reconcilerTagKey, keyTagKey, successTagKey},
})...,
)
if err != nil {
panic(err)
Expand Down
76 changes: 76 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"context"
"sync/atomic"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"k8s.io/client-go/util/workqueue"
)

type counterMetric struct {
mutators []tag.Mutator
measure *stats.Int64Measure
}

var (
_ workqueue.CounterMetric = (*counterMetric)(nil)
)

// Inc implements CounterMetric
func (m counterMetric) Inc() {
Record(context.Background(), m.measure.M(1), stats.WithTags(m.mutators...))
}

type gaugeMetric struct {
mutators []tag.Mutator
measure *stats.Int64Measure
total int64
}

var (
_ workqueue.GaugeMetric = (*gaugeMetric)(nil)
)

// Inc implements CounterMetric
func (m *gaugeMetric) Inc() {
total := atomic.AddInt64(&m.total, 1)
Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...))
}

// Dec implements GaugeMetric
func (m *gaugeMetric) Dec() {
total := atomic.AddInt64(&m.total, -1)
Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...))
}

type floatMetric struct {
mutators []tag.Mutator
measure *stats.Float64Measure
}

var (
_ workqueue.SummaryMetric = (*floatMetric)(nil)
)

// Observe implements SummaryMetric
func (m floatMetric) Observe(v float64) {
Record(context.Background(), m.measure.M(v), stats.WithTags(m.mutators...))
}
8 changes: 4 additions & 4 deletions metrics/metricstest/metricstest.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func CheckCountData(t *testing.T, name string, wantTags map[string]string, wantV
checkRowTags(t, row, name, wantTags)

if s, ok := row.Data.(*view.CountData); !ok {
t.Errorf("For metric %s: Reporter expected a CountData type", name)
t.Errorf("%s: got %T, want CountData", name, row.Data)
} else if s.Value != wantValue {
t.Errorf("For metric %s: value = %v, want: %d", name, s.Value, wantValue)
}
Expand All @@ -74,7 +74,7 @@ func CheckDistributionData(t *testing.T, name string, wantTags map[string]string
checkRowTags(t, row, name, wantTags)

if s, ok := row.Data.(*view.DistributionData); !ok {
t.Errorf("For metric %s: Reporter expected a DistributionData type", name)
t.Errorf("%s: got %T, want DistributionData", name, row.Data)
} else {
if s.Count != expectedCount {
t.Errorf("For metric %s: reporter count = %d, want = %d", name, s.Count, expectedCount)
Expand All @@ -97,7 +97,7 @@ func CheckLastValueData(t *testing.T, name string, wantTags map[string]string, w
checkRowTags(t, row, name, wantTags)

if s, ok := row.Data.(*view.LastValueData); !ok {
t.Errorf("For metric %s: Reporter.Report() expected a LastValueData type", name)
t.Errorf("%s: got %T, want LastValueData", name, row.Data)
} else if s.Value != wantValue {
t.Errorf("For metric %s: Reporter.Report() expected %v got %v", name, s.Value, wantValue)
}
Expand All @@ -112,7 +112,7 @@ func CheckSumData(t *testing.T, name string, wantTags map[string]string, wantVal
checkRowTags(t, row, name, wantTags)

if s, ok := row.Data.(*view.SumData); !ok {
t.Errorf("For metric %s: Reporter expected a SumData type", name)
t.Errorf("%s: got %T, want SumData", name, row.Data)
} else if s.Value != wantValue {
t.Errorf("For metric %s: value = %v, want: %v", name, s.Value, wantValue)
}
Expand Down
20 changes: 16 additions & 4 deletions metrics/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,20 @@ import (
// 3) The backend is Stackdriver and it is allowed to use custom metrics.
// 4) The backend is Stackdriver and the metric is one of the built-in metrics: "knative_revision", "knative_broker",
// "knative_trigger", "knative_source".
func Record(ctx context.Context, ms stats.Measurement) {
func Record(ctx context.Context, ms stats.Measurement, ros ...stats.Options) {
mc := getCurMetricsConfig()

ros = append(ros, stats.WithMeasurements(ms))

// Condition 1)
if mc == nil {
stats.Record(ctx, ms)
stats.RecordWithOptions(ctx, ros...)
return
}

// Condition 2) and 3)
if !mc.isStackdriverBackend || mc.allowStackdriverCustomMetrics {
stats.Record(ctx, ms)
stats.RecordWithOptions(ctx, ros...)
return
}

Expand All @@ -60,7 +62,7 @@ func Record(ctx context.Context, ms stats.Measurement) {
metricskey.KnativeSourceMetrics.Has(metricType)

if isServingBuiltIn || isEventingBuiltIn {
stats.Record(ctx, ms)
stats.RecordWithOptions(ctx, ros...)
}
}

Expand All @@ -74,3 +76,13 @@ func Buckets125(low, high float64) []float64 {
}
return buckets
}

// BucketsNBy10 generates an array of N buckets starting from low and
// multiplying by 10 n times.
func BucketsNBy10(low float64, n int) []float64 {
buckets := []float64{low}
for last, i := low, len(buckets); i < n; last, i = 10*last, i+1 {
buckets = append(buckets, 10*last)
}
return buckets
}
31 changes: 31 additions & 0 deletions metrics/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package metrics

import (
"context"
"fmt"
"testing"

"knative.dev/pkg/metrics/metricstest"

"github.com/google/go-cmp/cmp"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)
Expand Down Expand Up @@ -133,3 +135,32 @@ func testRecord(t *testing.T, measure *stats.Int64Measure, shouldReportCases []c
metricstest.CheckLastValueData(t, test.measurement.Measure().Name(), map[string]string{}, 4) // The value is still the last one of shouldReportCases
}
}

func TestBucketsNBy10(t *testing.T) {
tests := []struct {
base float64
n int
want []float64
}{{
base: 0.001,
n: 5,
want: []float64{0.001, 0.01, 0.1, 1, 10},
}, {
base: 1,
n: 2,
want: []float64{1, 10},
}, {
base: 0.5,
n: 4,
want: []float64{0.5, 5, 50, 500},
}}

for _, test := range tests {
t.Run(fmt.Sprintf("base=%f,n=%d", test.base, test.n), func(t *testing.T) {
got := BucketsNBy10(test.base, test.n)
if diff := cmp.Diff(got, test.want); diff != "" {
t.Errorf("BucketsNBy10 (-want, +got) = %s", diff)
}
})
}
}
Loading

0 comments on commit ecb9800

Please sign in to comment.