Skip to content

Commit

Permalink
fix: ensure connection count is correctly reported (#824)
Browse files Browse the repository at this point in the history
A recent refactor broke the open connections metric and caused all
counts to be reported as either 1 or 0. This commit updates the test to
protect against making the same mistake and fixes the original problem
(using value semantics and not pointer semantics).
  • Loading branch information
enocom authored Jun 5, 2024
1 parent 2a3257d commit b286049
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 20 deletions.
9 changes: 5 additions & 4 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type connectionInfoCache interface {
// monitoredCache is a wrapper around a connectionInfoCache that tracks the
// number of connections to the associated instance.
type monitoredCache struct {
openConns uint64
openConns *uint64

connectionInfoCache
}
Expand Down Expand Up @@ -339,13 +339,13 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn

latency := time.Since(startTime).Milliseconds()
go func() {
n := atomic.AddUint64(&c.openConns, 1)
n := atomic.AddUint64(c.openConns, 1)
trace.RecordOpenConnections(ctx, int64(n), d.dialerID, cn.String())
trace.RecordDialLatency(ctx, icn, d.dialerID, latency)
}()

return newInstrumentedConn(tlsConn, func() {
n := atomic.AddUint64(&c.openConns, ^uint64(0))
n := atomic.AddUint64(c.openConns, ^uint64(0))
trace.RecordOpenConnections(context.Background(), int64(n), d.dialerID, cn.String())
}), nil
}
Expand Down Expand Up @@ -508,7 +508,8 @@ func (d *Dialer) connectionInfoCache(
d.dialerID, useIAMAuthNDial,
)
}
c = monitoredCache{connectionInfoCache: cache}
var count uint64
c = monitoredCache{openConns: &count, connectionInfoCache: cache}
d.cache[cn] = c
}
}
Expand Down
58 changes: 42 additions & 16 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package cloudsqlconn

import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"
"time"
Expand All @@ -24,46 +26,58 @@ import (
)

type spyMetricsExporter struct {
mu sync.Mutex
data []*view.Data
mu sync.Mutex
viewData []*view.Data
}

func (e *spyMetricsExporter) ExportView(vd *view.Data) {
e.mu.Lock()
defer e.mu.Unlock()
e.data = append(e.data, vd)
e.viewData = append(e.viewData, vd)
}

type metric struct {
name string
data view.AggregationData
}

func (e *spyMetricsExporter) Data() []metric {
func (e *spyMetricsExporter) data() []metric {
e.mu.Lock()
defer e.mu.Unlock()
var res []metric
for _, d := range e.data {
for _, d := range e.viewData {
for _, r := range d.Rows {
res = append(res, metric{name: d.View.Name, data: r.Data})
}
}
return res
}

// dump marshals a value to indented JSON for better test failure reporting.
// It fails the test immediately if the value cannot be marshaled.
func dump[T any](t *testing.T, data T) string {
	t.Helper() // report marshal failures at the caller's line
	b, err := json.MarshalIndent(data, "", " ")
	if err != nil {
		t.Fatal(err)
	}
	// string(b) is already a string; no fmt.Sprint wrapper needed.
	return string(b)
}

// wantLastValueMetric ensures the provided metrics include a metric with the
// wanted name and at least one data point.
func wantLastValueMetric(t *testing.T, wantName string, ms []metric) {
func wantLastValueMetric(t *testing.T, wantName string, ms []metric, wantValue int) {
t.Helper()
gotNames := make(map[string]view.AggregationData)
for _, m := range ms {
gotNames[m.name] = m.data
_, ok := m.data.(*view.LastValueData)
if m.name == wantName && ok {
d, ok := m.data.(*view.LastValueData)
if ok && m.name == wantName && d.Value == float64(wantValue) {
return
}
}
t.Fatalf("metric name want = %v with LastValueData, all metrics = %#v", wantName, gotNames)
t.Fatalf(
"want metric LastValueData{name = %q, value = %v}, got metrics = %v",
wantName, wantValue, dump(t, gotNames),
)
}

// wantDistributionMetric ensures the provided metrics include a metric with the
Expand All @@ -78,7 +92,10 @@ func wantDistributionMetric(t *testing.T, wantName string, ms []metric) {
return
}
}
t.Fatalf("metric name want = %v with DistributionData, all metrics = %#v", wantName, gotNames)
t.Fatalf(
"metric name want = %v with DistributionData, all metrics = %v",
wantName, dump(t, gotNames),
)
}

// wantCountMetric ensures the provided metrics include a metric with the wanted
Expand All @@ -93,7 +110,10 @@ func wantCountMetric(t *testing.T, wantName string, ms []metric) {
return
}
}
t.Fatalf("metric name want = %v with CountData, all metrics = %#v", wantName, gotNames)
t.Fatalf(
"metric name want = %v with CountData, all metrics = %v",
wantName, dump(t, gotNames),
)
}

func TestDialerWithMetrics(t *testing.T) {
Expand Down Expand Up @@ -134,6 +154,12 @@ func TestDialerWithMetrics(t *testing.T) {
t.Fatalf("expected Dial to succeed, but got error: %v", err)
}
defer conn.Close()
// dial the good instance again to check the counter
conn2, err := d.Dial(context.Background(), "my-project:my-region:my-instance")
if err != nil {
t.Fatalf("expected Dial to succeed, but got error: %v", err)
}
defer conn2.Close()
// dial a bogus instance
_, err = d.Dial(context.Background(), "my-project:my-region:notaninstance")
if err == nil {
Expand All @@ -143,11 +169,11 @@ func TestDialerWithMetrics(t *testing.T) {
time.Sleep(10 * time.Millisecond) // allow exporter a chance to run

// success metrics
wantLastValueMetric(t, "cloudsqlconn/open_connections", spy.Data())
wantDistributionMetric(t, "cloudsqlconn/dial_latency", spy.Data())
wantCountMetric(t, "cloudsqlconn/refresh_success_count", spy.Data())
wantLastValueMetric(t, "cloudsqlconn/open_connections", spy.data(), 2)
wantDistributionMetric(t, "cloudsqlconn/dial_latency", spy.data())
wantCountMetric(t, "cloudsqlconn/refresh_success_count", spy.data())

// failure metrics from dialing bogus instance
wantCountMetric(t, "cloudsqlconn/dial_failure_count", spy.Data())
wantCountMetric(t, "cloudsqlconn/refresh_failure_count", spy.Data())
wantCountMetric(t, "cloudsqlconn/dial_failure_count", spy.data())
wantCountMetric(t, "cloudsqlconn/refresh_failure_count", spy.data())
}

0 comments on commit b286049

Please sign in to comment.