Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the instance struct to handle connections #859

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions collector/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -72,8 +71,9 @@ func (ScrapeBinlogSize) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeBinlogSize) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeBinlogSize) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
var logBin uint8
db := instance.getDB()
err := db.QueryRowContext(ctx, logbinQuery).Scan(&logBin)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion collector/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func TestScrapeBinlogSize(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}

mock.ExpectQuery(logbinQuery).WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))

columns := []string{"Log_name", "File_size"}
Expand All @@ -42,7 +44,7 @@ func TestScrapeBinlogSize(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeBinlogSize{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeBinlogSize{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
4 changes: 2 additions & 2 deletions collector/engine_innodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -52,7 +51,8 @@ func (ScrapeEngineInnodbStatus) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeEngineInnodbStatus) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeEngineInnodbStatus) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx, engineInnodbStatusQuery)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions collector/engine_innodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ END OF INNODB MONITOR OUTPUT
rows := sqlmock.NewRows(columns).AddRow("InnoDB", "", sample)

mock.ExpectQuery(sanitizeQuery(engineInnodbStatusQuery)).WillReturnRows(rows)

inst := &instance{db: db}
ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeEngineInnodbStatus{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeEngineInnodbStatus{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
3 changes: 2 additions & 1 deletion collector/engine_tokudb.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (ScrapeEngineTokudbStatus) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeEngineTokudbStatus) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeEngineTokudbStatus) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
tokudbRows, err := db.QueryContext(ctx, engineTokudbStatusQuery)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion collector/engine_tokudb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestScrapeEngineTokudbStatus(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

columns := []string{"Type", "Name", "Status"}
rows := sqlmock.NewRows(columns).
Expand All @@ -59,7 +60,7 @@ func TestScrapeEngineTokudbStatus(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeEngineTokudbStatus{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeEngineTokudbStatus{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
44 changes: 8 additions & 36 deletions collector/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ package collector

import (
"context"
"database/sql"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -38,18 +35,12 @@ const (

// SQL queries and parameters.
const (
versionQuery = `SELECT @@version`

// System variable params formatting.
// See: https://github.com/go-sql-driver/mysql#system-variables
sessionSettingsParam = `log_slow_filter=%27tmp_table_on_disk,filesort_on_disk%27`
timeoutParam = `lock_wait_timeout=%d`
)

var (
versionRE = regexp.MustCompile(`^\d+\.\d+`)
)

// Tunable flags.
var (
exporterLockTimeout = kingpin.Flag(
Expand Down Expand Up @@ -92,6 +83,7 @@ type Exporter struct {
logger log.Logger
dsn string
scrapers []Scraper
instance *instance
}

// New returns a new MySQL exporter for the provided DSN.
Expand Down Expand Up @@ -135,27 +127,23 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) float64 {
var err error
scrapeTime := time.Now()
db, err := sql.Open("mysql", e.dsn)
instance, err := newInstance(e.dsn)
if err != nil {
level.Error(e.logger).Log("msg", "Error opening connection to database", "err", err)
return 0.0
}
defer db.Close()

// By design exporter should use maximum one connection per request.
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
// Set max lifetime for a connection.
db.SetConnMaxLifetime(1 * time.Minute)
defer instance.Close()
e.instance = instance

if err := db.PingContext(ctx); err != nil {
if err := instance.Ping(); err != nil {
level.Error(e.logger).Log("msg", "Error pinging mysqld", "err", err)
return 0.0
}

ch <- prometheus.MustNewConstMetric(mysqlScrapeDurationSeconds, prometheus.GaugeValue, time.Since(scrapeTime).Seconds(), "connection")

version := getMySQLVersion(db, e.logger)
version := instance.versionMajorMinor

var wg sync.WaitGroup
defer wg.Wait()
for _, scraper := range e.scrapers {
Expand All @@ -169,7 +157,7 @@ func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) floa
label := "collect." + scraper.Name()
scrapeTime := time.Now()
collectorSuccess := 1.0
if err := scraper.Scrape(ctx, db, ch, log.With(e.logger, "scraper", scraper.Name())); err != nil {
if err := scraper.Scrape(ctx, instance, ch, log.With(e.logger, "scraper", scraper.Name())); err != nil {
level.Error(e.logger).Log("msg", "Error from scraper", "scraper", scraper.Name(), "target", e.getTargetFromDsn(), "err", err)
collectorSuccess = 0.0
}
Expand All @@ -189,19 +177,3 @@ func (e *Exporter) getTargetFromDsn() string {
}
return dsnConfig.Addr
}

func getMySQLVersion(db *sql.DB, logger log.Logger) float64 {
var versionStr string
var versionNum float64
if err := db.QueryRow(versionQuery).Scan(&versionStr); err == nil {
versionNum, _ = strconv.ParseFloat(versionRE.FindString(versionStr), 64)
} else {
level.Debug(logger).Log("msg", "Error querying version", "err", err)
}
// If we can't match/parse the version, set it some big value that matches all versions.
if versionNum == 0 {
level.Debug(logger).Log("msg", "Error parsing version string", "version", versionStr)
versionNum = 999
}
return versionNum
}
20 changes: 0 additions & 20 deletions collector/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ package collector

import (
"context"
"database/sql"
"os"
"testing"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/smartystreets/goconvey/convey"
Expand Down Expand Up @@ -68,20 +65,3 @@ func TestExporter(t *testing.T) {
}
})
}

func TestGetMySQLVersion(t *testing.T) {
if testing.Short() {
t.Skip("-short is passed, skipping test")
}

logger := log.NewLogfmtLogger(os.Stderr)
logger = level.NewFilter(logger, level.AllowDebug())

convey.Convey("Version parsing", t, func() {
db, err := sql.Open("mysql", dsn)
convey.So(err, convey.ShouldBeNil)
defer db.Close()

convey.So(getMySQLVersion(db, logger), convey.ShouldBeBetweenOrEqual, 5.6, 12.0)
})
}
3 changes: 2 additions & 1 deletion collector/global_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ func (ScrapeGlobalStatus) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeGlobalStatus) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeGlobalStatus) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
globalStatusRows, err := db.QueryContext(ctx, globalStatusQuery)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion collector/global_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestScrapeGlobalStatus(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

columns := []string{"Variable_name", "Value"}
rows := sqlmock.NewRows(columns).
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestScrapeGlobalStatus(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeGlobalStatus{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeGlobalStatus{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
3 changes: 2 additions & 1 deletion collector/global_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ func (ScrapeGlobalVariables) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeGlobalVariables) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeGlobalVariables) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
globalVariablesRows, err := db.QueryContext(ctx, globalVariablesQuery)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion collector/global_variables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestScrapeGlobalVariables(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

columns := []string{"Variable_name", "Value"}
rows := sqlmock.NewRows(columns).
Expand All @@ -52,7 +53,7 @@ func TestScrapeGlobalVariables(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeGlobalVariables{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeGlobalVariables{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
3 changes: 2 additions & 1 deletion collector/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func nowExpr() string {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeHeartbeat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeHeartbeat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
query := fmt.Sprintf(heartbeatQuery, nowExpr(), *collectHeartbeatDatabase, *collectHeartbeatTable)
heartbeatRows, err := db.QueryContext(ctx, query)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion collector/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ func TestScrapeHeartbeat(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

rows := sqlmock.NewRows(tt.Columns).
AddRow("1487597613.001320", "1487598113.448042", 1)
mock.ExpectQuery(sanitizeQuery(tt.Query)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeHeartbeat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeHeartbeat{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
4 changes: 2 additions & 2 deletions collector/info_schema_auto_increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -70,7 +69,8 @@ func (ScrapeAutoIncrementColumns) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeAutoIncrementColumns) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeAutoIncrementColumns) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
autoIncrementRows, err := db.QueryContext(ctx, infoSchemaAutoIncrementQuery)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions collector/info_schema_clientstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"
"fmt"
"strings"

Expand Down Expand Up @@ -161,8 +160,9 @@ func (ScrapeClientStat) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeClientStat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeClientStat) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
var varName, varVal string
db := instance.getDB()
err := db.QueryRowContext(ctx, userstatCheckQuery).Scan(&varName, &varVal)
if err != nil {
level.Debug(logger).Log("msg", "Detailed client stats are not available.")
Expand Down
3 changes: 2 additions & 1 deletion collector/info_schema_clientstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestScrapeClientStat(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

mock.ExpectQuery(sanitizeQuery(userstatCheckQuery)).WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("userstat", "ON"))
Expand All @@ -41,7 +42,7 @@ func TestScrapeClientStat(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeClientStat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeClientStat{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
4 changes: 2 additions & 2 deletions collector/info_schema_innodb_cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -77,7 +76,8 @@ func (ScrapeInnodbCmp) Version() float64 {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeInnodbCmp) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
func (ScrapeInnodbCmp) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) error {
db := instance.getDB()
informationSchemaInnodbCmpRows, err := db.QueryContext(ctx, innodbCmpQuery)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion collector/info_schema_innodb_cmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestScrapeInnodbCmp(t *testing.T) {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}

columns := []string{"page_size", "compress_ops", "compress_ops_ok", "compress_time", "uncompress_ops", "uncompress_time"}
rows := sqlmock.NewRows(columns).
Expand All @@ -38,7 +39,7 @@ func TestScrapeInnodbCmp(t *testing.T) {

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeInnodbCmp{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
if err = (ScrapeInnodbCmp{}).Scrape(context.Background(), inst, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
Expand Down
Loading