Skip to content

Commit

Permalink
database_observability: extract table schema (#2607)
Browse files Browse the repository at this point in the history
database_observability: extract table schema

This is the first iteration of the implementation for extracting the table schema details.

In this PR:
- parse columns from information_schema.columns (ignoring indexes and other constraints)
- introduce a json structure to represent the table schema
- base64-encode both the schema and the create statement itself

There are still a bunch of things to follow up on, but I didn't want to make this PR too big.
  • Loading branch information
cristiangreco authored Feb 5, 2025
1 parent b9118c3 commit 60f43c5
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 24 deletions.
27 changes: 11 additions & 16 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,19 @@ Main (unreleased)

- Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb)

- (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Improve parsing of truncated queries in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Capture schema name for query samples in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Fix handling of view table types when detecting schema in `database_observability.mysql` (@matthewnolf)

- (_Experimental_) Fix error handling during result set iteration in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Better support for table name parsing in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Better error handling for `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Add namespace to `connection_info` metric in `database_observability.mysql` (@cristiangreco)

- Add json format support for log export via faro receiver (@ravishankar15)

- (_Experimental_) Various changes to the experimental component `database_observability.mysql`:
- Always log `instance` label key (@cristiangreco)
- Improve parsing of truncated queries (@cristiangreco)
- Capture schema name for query samples (@cristiangreco)
- Fix handling of view table types when detecting schema (@matthewnolf)
- Fix error handling during result set iteration (@cristiangreco)
- Better support for table name parsing (@cristiangreco)
- Better error handling for components (@cristiangreco)
- Add namespace to `connection_info` metric (@cristiangreco)
- Add table columns parsing (@cristiangreco)

### Bugfixes

- Fix log rotation for Windows in `loki.source.file` by refactoring the component to use the runner pkg. This should also reduce CPU consumption when tailing a lot of files in a dynamic environment. (@wildum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package collector
import (
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -46,6 +49,20 @@ const (
// Note that the fully qualified table name is appended to the query,
// for some reason it doesn't work with placeholders.
showCreateTable = `SHOW CREATE TABLE`

selectColumnNames = `
SELECT
COLUMN_NAME,
COLUMN_DEFAULT,
IS_NULLABLE,
COLUMN_TYPE,
COLUMN_KEY,
EXTRA
FROM
information_schema.columns
WHERE
TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY ORDINAL_POSITION ASC`
)

type SchemaTableArguments struct {
Expand Down Expand Up @@ -85,6 +102,18 @@ type tableInfo struct {
createStmt string
}

// tableSpec is the JSON-serializable description of a table's structure.
// It is produced by analyzeTableSpec; extractSchema marshals it with
// encoding/json and emits it, base64-encoded, as the table_spec field of the
// "create table" log line.
type tableSpec struct {
// Columns lists the table's columns in the order returned by the
// selectColumnNames query (ORDER BY ORDINAL_POSITION ASC).
Columns []columnSpec `json:"columns"`
}
// columnSpec describes a single column as read from
// information_schema.columns by analyzeTableSpec. Fields tagged omitempty are
// left out of the JSON output when they hold their zero value (false or "").
type columnSpec struct {
// Name is the COLUMN_NAME value.
Name string `json:"name"`
// Type is the COLUMN_TYPE value.
Type string `json:"type"`
// NotNull is true when IS_NULLABLE is "NO".
NotNull bool `json:"not_null,omitempty"`
// AutoIncrement is true when EXTRA contains "auto_increment" (matched case-insensitively).
AutoIncrement bool `json:"auto_increment,omitempty"`
// PrimaryKey is true when COLUMN_KEY is "PRI".
PrimaryKey bool `json:"primary_key,omitempty"`
// DefaultValue is the COLUMN_DEFAULT value, or "" when it is SQL NULL.
// When a default is present and EXTRA contains "ON UPDATE CURRENT_TIMESTAMP",
// that clause is appended to the value.
DefaultValue string `json:"default_value,omitempty"`
}

func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
return &SchemaTable{
dbConnection: args.DB,
Expand Down Expand Up @@ -219,19 +248,26 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
}
}

if len(tables) == 0 {
level.Info(c.logger).Log("msg", "no tables detected from information_schema.tables")
return nil
}

// TODO(cristian): consider moving this into the loop above
for _, table := range tables {
logKVs := []any{"schema", table.schema, "table", table.tableName}

fullyQualifiedTable := fmt.Sprintf("%s.%s", table.schema, table.tableName)
cacheKey := fmt.Sprintf("%s@%d", fullyQualifiedTable, table.updateTime.Unix())

if c.cache.Contains(cacheKey) {
level.Debug(c.logger).Log("msg", "table definition already in cache", "schema", table.schema, "table", table.tableName)
level.Debug(c.logger).Log("msg", "table definition already in cache", logKVs)
continue
}

row := c.dbConnection.QueryRowContext(ctx, showCreateTable+" "+fullyQualifiedTable)
if row.Err() != nil {
level.Error(c.logger).Log("msg", "failed to show create table", "table", table.tableName, "err", row.Err())
level.Error(c.logger).Log("msg", "failed to show create table", append(logKVs, "err", row.Err()))
break
}

Expand All @@ -241,27 +277,103 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
var collationConnection string
if table.tableType == "BASE TABLE" {
if err = row.Scan(&tableName, &createStmt); err != nil {
level.Error(c.logger).Log("msg", "failed to scan create table", "table", table.tableName, "err", err)
level.Error(c.logger).Log("msg", "failed to scan create table", append(logKVs, "err", err))
break
}
} else if table.tableType == "VIEW" {
if err = row.Scan(&tableName, &createStmt, &characterSetClient, &collationConnection); err != nil {
level.Error(c.logger).Log("msg", "failed to scan create view", "table", table.tableName, "err", err)
level.Error(c.logger).Log("msg", "failed to scan create view", append(logKVs, "err", err))
break
}
}

table.createStmt = createStmt
c.cache.Add(cacheKey, table)

spec, err := c.analyzeTableSpec(ctx, table.schema, table.tableName)
if err != nil {
level.Error(c.logger).Log("msg", "failed to analyze table spec", append(logKVs, "err", err))
break
}
jsonSpec, err := json.Marshal(spec)
if err != nil {
level.Error(c.logger).Log("msg", "failed to marshal table spec", append(logKVs, "err", err))
break
}

c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": database_observability.JobName},
Entry: logproto.Entry{
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(`level=info msg="create table" op="%s" instance="%s" schema="%s" table="%s" create_statement="%s"`, OP_CREATE_STATEMENT, c.instanceKey, table.schema, table.tableName, createStmt),
Line: fmt.Sprintf(
`level=info msg="create table" op="%s" instance="%s" schema="%s" table="%s" create_statement="%s" table_spec="%s"`,
OP_CREATE_STATEMENT, c.instanceKey, table.schema, table.tableName, base64.StdEncoding.EncodeToString([]byte(createStmt)), base64.StdEncoding.EncodeToString(jsonSpec),
),
},
}
}

return nil
}

// analyzeTableSpec reads the column metadata of schemaName.tableName from
// information_schema.columns (via the selectColumnNames query) and converts
// it into a tableSpec.
//
// Each result row becomes one columnSpec, in the order the query returns
// them. Any failure while querying, scanning a row, or iterating the result
// set is logged at error level and returned unchanged; in that case the
// returned spec is nil.
func (c *SchemaTable) analyzeTableSpec(ctx context.Context, schemaName string, tableName string) (*tableSpec, error) {
	rows, err := c.dbConnection.QueryContext(ctx, selectColumnNames, schemaName, tableName)
	if err != nil {
		level.Error(c.logger).Log("msg", "failed to query table columns", "schema", schemaName, "table", tableName, "err", err)
		return nil, err
	}
	defer rows.Close()

	// Start from an empty, non-nil slice so that a table without any
	// columns is marshaled as "columns":[] instead of "columns":null.
	spec := &tableSpec{Columns: []columnSpec{}}

	for rows.Next() {
		var (
			name, nullable, colType, key, extra string
			// COLUMN_DEFAULT may be SQL NULL, hence the nullable wrapper.
			defaultVal sql.NullString
		)
		if err := rows.Scan(&name, &defaultVal, &nullable, &colType, &key, &extra); err != nil {
			level.Error(c.logger).Log("msg", "failed to scan table columns", "schema", schemaName, "table", tableName, "err", err)
			return nil, err
		}

		// Normalize EXTRA once so the substring checks below are
		// case-insensitive.
		extra = strings.ToUpper(extra)

		col := columnSpec{
			Name:          name,
			Type:          colType,
			NotNull:       nullable == "NO",
			AutoIncrement: strings.Contains(extra, "AUTO_INCREMENT"),
			PrimaryKey:    key == "PRI",
		}

		// The ON UPDATE clause is only recorded alongside an explicit
		// (non-NULL) default value.
		if defaultVal.Valid {
			col.DefaultValue = defaultVal.String
			if strings.Contains(extra, "ON UPDATE CURRENT_TIMESTAMP") {
				col.DefaultValue += " ON UPDATE CURRENT_TIMESTAMP"
			}
		}

		spec.Columns = append(spec.Columns, col)
	}

	// rows.Next returning false can mean either exhaustion or failure;
	// rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		level.Error(c.logger).Log("msg", "error during iterating over table columns result set", "schema", schemaName, "table", tableName, "err", err)
		return nil, err
	}

	return spec, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package collector

import (
"context"
"encoding/base64"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -76,6 +77,25 @@ func TestSchemaTable(t *testing.T) {
),
)

mock.ExpectQuery(selectColumnNames).WithArgs("some_schema", "some_table").RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"COLUMN_NAME",
"COLUMN_DEFAULT",
"IS_NULLABLE",
"COLUMN_TYPE",
"COLUMN_KEY",
"EXTRA",
}).AddRow(
"id",
"null",
"NO",
"int",
"PRI",
"auto_increment",
),
)

err = collector.Start(context.Background())
require.NoError(t, err)

Expand All @@ -99,7 +119,7 @@ func TestSchemaTable(t *testing.T) {
}
require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line)
})
t.Run("detect view schema", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -161,6 +181,25 @@ func TestSchemaTable(t *testing.T) {
),
)

mock.ExpectQuery(selectColumnNames).WithArgs("some_schema", "some_table").RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"COLUMN_NAME",
"COLUMN_DEFAULT",
"IS_NULLABLE",
"COLUMN_TYPE",
"COLUMN_KEY",
"EXTRA",
}).AddRow(
"id",
"null",
"NO",
"int",
"PRI",
"auto_increment",
),
)

err = collector.Start(context.Background())
require.NoError(t, err)

Expand All @@ -184,7 +223,7 @@ func TestSchemaTable(t *testing.T) {
}
require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="CREATE VIEW some_view (id INT)"`, lokiEntries[2].Line)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE VIEW some_view (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line)
})
t.Run("schemas result set iteration error", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -353,6 +392,25 @@ func TestSchemaTable(t *testing.T) {
),
)

mock.ExpectQuery(selectColumnNames).WithArgs("some_schema", "some_table").RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"COLUMN_NAME",
"COLUMN_DEFAULT",
"IS_NULLABLE",
"COLUMN_TYPE",
"COLUMN_KEY",
"EXTRA",
}).AddRow(
"id",
"null",
"NO",
"int",
"PRI",
"auto_increment",
),
)

err = collector.Start(context.Background())
require.NoError(t, err)

Expand All @@ -373,6 +431,6 @@ func TestSchemaTable(t *testing.T) {
}
require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line)
require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="CREATE TABLE some_table (id INT)"`, lokiEntries[2].Line)
require.Equal(t, fmt.Sprintf(`level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line)
})
}

0 comments on commit 60f43c5

Please sign in to comment.