Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/database/sql: Close DB Stats goroutine on db.Close() #3025

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions contrib/database/sql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package sql // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql"

import (
"database/sql"
"sync"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal"
Expand All @@ -33,20 +34,29 @@ var interval = 10 * time.Second

// pollDBStats calls (*DB).Stats on the db at a predetermined interval. It pushes the DBStats off to the statsd client.
// the caller should always ensure that db & statsd are non-nil
func pollDBStats(statsd internal.StatsdClient, db *sql.DB) {
func pollDBStats(statsd internal.StatsdClient, db *sql.DB, tracerStop chan struct{}) {
log.Debug("DB stats will be gathered and sent every %v.", interval)
for range time.NewTicker(interval).C {
log.Debug("Reporting DB.Stats metrics...")
stat := db.Stats()
statsd.Gauge(MaxOpenConnections, float64(stat.MaxOpenConnections), []string{}, 1)
statsd.Gauge(OpenConnections, float64(stat.OpenConnections), []string{}, 1)
statsd.Gauge(InUse, float64(stat.InUse), []string{}, 1)
statsd.Gauge(Idle, float64(stat.Idle), []string{}, 1)
statsd.Gauge(WaitCount, float64(stat.WaitCount), []string{}, 1)
statsd.Timing(WaitDuration, stat.WaitDuration, []string{}, 1)
statsd.Gauge(MaxIdleClosed, float64(stat.MaxIdleClosed), []string{}, 1)
statsd.Gauge(MaxIdleTimeClosed, float64(stat.MaxIdleTimeClosed), []string{}, 1)
statsd.Gauge(MaxLifetimeClosed, float64(stat.MaxLifetimeClosed), []string{}, 1)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Debug("Reporting DB.Stats metrics...")
stat := db.Stats()
statsd.Gauge(MaxOpenConnections, float64(stat.MaxOpenConnections), []string{}, 1)
statsd.Gauge(OpenConnections, float64(stat.OpenConnections), []string{}, 1)
statsd.Gauge(InUse, float64(stat.InUse), []string{}, 1)
statsd.Gauge(Idle, float64(stat.Idle), []string{}, 1)
statsd.Gauge(WaitCount, float64(stat.WaitCount), []string{}, 1)
statsd.Timing(WaitDuration, stat.WaitDuration, []string{}, 1)
statsd.Gauge(MaxIdleClosed, float64(stat.MaxIdleClosed), []string{}, 1)
statsd.Gauge(MaxIdleTimeClosed, float64(stat.MaxIdleTimeClosed), []string{}, 1)
statsd.Gauge(MaxLifetimeClosed, float64(stat.MaxLifetimeClosed), []string{}, 1)
case <-tracerStop:
return
case <-dbStop:
return
}
}
}

Expand All @@ -63,3 +73,17 @@ func statsTags(c *config) []string {
}
return tags
}

var (
dbStop chan struct{} = make(chan struct{})
once sync.Once
mu sync.Mutex
)

func dbClose() {
mu.Lock()
defer mu.Unlock()
once.Do(func() {
close(dbStop)
})
}
30 changes: 30 additions & 0 deletions contrib/database/sql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
package sql

import (
"sync"
"testing"

"github.com/DataDog/datadog-go/v5/statsd"
"github.com/stretchr/testify/assert"
"gopkg.in/DataDog/dd-trace-go.v1/internal/contribroutines"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
)

Expand Down Expand Up @@ -64,3 +67,30 @@ func TestStatsTags(t *testing.T) {
})
resetGlobalConfig()
}

func TestPollDBStats(t *testing.T) {
db := setupPostgres(t)
tracerStop := contribroutines.GetStopChan()
t.Run("tracerStop", func(t *testing.T) {
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
pollDBStats(&statsd.NoOpClientDirect{}, db, tracerStop)
}()
contribroutines.Stop()
wg.Wait()
})
t.Run("dbStop", func(t *testing.T) {
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
pollDBStats(&statsd.NoOpClientDirect{}, db, tracerStop)
}()
db.Close()
wg.Wait()
})
}
10 changes: 9 additions & 1 deletion contrib/database/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

sqlinternal "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql/internal"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/contribroutines"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
)
Expand Down Expand Up @@ -171,6 +172,13 @@ func (t *tracedConnector) Driver() driver.Driver {
return t.connector.Driver()
}

// Close sends a signal on any goroutines that rely on an open DB to stop.
// This method will be invoked when DB.Close() is called: https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/database/sql/sql.go;l=943-947
func (t *tracedConnector) Close() error {
dbClose()
return nil
}

// from Go stdlib implementation of sql.Open
type dsnConnector struct {
dsn string
Expand Down Expand Up @@ -211,7 +219,7 @@ func OpenDB(c driver.Connector, opts ...Option) *sql.DB {
}
db := sql.OpenDB(tc)
if cfg.dbStats && cfg.statsdClient != nil {
go pollDBStats(cfg.statsdClient, db)
go pollDBStats(cfg.statsdClient, db, contribroutines.GetStopChan())
}
return db
}
Expand Down
24 changes: 24 additions & 0 deletions contrib/database/sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package sql

import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"fmt"
Expand Down Expand Up @@ -529,3 +530,26 @@ func TestNamingSchema(t *testing.T) {
t.Run("SpanName", namingschematest.NewSpanNameTest(genSpans, assertOpV0, assertOpV1))
})
}

func TestDBClose(t *testing.T) {
db := setupPostgres(t)

// assert that dbStop channel is closed on the call to db.Close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
<-dbStop
wg.Done()
}()
db.Close()
wg.Wait()
}

func setupPostgres(t *testing.T) *sql.DB {
driverName := "postgres"
Register(driverName, &pq.Driver{})
defer unregister(driverName)
db, err := Open(driverName, "postgres://postgres:[email protected]:5432/postgres?sslmode=disable")
require.NoError(t, err)
return db
}
39 changes: 23 additions & 16 deletions contrib/jackc/pgx.v5/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,30 @@ const (
var interval = 10 * time.Second

// pollPoolStats calls (*pgxpool).Stats on the pool at a predetermined interval. It pushes the pool Stats off to the statsd client.
func pollPoolStats(statsd internal.StatsdClient, pool *pgxpool.Pool) {
func pollPoolStats(statsd internal.StatsdClient, pool *pgxpool.Pool, stop chan struct{}) {
log.Debug("contrib/jackc/pgx.v5: Traced pool connection found: Pool stats will be gathered and sent every %v.", interval)
for range time.NewTicker(interval).C {
log.Debug("contrib/jackc/pgx.v5: Reporting pgxpool.Stat metrics...")
stat := pool.Stat()
statsd.Gauge(AcquireCount, float64(stat.AcquireCount()), []string{}, 1)
statsd.Timing(AcquireDuration, stat.AcquireDuration(), []string{}, 1)
statsd.Gauge(AcquiredConns, float64(stat.AcquiredConns()), []string{}, 1)
statsd.Gauge(CanceledAcquireCount, float64(stat.CanceledAcquireCount()), []string{}, 1)
statsd.Gauge(ConstructingConns, float64(stat.ConstructingConns()), []string{}, 1)
statsd.Gauge(EmptyAcquireCount, float64(stat.EmptyAcquireCount()), []string{}, 1)
statsd.Gauge(IdleConns, float64(stat.IdleConns()), []string{}, 1)
statsd.Gauge(MaxConns, float64(stat.MaxConns()), []string{}, 1)
statsd.Gauge(TotalConns, float64(stat.TotalConns()), []string{}, 1)
statsd.Gauge(NewConnsCount, float64(stat.NewConnsCount()), []string{}, 1)
statsd.Gauge(MaxLifetimeDestroyCount, float64(stat.MaxLifetimeDestroyCount()), []string{}, 1)
statsd.Gauge(MaxIdleDestroyCount, float64(stat.MaxIdleDestroyCount()), []string{}, 1)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Debug("contrib/jackc/pgx.v5: Reporting pgxpool.Stat metrics...")
stat := pool.Stat()
statsd.Gauge(AcquireCount, float64(stat.AcquireCount()), []string{}, 1)
statsd.Timing(AcquireDuration, stat.AcquireDuration(), []string{}, 1)
statsd.Gauge(AcquiredConns, float64(stat.AcquiredConns()), []string{}, 1)
statsd.Gauge(CanceledAcquireCount, float64(stat.CanceledAcquireCount()), []string{}, 1)
statsd.Gauge(ConstructingConns, float64(stat.ConstructingConns()), []string{}, 1)
statsd.Gauge(EmptyAcquireCount, float64(stat.EmptyAcquireCount()), []string{}, 1)
statsd.Gauge(IdleConns, float64(stat.IdleConns()), []string{}, 1)
statsd.Gauge(MaxConns, float64(stat.MaxConns()), []string{}, 1)
statsd.Gauge(TotalConns, float64(stat.TotalConns()), []string{}, 1)
statsd.Gauge(NewConnsCount, float64(stat.NewConnsCount()), []string{}, 1)
statsd.Gauge(MaxLifetimeDestroyCount, float64(stat.MaxLifetimeDestroyCount()), []string{}, 1)
statsd.Gauge(MaxIdleDestroyCount, float64(stat.MaxIdleDestroyCount()), []string{}, 1)
case <-stop:
return
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion contrib/jackc/pgx.v5/pgxpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"

"github.com/jackc/pgx/v5/pgxpool"
"gopkg.in/DataDog/dd-trace-go.v1/internal/contribroutines"
)

func NewPool(ctx context.Context, connString string, opts ...Option) (*pgxpool.Pool, error) {
Expand All @@ -30,7 +31,7 @@ func NewPoolWithConfig(ctx context.Context, config *pgxpool.Config, opts ...Opti
return nil, err
}
if tracer.cfg.poolStats && tracer.cfg.statsdClient != nil {
go pollPoolStats(tracer.cfg.statsdClient, pool)
go pollPoolStats(tracer.cfg.statsdClient, pool, contribroutines.GetStopChan())
}
return pool, nil
}
4 changes: 2 additions & 2 deletions contrib/net/http/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ func TestTraceAndServe(t *testing.T) {
t.Setenv("DD_TRACE_HTTP_SERVER_ERROR_STATUSES", "500")

cfg := &ServeConfig{
Service: "service",
Resource: "resource",
Service: "service",
Resource: "resource",
}

handler := func(w http.ResponseWriter, r *http.Request) {
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec"
appsecConfig "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/config"
"gopkg.in/DataDog/dd-trace-go.v1/internal/contribroutines"
"gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/internal/hostname"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
Expand Down Expand Up @@ -203,6 +204,7 @@ func Start(opts ...StartOption) {
}

_ = t.hostname() // Prime the hostname cache
contribroutines.InitStopChan()
}

// Stop stops the started tracer. Subsequent calls are valid but become no-op.
Expand Down Expand Up @@ -727,6 +729,7 @@ func (t *tracer) Stop() {
if t.logFile != nil {
t.logFile.Close()
}
contribroutines.Stop()
}

// Inject uses the configured or default TextMap Propagator.
Expand Down
22 changes: 22 additions & 0 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
maininternal "gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/contribroutines"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/statsdtest"
Expand Down Expand Up @@ -255,6 +256,27 @@ func TestTracerStart(t *testing.T) {
tr.Stop()
tr.Stop()
})
t.Run("contribroutines", func(t *testing.T) {
// assert tracer.Start initializes contribroutines.stop
Start()
s1 := contribroutines.GetStopChan()
assert.NotNil(t, s1)

// assert tracer.Stop closes the channel
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-s1
}()
Stop()
wg.Wait()

// assert tracer.Start initializes contribroutines.stop to a new channel every time
Start()
s2 := contribroutines.GetStopChan()
assert.NotEqual(t, s1, s2)
})
}

func TestTracerLogFile(t *testing.T) {
Expand Down
34 changes: 34 additions & 0 deletions internal/contribroutines/contribroutines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
package contribroutines

import "sync"

var (
stop chan struct{}
once sync.Once
mu sync.Mutex
)

func InitStopChan() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func InitStopChan() {
func Init() {

Just in case this package evolves and needs to initialize more things beyond the stop chan.

stop = make(chan struct{})
}

func Stop() {
mu.Lock()
defer mu.Unlock()
if stop == nil {
InitStopChan()
}
once.Do(func() {
close(stop)
})
}

func GetStopChan() chan struct{} {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func GetStopChan() chan struct{} {
func StopChan() chan struct{} {

This would be a better name.

mu.Lock()
defer mu.Unlock()
return stop
}
Loading
Loading