diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8a3380282..5196871581 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,6 +22,8 @@ Main (unreleased)

 - Add `otelcol.exporter.syslog` component to export logs in syslog format (@dehaansa)

+- (_Experimental_) Add a `database_observability.mysql` component to collect MySQL performance data.
+
 ### Enhancements

 - Add second metrics sample to the support bundle to provide delta information (@dehaansa)
@@ -39,7 +41,7 @@ Main (unreleased)

 - Fixed an issue in the `otelcol.processor.attribute` component where the actions `delete` and `hash` could not be used with the `pattern` argument. (@wildum)

-- Fixed a race condition that could lead to a deadlock when using `import` statements, which could lead to a memory leak on `/metrics` endpoint of an Alloy instance. (@thampiotr) 
+- Fixed a race condition that could lead to a deadlock when using `import` statements, which could lead to a memory leak on `/metrics` endpoint of an Alloy instance. (@thampiotr)

 - Fix a race condition where the ui service was dependent on starting after the remotecfg service, which is not guaranteed. (@dehaansa & @erikbaranowski)

@@ -97,7 +99,7 @@ v1.5.0

 - Add support for relative paths to `import.file`. This new functionality allows users to use `import.file` blocks in modules imported via `import.git` and other `import.file`. (@wildum)

-- `prometheus.exporter.cloudwatch`: The `discovery` block now has a `recently_active_only` configuration attribute 
+- `prometheus.exporter.cloudwatch`: The `discovery` block now has a `recently_active_only` configuration attribute
   to return only metrics which have been active in the last 3 hours.

 - Add Prometheus bearer authentication to a `prometheus.write.queue` component (@freak12techno)
@@ -110,9 +112,9 @@ v1.5.0

 - Fixed a bug in `import.git` which caused a `"non-fast-forward update"` error message. (@ptodev)

-- Do not log error on clean shutdown of `loki.source.journal`. (@thampiotr) 
+- Do not log error on clean shutdown of `loki.source.journal`. (@thampiotr)

-- `prometheus.operator.*` components: Fixed a bug which would sometimes cause a 
+- `prometheus.operator.*` components: Fixed a bug which would sometimes cause a
   "failed to create service discovery refresh metrics" error after a config reload. (@ptodev)

 ### Other changes
@@ -151,7 +153,7 @@ v1.4.3

 - `pyroscope.scrape` no longer tries to scrape endpoints which are not active targets anymore. (@wildum @mattdurham @dehaansa @ptodev)

-- Fixed a bug with `loki.source.podlogs` not starting in large clusters due to short informer sync timeout. (@elburnetto-intapp) 
+- Fixed a bug with `loki.source.podlogs` not starting in large clusters due to short informer sync timeout. (@elburnetto-intapp)

 - `prometheus.exporter.windows`: Fixed bug with `exclude` regular expression config arguments which caused missing metrics.
   (@ptodev)
@@ -170,7 +172,7 @@ v1.4.2
 - Fix parsing of the Level configuration attribute in debug_metrics config block
 - Ensure "optional" debug_metrics config block really is optional

-- Fixed an issue with `loki.process` where `stage.luhn` and `stage.timestamp` would not apply 
+- Fixed an issue with `loki.process` where `stage.luhn` and `stage.timestamp` would not apply
   default configuration settings correctly (@thampiotr)

 - Fixed an issue with `loki.process` where configuration could be reloaded even if there
diff --git a/docs/sources/reference/components/database_observability/database_observability.mysql.md b/docs/sources/reference/components/database_observability/database_observability.mysql.md
index 2bfc85953e..e51af705f8 100644
--- a/docs/sources/reference/components/database_observability/database_observability.mysql.md
+++ b/docs/sources/reference/components/database_observability/database_observability.mysql.md
@@ -25,9 +25,10 @@ The following arguments are supported:

 | Name                 | Type                 | Description                                                                                                          | Default | Required |
 | -------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------- | ------- | -------- |
-| `data_source_name`   | `secret`             | [Data Source Name](https://github.com/go-sql-driver/mysql#dsn-data-source-name) for the MySQL server to connect to. |         | yes      |
-| `forward_to`         | `list(LogsReceiver)` | Where to forward log entries after processing.                                                                       |         | yes      |
-| `collect_interval`   | `duration`           | How frequently to collect query samples from database                                                                | `"10s"` | no       |
+| `data_source_name`      | `secret`             | [Data Source Name](https://github.com/go-sql-driver/mysql#dsn-data-source-name) for the MySQL server to connect to. |         | yes      |
+| `forward_to`            | `list(LogsReceiver)` | Where to forward log entries after processing.                                                                       |         | yes      |
+| `collect_interval`      | `duration`           | How frequently to collect information from the database.                                                             | `"10s"` | no       |
+| `query_samples_enabled` | `bool`               | Whether to enable collection of query samples.                                                                       | `true`  | no       |

 ## Blocks

@@ -67,7 +68,6 @@ loki.write "logs_service" {
   }
 }
 ```
-

 ## Compatible components
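For context on the `data_source_name` argument documented above: it takes a DSN in the go-sql-driver/mysql format linked from the table. The sketch below is illustrative only and is not part of this change; the credentials and address are placeholders.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	"github.com/go-sql-driver/mysql"
)

func main() {
	// Hypothetical DSN in the format data_source_name expects:
	// [user[:password]@][net[(addr)]]/dbname[?param=value]
	dsn := "alloy:secret@tcp(localhost:3306)/"

	// ParseDSN validates the string without opening a connection.
	cfg, err := mysql.ParseDSN(dsn)
	if err != nil {
		log.Fatalf("invalid data_source_name: %v", err)
	}
	fmt.Println("address:", cfg.Addr)

	// Importing the driver package registers it under the name "mysql",
	// which is the driver name used with database/sql.
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```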
diff --git a/internal/component/database_observability/database_observability.go b/internal/component/database_observability/database_observability.go
new file mode 100644
index 0000000000..176aedc5eb
--- /dev/null
+++ b/internal/component/database_observability/database_observability.go
@@ -0,0 +1,3 @@
+package database_observability
+
+const JobName = "integrations/db-o11y"
diff --git a/internal/component/database_observability/mysql/collector/query_sample.go b/internal/component/database_observability/mysql/collector/query_sample.go
index bb8d354385..8632f57085 100644
--- a/internal/component/database_observability/mysql/collector/query_sample.go
+++ b/internal/component/database_observability/mysql/collector/query_sample.go
@@ -12,6 +12,7 @@ import (
 	"github.com/xwb1989/sqlparser"

 	"github.com/grafana/alloy/internal/component/common/loki"
+	"github.com/grafana/alloy/internal/component/database_observability"
 	"github.com/grafana/alloy/internal/runtime/logging/level"
 	"github.com/grafana/loki/v3/pkg/logproto"
 )
@@ -108,27 +109,28 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
 		var digest, query_sample_text, query_sample_seen, query_sample_timer_wait string
 		err := rs.Scan(&digest, &query_sample_text, &query_sample_seen, &query_sample_timer_wait)
 		if err != nil {
-			level.Error(c.logger).Log("msg", "failed to scan query samples", "err", err)
+			level.Error(c.logger).Log("msg", "failed to scan query samples", "digest", digest, "err", err)
 			break
 		}

-		redacted, err := sqlparser.RedactSQLQuery(query_sample_text)
+		query_sample_redacted, err := sqlparser.RedactSQLQuery(query_sample_text)
 		if err != nil {
-			level.Error(c.logger).Log("msg", "failed to redact sql query", "err", err)
+			level.Error(c.logger).Log("msg", "failed to redact sql query", "digest", digest, "err", err)
+			break
 		}

 		c.entryHandler.Chan() <- loki.Entry{
-			Labels: model.LabelSet{"job": "integrations/db-o11y"},
+			Labels: model.LabelSet{"job": database_observability.JobName},
 			Entry: logproto.Entry{
 				Timestamp: time.Unix(0, time.Now().UnixNano()),
-				Line:      fmt.Sprintf(`level=info msg="query samples fetched" op="%s" digest="%s" query_sample_text="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_redacted="%s"`, OP_QUERY_SAMPLE, digest, query_sample_text, query_sample_seen, query_sample_timer_wait, redacted),
+				Line:      fmt.Sprintf(`level=info msg="query samples fetched" op="%s" digest="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`, OP_QUERY_SAMPLE, digest, query_sample_seen, query_sample_timer_wait, query_sample_redacted),
 			},
 		}

-		tables := c.tablesFromQuery(query_sample_text)
+		tables := c.tablesFromQuery(digest, query_sample_text)
 		for _, table := range tables {
 			c.entryHandler.Chan() <- loki.Entry{
-				Labels: model.LabelSet{"job": "integrations/db-o11y"},
+				Labels: model.LabelSet{"job": database_observability.JobName},
 				Entry: logproto.Entry{
 					Timestamp: time.Unix(0, time.Now().UnixNano()),
 					Line:      fmt.Sprintf(`level=info msg="table name parsed" op="%s" digest="%s" table="%s"`, OP_QUERY_PARSED_TABLE_NAME, digest, table),
@@ -140,15 +142,15 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
 	return nil
 }

-func (c QuerySample) tablesFromQuery(query string) []string {
+func (c QuerySample) tablesFromQuery(digest, query string) []string {
 	if strings.HasSuffix(query, "...") {
-		level.Info(c.logger).Log("msg", "skipping parsing truncated query")
+		level.Info(c.logger).Log("msg", "skipping parsing truncated query", "digest", digest)
 		return []string{}
 	}

 	stmt, err := sqlparser.Parse(query)
 	if err != nil {
-		level.Error(c.logger).Log("msg", "failed to parse sql query", "err", err)
+		level.Error(c.logger).Log("msg", "failed to parse sql query", "digest", digest, "err", err)
 		return []string{}
 	}
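The collector above refuses truncated samples and redacts literal values before anything is logged. A minimal standalone sketch of that flow using the same `sqlparser` calls; the `redactSample` helper and its error message are illustrative and not part of the component.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/xwb1989/sqlparser"
)

// redactSample mirrors the collector's flow: skip samples that were truncated
// upstream (they end with "..." and cannot be parsed reliably), then strip
// literal values from the query text before it is logged anywhere.
func redactSample(digest, sample string) (string, error) {
	if strings.HasSuffix(sample, "...") {
		return "", fmt.Errorf("query sample for digest %q is truncated", digest)
	}
	// RedactSQLQuery replaces literals with placeholders such as :redacted1.
	return sqlparser.RedactSQLQuery(sample)
}

func main() {
	redacted, err := redactSample("abc123", "select * from some_table where id = 1")
	if err != nil {
		panic(err)
	}
	fmt.Println(redacted) // select * from some_table where id = :redacted1
}
```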
diff --git a/internal/component/database_observability/mysql/collector/query_sample_test.go b/internal/component/database_observability/mysql/collector/query_sample_test.go
index f8ed1d03b1..8288a821ef 100644
--- a/internal/component/database_observability/mysql/collector/query_sample_test.go
+++ b/internal/component/database_observability/mysql/collector/query_sample_test.go
@@ -7,6 +7,7 @@ import (
 	"time"

 	loki_fake "github.com/grafana/alloy/internal/component/common/loki/client/fake"
+	"github.com/grafana/alloy/internal/component/database_observability"
 	"github.com/prometheus/common/model"
 	"go.uber.org/goleak"

@@ -59,9 +60,9 @@ func TestQuerySample(t *testing.T) {

 	lokiEntries := lokiClient.Received()
 	for _, entry := range lokiEntries {
-		require.Equal(t, model.LabelSet{"job": "integrations/db-o11y"}, entry.Labels)
+		require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels)
 	}
-	require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_sample_text="select * from some_table where id = 1" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line)
+	require.Equal(t, `level=info msg="query samples fetched" op="query_sample" digest="abc123" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line)
 	require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" digest="abc123" table="some_table"`, lokiEntries[1].Line)

 	err = mock.ExpectationsWereMet()
diff --git a/internal/component/database_observability/mysql/collector/schema_table.go b/internal/component/database_observability/mysql/collector/schema_table.go
index d4f59e8a28..e9de0e4392 100644
--- a/internal/component/database_observability/mysql/collector/schema_table.go
+++ b/internal/component/database_observability/mysql/collector/schema_table.go
@@ -12,6 +12,7 @@ import (
 	"github.com/prometheus/common/model"

 	"github.com/grafana/alloy/internal/component/common/loki"
+	"github.com/grafana/alloy/internal/component/database_observability"
 	"github.com/grafana/alloy/internal/runtime/logging/level"
 )

@@ -146,7 +147,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
 		schemas = append(schemas, schema)

 		c.entryHandler.Chan() <- loki.Entry{
-			Labels: model.LabelSet{"job": "integrations/db-o11y"},
+			Labels: model.LabelSet{"job": database_observability.JobName},
 			Entry: logproto.Entry{
 				Timestamp: time.Unix(0, time.Now().UnixNano()),
 				Line:      fmt.Sprintf(`level=info msg="schema detected" op="%s" schema="%s"`, OP_SCHEMA_DETECTION, schema),
@@ -179,7 +180,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
 		tables = append(tables, tableInfo{schema: schema, tableName: table, createTime: createTime, updateTime: updateTime})

 		c.entryHandler.Chan() <- loki.Entry{
-			Labels: model.LabelSet{"job": "integrations/db-o11y"},
+			Labels: model.LabelSet{"job": database_observability.JobName},
 			Entry: logproto.Entry{
 				Timestamp: time.Unix(0, time.Now().UnixNano()),
 				Line:      fmt.Sprintf(`level=info msg="table detected" op="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, schema, table),
@@ -215,7 +216,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
 		c.cache.Add(cacheKey, table)

 		c.entryHandler.Chan() <- loki.Entry{
-			Labels: model.LabelSet{"job": "integrations/db-o11y"},
+			Labels: model.LabelSet{"job": database_observability.JobName},
 			Entry: logproto.Entry{
 				Timestamp: time.Unix(0, time.Now().UnixNano()),
 				Line:      fmt.Sprintf(`level=info msg="create table" op="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, table.schema, table.tableName, createStmt),
"integrations/db-o11y"}, + Labels: model.LabelSet{"job": database_observability.JobName}, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), Line: fmt.Sprintf(`level=info msg="table detected" op="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, schema, table), @@ -215,7 +216,7 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error { c.cache.Add(cacheKey, table) c.entryHandler.Chan() <- loki.Entry{ - Labels: model.LabelSet{"job": "integrations/db-o11y"}, + Labels: model.LabelSet{"job": database_observability.JobName}, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), Line: fmt.Sprintf(`level=info msg="create table" op="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, table.schema, table.tableName, createStmt), diff --git a/internal/component/database_observability/mysql/collector/schema_table_test.go b/internal/component/database_observability/mysql/collector/schema_table_test.go index eb32b50585..7bf29a2977 100644 --- a/internal/component/database_observability/mysql/collector/schema_table_test.go +++ b/internal/component/database_observability/mysql/collector/schema_table_test.go @@ -9,6 +9,7 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/go-kit/log" loki_fake "github.com/grafana/alloy/internal/component/common/loki/client/fake" + "github.com/grafana/alloy/internal/component/database_observability" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -76,7 +77,7 @@ func TestSchemaTable(t *testing.T) { lokiEntries := lokiClient.Received() for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": "integrations/db-o11y"}, entry.Labels) + require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) } require.Equal(t, `level=info msg="schema detected" op="schema_detection" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, `level=info msg="table detected" op="table_detection" schema="some_schema" table="some_table"`, lokiEntries[1].Line) diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go index 35ed0d8857..14a7bf1a5a 100644 --- a/internal/component/database_observability/mysql/component.go +++ b/internal/component/database_observability/mysql/component.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/database_observability" "github.com/grafana/alloy/internal/component/database_observability/mysql/collector" "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/featuregate" @@ -46,14 +47,18 @@ var ( _ syntax.Validator = (*Arguments)(nil) ) +// TODO(cristian) consider using something like "enabled_collectors" +// to allow users to enable/disable collectors. 
diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go
index 35ed0d8857..14a7bf1a5a 100644
--- a/internal/component/database_observability/mysql/component.go
+++ b/internal/component/database_observability/mysql/component.go
@@ -17,6 +17,7 @@ import (

 	"github.com/grafana/alloy/internal/component"
 	"github.com/grafana/alloy/internal/component/common/loki"
+	"github.com/grafana/alloy/internal/component/database_observability"
 	"github.com/grafana/alloy/internal/component/database_observability/mysql/collector"
 	"github.com/grafana/alloy/internal/component/discovery"
 	"github.com/grafana/alloy/internal/featuregate"
@@ -46,14 +47,18 @@ var (
 	_ syntax.Validator = (*Arguments)(nil)
 )

+// TODO(cristian) consider using something like "enabled_collectors"
+// to allow users to enable/disable collectors.
 type Arguments struct {
-	DataSourceName  alloytypes.Secret   `alloy:"data_source_name,attr"`
-	CollectInterval time.Duration       `alloy:"collect_interval,attr,optional"`
-	ForwardTo       []loki.LogsReceiver `alloy:"forward_to,attr"`
+	DataSourceName      alloytypes.Secret   `alloy:"data_source_name,attr"`
+	CollectInterval     time.Duration       `alloy:"collect_interval,attr,optional"`
+	QuerySamplesEnabled bool                `alloy:"query_samples_enabled,attr,optional"`
+	ForwardTo           []loki.LogsReceiver `alloy:"forward_to,attr"`
 }

 var DefaultArguments = Arguments{
-	CollectInterval: 10 * time.Second,
+	CollectInterval:     10 * time.Second,
+	QuerySamplesEnabled: true,
 }

 func (a *Arguments) SetToDefault() {
@@ -155,7 +160,7 @@ func (c *Component) getBaseTarget() (discovery.Target, error) {
 		model.SchemeLabel:      "http",
 		model.MetricsPathLabel: path.Join(httpData.HTTPPathForComponent(c.opts.ID), "metrics"),
 		"instance":             c.instanceKey(),
-		"job":                  "integrations/db-o11y",
+		"job":                  database_observability.JobName,
 	}, nil
 }

@@ -194,21 +199,23 @@ func (c *Component) Update(args component.Arguments) error {

 	entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {})

-	qsCollector, err := collector.NewQuerySample(collector.QuerySampleArguments{
-		DB:              dbConnection,
-		CollectInterval: c.args.CollectInterval,
-		EntryHandler:    entryHandler,
-		Logger:          c.opts.Logger,
-	})
-	if err != nil {
-		level.Error(c.opts.Logger).Log("msg", "failed to create QuerySample collector", "err", err)
-		return err
-	}
-	if err := qsCollector.Start(context.Background()); err != nil {
-		level.Error(c.opts.Logger).Log("msg", "failed to start QuerySample collector", "err", err)
-		return err
+	if c.args.QuerySamplesEnabled {
+		qsCollector, err := collector.NewQuerySample(collector.QuerySampleArguments{
+			DB:              dbConnection,
+			CollectInterval: c.args.CollectInterval,
+			EntryHandler:    entryHandler,
+			Logger:          c.opts.Logger,
+		})
+		if err != nil {
+			level.Error(c.opts.Logger).Log("msg", "failed to create QuerySample collector", "err", err)
+			return err
+		}
+		if err := qsCollector.Start(context.Background()); err != nil {
+			level.Error(c.opts.Logger).Log("msg", "failed to start QuerySample collector", "err", err)
+			return err
+		}
+		c.collectors = append(c.collectors, qsCollector)
 	}
-	c.collectors = append(c.collectors, qsCollector)

 	stCollector, err := collector.NewSchemaTable(collector.SchemaTableArguments{
 		DB: dbConnection,
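A hypothetical test sketch (not part of this diff) of how the new `query_samples_enabled` argument behaves: it defaults to `true` through `DefaultArguments` and can be switched off from configuration, which makes `Update` skip the `QuerySample` collector. It assumes `syntax.Unmarshal` applies the defaults and that the component's `Validate` accepts a minimal DSN with an empty `forward_to` list.

```go
package mysql_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/grafana/alloy/internal/component/database_observability/mysql"
	"github.com/grafana/alloy/syntax"
)

// Hypothetical sketch: query_samples_enabled defaults to true and is
// overridable from the Alloy configuration.
func TestQuerySamplesEnabledDefault(t *testing.T) {
	cfg := `
		data_source_name = "user:pass@tcp(localhost:3306)/mydb"
		forward_to       = []
	`
	var args mysql.Arguments
	require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
	require.True(t, args.QuerySamplesEnabled)

	var disabled mysql.Arguments
	require.NoError(t, syntax.Unmarshal([]byte(cfg+"query_samples_enabled = false\n"), &disabled))
	require.False(t, disabled.QuerySamplesEnabled)
}
```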