diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md
index 1272b7e199..432f2b8dd6 100644
--- a/docs/sources/reference/compatibility/_index.md
+++ b/docs/sources/reference/compatibility/_index.md
@@ -43,6 +43,10 @@ The following components, grouped by namespace, _export_ Targets.
- [beyla.ebpf](../components/beyla/beyla.ebpf)
{{< /collapse >}}

+{{< collapse title="database_observability" >}}
+- [database_observability.mysql](../components/database_observability/database_observability.mysql)
+{{< /collapse >}}
+
{{< collapse title="discovery" >}}
- [discovery.azure](../components/discovery/discovery.azure)
- [discovery.consul](../components/discovery/discovery.consul)
@@ -236,6 +240,10 @@ The following components, grouped by namespace, _consume_ Loki `LogsReceiver`.

+{{< collapse title="database_observability" >}}
+- [database_observability.mysql](../components/database_observability/database_observability.mysql)
+{{< /collapse >}}
+
{{< collapse title="faro" >}}
- [faro.receiver](../components/faro/faro.receiver)
{{< /collapse >}}
diff --git a/docs/sources/reference/components/database_observability/_index.md b/docs/sources/reference/components/database_observability/_index.md
new file mode 100644
index 0000000000..1358f972a0
--- /dev/null
+++ b/docs/sources/reference/components/database_observability/_index.md
@@ -0,0 +1,12 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/database_observability/
description: Learn about the database_observability components in Grafana Alloy
title: database_observability
weight: 100
---

# database_observability

This section contains reference documentation for the `database_observability` components.

{{< section >}}
diff --git a/docs/sources/reference/components/database_observability/database_observability.mysql.md b/docs/sources/reference/components/database_observability/database_observability.mysql.md
new file mode 100644
index 0000000000..2bfc85953e
--- /dev/null
+++ b/docs/sources/reference/components/database_observability/database_observability.mysql.md
@@ -0,0 +1,88 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/database_observability/database_observability.mysql/
description: Learn about database_observability.mysql
title: database_observability.mysql
---

# database_observability.mysql

{{< docs/shared lookup="stability/experimental.md" source="alloy" version="<ALLOY_VERSION>" >}}

## Usage

```alloy
database_observability.mysql "LABEL" {
  data_source_name = DATA_SOURCE_NAME
  forward_to       = [LOKI_RECEIVERS]
}
```

## Arguments

The following arguments are supported:

| Name               | Type                 | Description                                                                                                          | Default | Required |
| ------------------ | -------------------- | -------------------------------------------------------------------------------------------------------------------- | ------- | -------- |
| `data_source_name` | `secret`             | [Data Source Name](https://github.com/go-sql-driver/mysql#dsn-data-source-name) for the MySQL server to connect to.  |         | yes      |
| `forward_to`       | `list(LogsReceiver)` | Where to forward log entries after processing.                                                                        |         | yes      |
| `collect_interval` | `duration`           | How frequently to collect query samples from the database.                                                            | `"10s"` | no       |

## Blocks

The `database_observability.mysql` component does not support any blocks, and is configured fully through arguments.
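
For reference, `data_source_name` uses the [Go MySQL driver DSN format](https://github.com/go-sql-driver/mysql#dsn-data-source-name); for example, `user:pass@tcp(localhost:3306)/` (illustrative values) connects over TCP and selects no default database.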

## Example

```alloy
database_observability.mysql "orders_db" {
  data_source_name = "user:pass@tcp(mysql:3306)/"
  forward_to       = [loki.write.logs_service.receiver]
}

prometheus.scrape "orders_db" {
  targets      = database_observability.mysql.orders_db.targets
  honor_labels = true // required to keep job and instance labels
  forward_to   = [prometheus.remote_write.metrics_service.receiver]
}

prometheus.remote_write "metrics_service" {
  endpoint {
    url = sys.env("GCLOUD_HOSTED_METRICS_URL")

    basic_auth {
      username = sys.env("GCLOUD_HOSTED_METRICS_ID")
      password = sys.env("GCLOUD_RW_API_KEY")
    }
  }
}

loki.write "logs_service" {
  endpoint {
    url = sys.env("GCLOUD_HOSTED_LOGS_URL")

    basic_auth {
      username = sys.env("GCLOUD_HOSTED_LOGS_ID")
      password = sys.env("GCLOUD_RW_API_KEY")
    }
  }
}
```

<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components

`database_observability.mysql` can accept arguments from the following components:

- Components that export [Loki `LogsReceiver`](../../../compatibility/#loki-logsreceiver-exporters)

`database_observability.mysql` has exports that can be consumed by the following components:

- Components that consume [Targets](../../../compatibility/#targets-consumers)

{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}

<!-- END GENERATED COMPATIBLE COMPONENTS -->
diff --git a/go.mod b/go.mod
index 816edb5b95..904598f24e 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
	github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0
	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0
	github.com/Azure/go-autorest/autorest v0.11.29
+	github.com/DATA-DOG/go-sqlmock v1.5.2
	github.com/IBM/sarama v1.43.3
	github.com/KimMachineGun/automemlimit v0.6.0
	github.com/Lusitaniae/apache_exporter v0.11.1-0.20220518131644-f9522724dab4
@@ -768,7 +769,7 @@ require (
	github.com/xeipuuv/gojsonschema v1.2.0 // indirect
	github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
	github.com/xo/dburl v0.20.0 // indirect
-	github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect
+	github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
	github.com/yl2chen/cidranger v1.0.2 // indirect
	github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76 // indirect
	github.com/yusufpapurcu/wmi v1.2.4 // indirect
diff --git a/go.sum b/go.sum
index 5e7169878e..fef3703fb6 100644
--- a/go.sum
+++ b/go.sum
@@ -1645,6 +1645,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
diff --git a/internal/component/all/all.go b/internal/component/all/all.go
index f1c7323684..4b4655a0c6 100644
--- a/internal/component/all/all.go
+++ b/internal/component/all/all.go
@@ -3,6 +3,7 @@ package all

import (
	_ "github.com/grafana/alloy/internal/component/beyla/ebpf" // 
Import beyla.ebpf
+	_ "github.com/grafana/alloy/internal/component/database_observability/mysql" // Import database_observability.mysql
	_ "github.com/grafana/alloy/internal/component/discovery/aws"                 // Import discovery.aws.ec2 and discovery.aws.lightsail
	_ "github.com/grafana/alloy/internal/component/discovery/azure"               // Import discovery.azure
	_ "github.com/grafana/alloy/internal/component/discovery/consul"              // Import discovery.consul
diff --git a/internal/component/database_observability/mysql/collector/connection_info.go b/internal/component/database_observability/mysql/collector/connection_info.go
new file mode 100644
index 0000000000..bd890788a0
--- /dev/null
+++ b/internal/component/database_observability/mysql/collector/connection_info.go
@@ -0,0 +1,71 @@
package collector

import (
	"context"
	"net"
	"regexp"
	"strings"

	"github.com/go-sql-driver/mysql"
	"github.com/prometheus/client_golang/prometheus"
)

var rdsRegex = regexp.MustCompile(`(?P<instance_id>[^\.]+)\.([^\.]+)\.(?P<region>[^\.]+)\.rds\.amazonaws\.com`)

type ConnectionInfoArguments struct {
	DSN      string
	Registry *prometheus.Registry
}

type ConnectionInfo struct {
	DSN        string
	Registry   *prometheus.Registry
	InfoMetric *prometheus.GaugeVec
}

func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
	infoMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "connection_info",
		Help: "Information about the connection",
	}, []string{"provider_name", "region", "db_instance_identifier"})

	args.Registry.MustRegister(infoMetric)

	return &ConnectionInfo{
		DSN:        args.DSN,
		Registry:   args.Registry,
		InfoMetric: infoMetric,
	}, nil
}

func (c *ConnectionInfo) Start(ctx context.Context) error {
	cfg, err := mysql.ParseDSN(c.DSN)
	if err != nil {
		return err
	}

	var (
		providerName         = "unknown"
		providerRegion       = "unknown"
		dbInstanceIdentifier = "unknown"
	)

	host, _, err := net.SplitHostPort(cfg.Addr)
	if err == nil && host != "" {
		if strings.HasSuffix(host, "rds.amazonaws.com") {
			providerName = "aws"
			matches := rdsRegex.FindStringSubmatch(host)
			if len(matches) > 3 {
				dbInstanceIdentifier = matches[1]
				providerRegion = matches[3]
			}
		}
	}

	c.InfoMetric.WithLabelValues(providerName, providerRegion, dbInstanceIdentifier).Set(1)
	return nil
}

func (c *ConnectionInfo) Stop() {
	c.Registry.Unregister(c.InfoMetric)
}
diff --git a/internal/component/database_observability/mysql/collector/connection_info_test.go b/internal/component/database_observability/mysql/collector/connection_info_test.go
new file mode 100644
index 0000000000..67ec6195c4
--- /dev/null
+++ b/internal/component/database_observability/mysql/collector/connection_info_test.go
@@ -0,0 +1,57 @@
package collector

import (
	"context"
	"fmt"
	"strings"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
	"go.uber.org/goleak"
)

func TestConnectionInfo(t *testing.T) {
	defer goleak.VerifyNone(t)

	const baseExpectedMetrics = `
	# HELP connection_info Information about the connection
	# TYPE connection_info gauge
	connection_info{db_instance_identifier="%s",provider_name="%s",region="%s"} 1
`

	testCases := []struct {
		name            string
		dsn             string
		expectedMetrics string
	}{
		{
			name:            "generic dsn",
			dsn:             "user:pass@tcp(localhost:3306)/db",
			expectedMetrics: fmt.Sprintf(baseExpectedMetrics, "unknown", "unknown", "unknown"),
		},
		{
			name: "AWS/RDS dsn",
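			// RDS endpoints follow <identifier>.<hash>.<region>.rds.amazonaws.com;
			// rdsRegex extracts the instance identifier and region from this host.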
dsn: "user:pass@tcp(products-db.abc123xyz.us-east-1.rds.amazonaws.com:3306)/db", + expectedMetrics: fmt.Sprintf(baseExpectedMetrics, "products-db", "aws", "us-east-1"), + }, + } + + for _, tc := range testCases { + reg := prometheus.NewRegistry() + + collector, err := NewConnectionInfo(ConnectionInfoArguments{ + DSN: tc.dsn, + Registry: reg, + }) + require.NoError(t, err) + require.NotNil(t, collector) + + err = collector.Start(context.Background()) + require.NoError(t, err) + + err = testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics)) + require.NoError(t, err) + } +} diff --git a/internal/component/database_observability/mysql/collector/query_sample.go b/internal/component/database_observability/mysql/collector/query_sample.go new file mode 100644 index 0000000000..bb8d354385 --- /dev/null +++ b/internal/component/database_observability/mysql/collector/query_sample.go @@ -0,0 +1,195 @@ +package collector + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/common/model" + "github.com/xwb1989/sqlparser" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/loki/v3/pkg/logproto" +) + +const ( + OP_QUERY_SAMPLE = "query_sample" + OP_QUERY_PARSED_TABLE_NAME = "query_parsed_table_name" +) + +const selectQuerySamples = ` + SELECT + digest, + query_sample_text, + query_sample_seen, + query_sample_timer_wait + FROM performance_schema.events_statements_summary_by_digest + WHERE last_seen > DATE_SUB(NOW(), INTERVAL 1 DAY)` + +type QuerySampleArguments struct { + DB *sql.DB + CollectInterval time.Duration + EntryHandler loki.EntryHandler + + Logger log.Logger +} + +type QuerySample struct { + dbConnection *sql.DB + collectInterval time.Duration + entryHandler loki.EntryHandler + + logger log.Logger + + ctx context.Context + cancel context.CancelFunc +} + +func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) { + return &QuerySample{ + dbConnection: args.DB, + collectInterval: args.CollectInterval, + entryHandler: args.EntryHandler, + logger: args.Logger, + }, nil +} + +func (c *QuerySample) Start(ctx context.Context) error { + level.Debug(c.logger).Log("msg", "QuerySample collector started") + + ctx, cancel := context.WithCancel(ctx) + c.ctx = ctx + c.cancel = cancel + + go func() { + ticker := time.NewTicker(c.collectInterval) + + for { + if err := c.fetchQuerySamples(c.ctx); err != nil { + level.Error(c.logger).Log("msg", "collector stopping due to error", "err", err) + c.Stop() + break + } + + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + // continue loop + } + } + }() + + return nil +} + +// Stop should be kept idempotent +func (c *QuerySample) Stop() { + c.cancel() +} + +func (c *QuerySample) fetchQuerySamples(ctx context.Context) error { + rs, err := c.dbConnection.QueryContext(ctx, selectQuerySamples) + if err != nil { + level.Error(c.logger).Log("msg", "failed to fetch query samples", "err", err) + return err + } + defer rs.Close() + + for rs.Next() { + if err := rs.Err(); err != nil { + level.Error(c.logger).Log("msg", "failed to iterate rs", "err", err) + break + } + var digest, query_sample_text, query_sample_seen, query_sample_timer_wait string + err := rs.Scan(&digest, &query_sample_text, &query_sample_seen, &query_sample_timer_wait) + if err != nil { + level.Error(c.logger).Log("msg", "failed to scan query samples", "err", err) + break + } + + redacted, err := 
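		// RedactSQLQuery (from the xwb1989/sqlparser package) replaces literal
		// values with :redactedN placeholders, keeping raw values out of the log line.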
sqlparser.RedactSQLQuery(query_sample_text)
		if err != nil {
			level.Error(c.logger).Log("msg", "failed to redact sql query", "err", err)
		}

		c.entryHandler.Chan() <- loki.Entry{
			Labels: model.LabelSet{"job": "integrations/db-o11y"},
			Entry: logproto.Entry{
				Timestamp: time.Unix(0, time.Now().UnixNano()),
				Line:      fmt.Sprintf(`level=info msg="query samples fetched" op="%s" digest="%s" query_sample_text="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_redacted="%s"`, OP_QUERY_SAMPLE, digest, query_sample_text, query_sample_seen, query_sample_timer_wait, redacted),
			},
		}

		tables := c.tablesFromQuery(query_sample_text)
		for _, table := range tables {
			c.entryHandler.Chan() <- loki.Entry{
				Labels: model.LabelSet{"job": "integrations/db-o11y"},
				Entry: logproto.Entry{
					Timestamp: time.Unix(0, time.Now().UnixNano()),
					Line:      fmt.Sprintf(`level=info msg="table name parsed" op="%s" digest="%s" table="%s"`, OP_QUERY_PARSED_TABLE_NAME, digest, table),
				},
			}
		}
	}

	return nil
}

func (c QuerySample) tablesFromQuery(query string) []string {
	if strings.HasSuffix(query, "...") {
		level.Info(c.logger).Log("msg", "skipping parsing truncated query")
		return []string{}
	}

	stmt, err := sqlparser.Parse(query)
	if err != nil {
		level.Error(c.logger).Log("msg", "failed to parse sql query", "err", err)
		return []string{}
	}

	var parsedTables []string

	switch stmt := stmt.(type) {
	case *sqlparser.Select:
		parsedTables = c.parseTableExprs(stmt.From)
	case *sqlparser.Insert:
		parsedTables = []string{c.parseTableName(stmt.Table)}
	case *sqlparser.Update:
		parsedTables = c.parseTableExprs(stmt.TableExprs)
	case *sqlparser.Delete:
		parsedTables = c.parseTableExprs(stmt.TableExprs)
	}

	return parsedTables
}

func (c QuerySample) parseTableExprs(tables sqlparser.TableExprs) []string {
	parsedTables := []string{}
	for i := 0; i < len(tables); i++ {
		t := tables[i]
		switch tableExpr := t.(type) {
		case *sqlparser.AliasedTableExpr:
			// Expr may be a table name or a subquery; guard the assertion so an
			// unexpected expression is logged instead of causing a panic.
			if tableName, ok := tableExpr.Expr.(sqlparser.TableName); ok {
				parsedTables = append(parsedTables, c.parseTableName(tableName))
			} else {
				level.Error(c.logger).Log("msg", "unexpected aliased table expression", "table", t)
			}
		case *sqlparser.JoinTableExpr:
			// continue parsing both sides of join
			tables = append(tables, tableExpr.LeftExpr, tableExpr.RightExpr)
		default:
			level.Error(c.logger).Log("msg", "unknown table type", "table", t)
		}
	}
	return parsedTables
}

func (c QuerySample) parseTableName(t sqlparser.TableName) string {
	qualifier := t.Qualifier.String()
	tableName := t.Name.String()
	if qualifier != "" {
		return qualifier + "." 
+ tableName + } + return tableName +} diff --git a/internal/component/database_observability/mysql/collector/query_sample_test.go b/internal/component/database_observability/mysql/collector/query_sample_test.go new file mode 100644 index 0000000000..f8ed1d03b1 --- /dev/null +++ b/internal/component/database_observability/mysql/collector/query_sample_test.go @@ -0,0 +1,69 @@ +package collector + +import ( + "context" + "os" + "testing" + "time" + + loki_fake "github.com/grafana/alloy/internal/component/common/loki/client/fake" + "github.com/prometheus/common/model" + "go.uber.org/goleak" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-kit/log" + "github.com/stretchr/testify/require" +) + +func TestQuerySample(t *testing.T) { + defer goleak.VerifyNone(t) + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki_fake.NewClient(func() {}) + + collector, err := NewQuerySample(QuerySampleArguments{ + DB: db, + CollectInterval: time.Minute, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + require.NotNil(t, collector) + + mock.ExpectQuery(selectQuerySamples).WithoutArgs().WillReturnRows( + sqlmock.NewRows([]string{ + "digest", + "query_sample_text", + "query_sample_seen", + "query_sample_timer_wait", + }).AddRow( + "abc123", + "select * from some_table where id = 1", + "2024-01-01T00:00:00.000Z", + "1000", + ), + ) + + err = collector.Start(context.Background()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 2 + }, 5*time.Second, 100*time.Millisecond) + + collector.Stop() + lokiClient.Stop() + + lokiEntries := lokiClient.Received() + for _, entry := range lokiEntries { + require.Equal(t, model.LabelSet{"job": "integrations/db-o11y"}, entry.Labels) + } + require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_sample_text="select * from some_table where id = 1" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line) + require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, lokiEntries[1].Line) + + err = mock.ExpectationsWereMet() + require.NoError(t, err) +} diff --git a/internal/component/database_observability/mysql/collector/schema_table.go b/internal/component/database_observability/mysql/collector/schema_table.go new file mode 100644 index 0000000000..d4f59e8a28 --- /dev/null +++ b/internal/component/database_observability/mysql/collector/schema_table.go @@ -0,0 +1,227 @@ +package collector + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/go-kit/log" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/prometheus/common/model" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +const ( + OP_SCHEMA_DETECTION = "schema_detection" + OP_TABLE_DETECTION = "table_detection" + OP_CREATE_STATEMENT = "create_statement" +) + +const ( + selectSchemaName = ` + SELECT + SCHEMA_NAME + FROM + information_schema.schemata + WHERE + SCHEMA_NAME NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')` + + selectTableName = ` + SELECT + TABLE_NAME, + CREATE_TIME, + ifnull(UPDATE_TIME, CREATE_TIME) AS UPDATE_TIME + FROM + 
information_schema.tables
	WHERE
		TABLE_SCHEMA = ?`

	// Note that the fully qualified table name is appended to the query:
	// placeholders can only stand in for values, not identifiers such as table names.
	showCreateTable = `SHOW CREATE TABLE`
)

type SchemaTableArguments struct {
	DB              *sql.DB
	CollectInterval time.Duration
	EntryHandler    loki.EntryHandler
	CacheTTL        time.Duration

	Logger log.Logger
}

type SchemaTable struct {
	dbConnection    *sql.DB
	collectInterval time.Duration
	entryHandler    loki.EntryHandler
	// Cache of table definitions. Entries are removed after a configurable TTL.
	// Key is a string of the form "schema.table@timestamp", where timestamp is
	// the last update time of the table (this allows capturing schema changes
	// at each scan, regardless of caching).
	// TODO(cristian): allow configuring cache size (currently unlimited).
	cache *expirable.LRU[string, tableInfo]

	logger log.Logger

	ctx    context.Context
	cancel context.CancelFunc
}

type tableInfo struct {
	schema     string
	tableName  string
	createTime time.Time
	updateTime time.Time
	createStmt string
}

func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
	return &SchemaTable{
		dbConnection:    args.DB,
		collectInterval: args.CollectInterval,
		entryHandler:    args.EntryHandler,
		cache:           expirable.NewLRU[string, tableInfo](0, nil, args.CacheTTL),
		logger:          args.Logger,
	}, nil
}

func (c *SchemaTable) Start(ctx context.Context) error {
	level.Debug(c.logger).Log("msg", "SchemaTable collector started")

	ctx, cancel := context.WithCancel(ctx)
	c.ctx = ctx
	c.cancel = cancel

	go func() {
		ticker := time.NewTicker(c.collectInterval)

		for {
			if err := c.extractSchema(c.ctx); err != nil {
				level.Error(c.logger).Log("msg", "collector stopping due to error", "err", err)
				c.Stop()
				break
			}

			select {
			case <-c.ctx.Done():
				return
			case <-ticker.C:
				// continue loop
			}
		}
	}()

	return nil
}

// Stop should be kept idempotent
func (c *SchemaTable) Stop() {
	c.cancel()
}

func (c *SchemaTable) extractSchema(ctx context.Context) error {
	rs, err := c.dbConnection.QueryContext(ctx, selectSchemaName)
	if err != nil {
		level.Error(c.logger).Log("msg", "failed to query schemata", "err", err)
		return err
	}
	defer rs.Close()

	var schemas []string
	for rs.Next() {
		if err := rs.Err(); err != nil {
			level.Error(c.logger).Log("msg", "failed to iterate rs", "err", err)
			break
		}

		var schema string
		if err := rs.Scan(&schema); err != nil {
			level.Error(c.logger).Log("msg", "failed to scan schemata", "err", err)
			break
		}
		schemas = append(schemas, schema)

		c.entryHandler.Chan() <- loki.Entry{
			Labels: model.LabelSet{"job": "integrations/db-o11y"},
			Entry: logproto.Entry{
				Timestamp: time.Unix(0, time.Now().UnixNano()),
				Line:      fmt.Sprintf(`level=info msg="schema detected" op="%s" schema="%s"`, OP_SCHEMA_DETECTION, schema),
			},
		}
	}

	tables := []tableInfo{}

	for _, schema := range schemas {
		rs, err := c.dbConnection.QueryContext(ctx, selectTableName, schema)
		if err != nil {
			level.Error(c.logger).Log("msg", "failed to query tables", "err", err)
			break
		}
		defer rs.Close()

		for rs.Next() {
			if err := rs.Err(); err != nil {
				level.Error(c.logger).Log("msg", "failed to iterate rs", "err", err)
				break
			}

			var table string
			var createTime, updateTime time.Time
			if err := rs.Scan(&table, &createTime, &updateTime); err != nil {
				level.Error(c.logger).Log("msg", "failed to scan tables", "err", err)
				break
			}
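			// The table's updateTime becomes part of the cache key below, so a
			// schema change (which bumps UPDATE_TIME) is picked up on the next
			// scan even while older entries are still cached.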
tables = append(tables, tableInfo{schema: schema, tableName: table, createTime: createTime, updateTime: updateTime}) + + c.entryHandler.Chan() <- loki.Entry{ + Labels: model.LabelSet{"job": "integrations/db-o11y"}, + Entry: logproto.Entry{ + Timestamp: time.Unix(0, time.Now().UnixNano()), + Line: fmt.Sprintf(`level=info msg="table detected" op="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, schema, table), + }, + } + } + } + + // TODO(cristian): consider moving this into the loop above + for _, table := range tables { + fullyQualifiedTable := fmt.Sprintf("%s.%s", table.schema, table.tableName) + cacheKey := fmt.Sprintf("%s@%d", fullyQualifiedTable, table.updateTime.Unix()) + + if c.cache.Contains(cacheKey) { + level.Info(c.logger).Log("msg", "table definition already in cache", "schema", table.schema, "table", table.tableName) + continue + } + + row := c.dbConnection.QueryRowContext(ctx, showCreateTable+" "+fullyQualifiedTable) + if row.Err() != nil { + level.Error(c.logger).Log("msg", "failed to show create table", "err", row.Err()) + break + } + + var tableName string + var createStmt string + if err = row.Scan(&tableName, &createStmt); err != nil { + level.Error(c.logger).Log("msg", "failed to scan create table", "err", err) + break + } + + table.createStmt = createStmt + c.cache.Add(cacheKey, table) + + c.entryHandler.Chan() <- loki.Entry{ + Labels: model.LabelSet{"job": "integrations/db-o11y"}, + Entry: logproto.Entry{ + Timestamp: time.Unix(0, time.Now().UnixNano()), + Line: fmt.Sprintf(`level=info msg="create table" op="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, table.schema, table.tableName, createStmt), + }, + } + } + + return nil +} diff --git a/internal/component/database_observability/mysql/collector/schema_table_test.go b/internal/component/database_observability/mysql/collector/schema_table_test.go new file mode 100644 index 0000000000..eb32b50585 --- /dev/null +++ b/internal/component/database_observability/mysql/collector/schema_table_test.go @@ -0,0 +1,87 @@ +package collector + +import ( + "context" + "os" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-kit/log" + loki_fake "github.com/grafana/alloy/internal/component/common/loki/client/fake" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" +) + +func TestSchemaTable(t *testing.T) { + // The goroutine which deletes expired entries runs indefinitely, + // see https://github.com/hashicorp/golang-lru/blob/v2.0.7/expirable/expirable_lru.go#L79-L80 + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki_fake.NewClient(func() {}) + + collector, err := NewSchemaTable(SchemaTableArguments{ + DB: db, + CollectInterval: time.Second, + EntryHandler: lokiClient, + CacheTTL: time.Minute, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + + require.NoError(t, err) + require.NotNil(t, collector) + + mock.ExpectQuery(selectSchemaName).WithoutArgs().WillReturnRows( + sqlmock.NewRows([]string{ + "schema_name", + }).AddRow( + "some_schema", + ), + ) + mock.ExpectQuery(selectTableName).WithArgs("some_schema").WillReturnRows( + sqlmock.NewRows([]string{ + "table_name", + "create_time", + "update_time", + }).AddRow( + "some_table", + time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2024, 2, 2, 0, 0, 0, 0, 
time.UTC), + ), + ) + mock.ExpectQuery("SHOW CREATE TABLE some_schema.some_table").WithoutArgs().WillReturnRows( + sqlmock.NewRows([]string{ + "table_name", + "create_statement", + }).AddRow( + "some_schema.some_table", + "CREATE TABLE some_table (id INT)", + ), + ) + + err = collector.Start(context.Background()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 3 + }, 5*time.Second, 100*time.Millisecond) + + collector.Stop() + lokiClient.Stop() + + lokiEntries := lokiClient.Received() + for _, entry := range lokiEntries { + require.Equal(t, model.LabelSet{"job": "integrations/db-o11y"}, entry.Labels) + } + require.Equal(t, `level=info msg="schema detected" op="schema_detection" schema="some_schema"`, lokiEntries[0].Line) + require.Equal(t, `level=info msg="table detected" op="table_detection" schema="some_schema" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, `level=info msg="create table" op="create_statement" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line) + + err = mock.ExpectationsWereMet() + require.NoError(t, err) +} diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go new file mode 100644 index 0000000000..35ed0d8857 --- /dev/null +++ b/internal/component/database_observability/mysql/component.go @@ -0,0 +1,263 @@ +package mysql + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/http" + "path" + "sync" + "time" + + "github.com/go-sql-driver/mysql" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/common/model" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/database_observability/mysql/collector" + "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" + http_service "github.com/grafana/alloy/internal/service/http" + "github.com/grafana/alloy/syntax" + "github.com/grafana/alloy/syntax/alloytypes" +) + +const name = "database_observability.mysql" + +func init() { + component.Register(component.Registration{ + Name: name, + Stability: featuregate.StabilityExperimental, + Args: Arguments{}, + Exports: Exports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +var ( + _ syntax.Defaulter = (*Arguments)(nil) + _ syntax.Validator = (*Arguments)(nil) +) + +type Arguments struct { + DataSourceName alloytypes.Secret `alloy:"data_source_name,attr"` + CollectInterval time.Duration `alloy:"collect_interval,attr,optional"` + ForwardTo []loki.LogsReceiver `alloy:"forward_to,attr"` +} + +var DefaultArguments = Arguments{ + CollectInterval: 10 * time.Second, +} + +func (a *Arguments) SetToDefault() { + *a = DefaultArguments +} + +func (a *Arguments) Validate() error { + _, err := mysql.ParseDSN(string(a.DataSourceName)) + if err != nil { + return err + } + return nil +} + +type Exports struct { + Targets []discovery.Target `alloy:"targets,attr"` +} + +var ( + _ component.Component = (*Component)(nil) + _ http_service.Component = (*Component)(nil) +) + +type Collector interface { + Start(context.Context) error + Stop() +} + +type Component struct { + opts 
component.Options + args Arguments + mut sync.RWMutex + receivers []loki.LogsReceiver + handler loki.LogsReceiver + registry *prometheus.Registry + baseTarget discovery.Target + collectors []Collector + dbConnection *sql.DB +} + +func New(opts component.Options, args Arguments) (*Component, error) { + c := &Component{ + opts: opts, + args: args, + receivers: args.ForwardTo, + handler: loki.NewLogsReceiver(), + registry: prometheus.NewRegistry(), + } + + baseTarget, err := c.getBaseTarget() + if err != nil { + return nil, err + } + c.baseTarget = baseTarget + + if err := c.Update(args); err != nil { + return nil, err + } + + return c, nil +} + +func (c *Component) Run(ctx context.Context) error { + defer func() { + level.Info(c.opts.Logger).Log("msg", name+" component shutting down, stopping collectors") + c.mut.RLock() + for _, collector := range c.collectors { + collector.Stop() + } + if c.dbConnection != nil { + c.dbConnection.Close() + } + c.mut.RUnlock() + }() + + for { + select { + case <-ctx.Done(): + return nil + case entry := <-c.handler.Chan(): + c.mut.RLock() + for _, receiver := range c.receivers { + receiver.Chan() <- entry + } + c.mut.RUnlock() + } + } +} + +func (c *Component) getBaseTarget() (discovery.Target, error) { + data, err := c.opts.GetServiceData(http_service.ServiceName) + if err != nil { + return nil, fmt.Errorf("failed to get HTTP information: %w", err) + } + httpData := data.(http_service.Data) + + return discovery.Target{ + model.AddressLabel: httpData.MemoryListenAddr, + model.SchemeLabel: "http", + model.MetricsPathLabel: path.Join(httpData.HTTPPathForComponent(c.opts.ID), "metrics"), + "instance": c.instanceKey(), + "job": "integrations/db-o11y", + }, nil +} + +func (c *Component) Update(args component.Arguments) error { + c.mut.Lock() + defer c.mut.Unlock() + + c.opts.OnStateChange(Exports{ + Targets: []discovery.Target{c.baseTarget}, + }) + + for _, collector := range c.collectors { + collector.Stop() + } + c.collectors = nil + + if c.dbConnection != nil { + c.dbConnection.Close() + } + + c.args = args.(Arguments) + + // TODO(cristian): verify before appending parameter + dbConnection, err := sql.Open("mysql", string(c.args.DataSourceName)+"?parseTime=true") + if err != nil { + return err + } + + if dbConnection == nil { + return errors.New("nil DB connection") + } + if err = dbConnection.Ping(); err != nil { + return err + } + c.dbConnection = dbConnection + + entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {}) + + qsCollector, err := collector.NewQuerySample(collector.QuerySampleArguments{ + DB: dbConnection, + CollectInterval: c.args.CollectInterval, + EntryHandler: entryHandler, + Logger: c.opts.Logger, + }) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "failed to create QuerySample collector", "err", err) + return err + } + if err := qsCollector.Start(context.Background()); err != nil { + level.Error(c.opts.Logger).Log("msg", "failed to start QuerySample collector", "err", err) + return err + } + c.collectors = append(c.collectors, qsCollector) + + stCollector, err := collector.NewSchemaTable(collector.SchemaTableArguments{ + DB: dbConnection, + CollectInterval: c.args.CollectInterval, + EntryHandler: entryHandler, + Logger: c.opts.Logger, + }) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "failed to create SchemaTable collector", "err", err) + return err + } + if err := stCollector.Start(context.Background()); err != nil { + level.Error(c.opts.Logger).Log("msg", "failed to start SchemaTable collector", "err", 
err) + return err + } + c.collectors = append(c.collectors, stCollector) + + ciCollector, err := collector.NewConnectionInfo(collector.ConnectionInfoArguments{ + DSN: string(c.args.DataSourceName), + Registry: c.registry, + }) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "failed to create ConnectionInfo collector", "err", err) + return err + } + if err := ciCollector.Start(context.Background()); err != nil { + level.Error(c.opts.Logger).Log("msg", "failed to start ConnectionInfo collector", "err", err) + return err + } + c.collectors = append(c.collectors, ciCollector) + + return nil +} + +func (c *Component) Handler() http.Handler { + return promhttp.HandlerFor(c.registry, promhttp.HandlerOpts{}) +} + +// instanceKey returns network(hostname:port)/dbname of the MySQL server. +// This is the same key as used by the mysqld_exporter integration. +func (c *Component) instanceKey() string { + m, _ := mysql.ParseDSN(string(c.args.DataSourceName)) + + if m.Addr == "" { + m.Addr = "localhost:3306" + } + if m.Net == "" { + m.Net = "tcp" + } + + return fmt.Sprintf("%s(%s)/%s", m.Net, m.Addr, m.DBName) +}
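
For illustration, here is a minimal, hypothetical sketch (not part of this change) of driving the `ConnectionInfo` collector directly. It assumes the code lives inside the Alloy module, since the package path is `internal`, and the DSN is a made-up example:

```go
package main

import (
	"context"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/alloy/internal/component/database_observability/mysql/collector"
)

func main() {
	reg := prometheus.NewRegistry()

	// NewConnectionInfo registers the connection_info gauge on the registry.
	ci, err := collector.NewConnectionInfo(collector.ConnectionInfoArguments{
		DSN:      "user:pass@tcp(products-db.abc123xyz.us-east-1.rds.amazonaws.com:3306)/db",
		Registry: reg,
	})
	if err != nil {
		panic(err)
	}

	// Start parses the DSN and sets the gauge once, e.g.
	// connection_info{provider_name="aws",region="us-east-1",db_instance_identifier="products-db"} 1
	if err := ci.Start(context.Background()); err != nil {
		panic(err)
	}
	defer ci.Stop() // unregisters the gauge

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName()) // prints: connection_info
	}
}
```

In the component itself this wiring is done by `Update`, which owns the `prometheus.Registry` exposed through the component's `/metrics` handler; the sketch simply makes that lifecycle (construct, `Start`, `Stop`) visible in isolation.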