Skip to content

Commit

Permalink
database_observability: extend parsing of table names in queries (#2556)
Browse files (browse the repository at this point in the history)
database_observability: extend parsing of table names in queries

Improves table name extraction for subqueries, unions, and INSERT ... SELECT statements.
  • Loading branch information
cristiangreco authored Jan 29, 2025
1 parent 7cb72f6 commit 68adba2
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 13 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ Main (unreleased)

- (_Experimental_) Fix handling of view table types when detecting schema in `database_observability.mysql` (@matthewnolf)

- (_Experimental_) fix error handling during result set iteration in `database_observability.mysql` (@cristiangreco)
- (_Experimental_) Fix error handling during result set iteration in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Better support for table name parsing in `database_observability.mysql` (@cristiangreco)

- Add json format support for log export via faro receiver (@ravishankar15)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
},
}

tables := c.tablesFromQuery(stmt)
tables := c.tablesFromQuery(digest, stmt)
for _, table := range tables {
c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": database_observability.JobName},
Expand Down Expand Up @@ -199,29 +199,49 @@ func (c QuerySample) stmtType(stmt sqlparser.Statement) string {
return "update"
case *sqlparser.Delete:
return "delete"
case *sqlparser.Union:
return "select" // label union as a select
default:
return ""
}
}

func (c QuerySample) tablesFromQuery(stmt sqlparser.Statement) []string {
func (c QuerySample) tablesFromQuery(digest string, stmt sqlparser.Statement) []string {
var parsedTables []string

switch stmt := stmt.(type) {
case *sqlparser.Select:
parsedTables = c.parseTableExprs(stmt.From)
case *sqlparser.Insert:
parsedTables = []string{c.parseTableName(stmt.Table)}
parsedTables = c.parseTableExprs(digest, stmt.From)
case *sqlparser.Update:
parsedTables = c.parseTableExprs(stmt.TableExprs)
parsedTables = c.parseTableExprs(digest, stmt.TableExprs)
case *sqlparser.Delete:
parsedTables = c.parseTableExprs(stmt.TableExprs)
parsedTables = c.parseTableExprs(digest, stmt.TableExprs)
case *sqlparser.Insert:
parsedTables = []string{c.parseTableName(stmt.Table)}
switch insRowsStmt := stmt.Rows.(type) {
case *sqlparser.Select:
parsedTables = append(parsedTables, c.tablesFromQuery(digest, insRowsStmt)...)
case *sqlparser.Union:
for _, side := range []sqlparser.SelectStatement{insRowsStmt.Left, insRowsStmt.Right} {
parsedTables = append(parsedTables, c.tablesFromQuery(digest, side)...)
}
case *sqlparser.ParenSelect:
parsedTables = append(parsedTables, c.tablesFromQuery(digest, insRowsStmt.Select)...)
default:
level.Error(c.logger).Log("msg", "unknown insert type", "digest", digest)
}
case *sqlparser.Union:
for _, side := range []sqlparser.SelectStatement{stmt.Left, stmt.Right} {
parsedTables = append(parsedTables, c.tablesFromQuery(digest, side)...)
}
default:
level.Error(c.logger).Log("msg", "unknown statement type", "digest", digest)
}

return parsedTables
}

func (c QuerySample) parseTableExprs(tables sqlparser.TableExprs) []string {
func (c QuerySample) parseTableExprs(digest string, tables sqlparser.TableExprs) []string {
parsedTables := []string{}
for i := 0; i < len(tables); i++ {
t := tables[i]
Expand All @@ -231,16 +251,28 @@ func (c QuerySample) parseTableExprs(tables sqlparser.TableExprs) []string {
case sqlparser.TableName:
parsedTables = append(parsedTables, c.parseTableName(expr))
case *sqlparser.Subquery:
subquery := expr.Select.(*sqlparser.Select)
parsedTables = append(parsedTables, c.parseTableExprs(subquery.From)...)
switch subqueryExpr := expr.Select.(type) {
case *sqlparser.Select:
parsedTables = append(parsedTables, c.parseTableExprs(digest, subqueryExpr.From)...)
case *sqlparser.Union:
for _, side := range []sqlparser.SelectStatement{subqueryExpr.Left, subqueryExpr.Right} {
parsedTables = append(parsedTables, c.tablesFromQuery(digest, side)...)
}
case *sqlparser.ParenSelect:
parsedTables = append(parsedTables, c.tablesFromQuery(digest, subqueryExpr.Select)...)
default:
level.Error(c.logger).Log("msg", "unknown subquery type", "digest", digest)
}
default:
level.Error(c.logger).Log("msg", "unknown nested table expression", "table", tableExpr)
level.Error(c.logger).Log("msg", "unknown nested table expression", "digest", digest, "table", tableExpr)
}
case *sqlparser.JoinTableExpr:
// continue parsing both sides of join
tables = append(tables, tableExpr.LeftExpr, tableExpr.RightExpr)
case *sqlparser.ParenTableExpr:
tables = append(tables, tableExpr.Exprs...)
default:
level.Error(c.logger).Log("msg", "unknown table type", "table", t)
level.Error(c.logger).Log("msg", "unknown table type", "digest", digest, "table", t)
}
}
return parsedTables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,103 @@ func TestQuerySample(t *testing.T) {
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="other_schema" digest="abc123" table="some_table"`,
},
},
{
name: "union query",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"SELECT id, name FROM employees_ny UNION SELECT id, name FROM employees_ca UNION SELECT id, name FROM employees_tx",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select id, name from employees_ny union select id, name from employees_ca union select id, name from employees_tx"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_ny"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_ca"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_tx"`,
},
},
{
name: "from subquery with union",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"SELECT COUNT(DISTINCT t.role_id) AS roles, COUNT(DISTINCT r.id) AS fixed_roles FROM (SELECT role_id FROM user_role UNION ALL SELECT role_id FROM team_role) AS t LEFT JOIN (SELECT id FROM role WHERE name LIKE 'prefix%') AS r ON t.role_id = r.id",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select COUNT(distinct t.role_id) as roles, COUNT(distinct r.id) as fixed_roles from (select role_id from user_role union all select role_id from team_role) as t left join (select id from role where name like :redacted1) as r on t.role_id = r.id"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="user_role"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="team_role"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="role"`,
},
},
{
name: "subquery and union",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"SELECT * FROM (SELECT id, name FROM employees_us_east UNION SELECT id, name FROM employees_us_west) as employees_us UNION SELECT id, name FROM employees_emea",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from (select id, name from employees_us_east union select id, name from employees_us_west) as employees_us union select id, name from employees_emea"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us_east"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us_west"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_emea"`,
},
},
{
name: "insert with subquery and union",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"INSERT INTO customers (id, name) SELECT id, name FROM customers_us UNION SELECT id, name FROM customers_eu",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into customers(id, name) select id, name from customers_us union select id, name from customers_eu"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="customers"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="customers_us"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="customers_eu"`,
},
},
{
name: "join with subquery and union",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"SELECT * FROM departments dep JOIN (SELECT id, name FROM employees_us UNION SELECT id, name FROM employees_eu) employees ON dep.id = employees.id",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from departments as dep join (select id, name from employees_us union select id, name from employees_eu) as employees on dep.id = employees.id"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="departments"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_eu"`,
},
},
{
name: "insert with subquery and join",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"INSERT INTO some_table SELECT * FROM departments dep JOIN (SELECT id, name FROM employees_us UNION SELECT id, name FROM employees_eu) employees ON dep.id = employees.id",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table select * from departments as dep join (select id, name from employees_us union select id, name from employees_eu) as employees on dep.id = employees.id"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="departments"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_eu"`,
},
},
}

for _, tc := range testcases {
Expand Down

0 comments on commit 68adba2

Please sign in to comment.