Skip to content

Commit

Permalink
Create in-memory tables
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Dec 12, 2024
1 parent 818092a commit 9afc001
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 68 deletions.
2 changes: 1 addition & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

const VERSION = "0.24.1"
const VERSION = "0.25.0"

func main() {
config := LoadConfig()
Expand Down
2 changes: 1 addition & 1 deletion src/query_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func NewQueryHandler(config *Config, duckdb *Duckdb, icebergReader *IcebergReade
queryHandler := &QueryHandler{
duckdb: duckdb,
icebergReader: icebergReader,
selectRemapper: NewSelectRemapper(config, icebergReader),
selectRemapper: NewSelectRemapper(config, icebergReader, duckdb),
config: config,
}

Expand Down
12 changes: 8 additions & 4 deletions src/query_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func TestHandleQuery(t *testing.T) {
"description": {"oid", "typename"},
"values": {},
},
"SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = 'public' LIMIT 1) LIMIT 1": {
"description": {"relname"},
"values": {"test_table"},
},
// pg_namespace
"SELECT DISTINCT(nspname) FROM pg_catalog.pg_namespace WHERE nspname != 'information_schema' AND nspname != 'pg_catalog'": {
"description": {"nspname"},
Expand Down Expand Up @@ -68,12 +72,12 @@ func TestHandleQuery(t *testing.T) {
},
// Information schema
"SELECT * FROM information_schema.tables": {
"description": {"table_catalog", "table_schema", "table_name", "table_type", "self_referencing_column_name", "reference_generation", "user_defined_type_catalog", "user_defined_type_schema", "user_defined_type_name", "is_insertable_into", "is_typed", "commit_action"},
"values": {"bemidb", "public", "test_table", "BASE TABLE", "NULL", "NULL", "NULL", "NULL", "NULL", "YES", "NO", "NULL"},
"description": {"table_catalog", "table_schema", "table_name", "table_type", "self_referencing_column_name", "reference_generation", "user_defined_type_catalog", "user_defined_type_schema", "user_defined_type_name", "is_insertable_into", "is_typed", "commit_action", "TABLE_COMMENT"},
"values": {"memory", "public", "test_table", "BASE TABLE", "", "", "", "", "", "YES", "NO", "", ""},
},
"SELECT table_catalog, table_schema, table_name AS table FROM information_schema.tables": {
"description": {"table_catalog", "table_schema", "table"},
"values": {"bemidb", "public", "test_table"},
"values": {"memory", "public", "test_table"},
},
// SET
"SET client_encoding TO 'UTF8'": {
Expand Down Expand Up @@ -470,7 +474,7 @@ func TestHandleQuery(t *testing.T) {

expectedErrorMessage := strings.Join([]string{
"Catalog Error: Table with name non_existent_table does not exist!",
"Did you mean \"sqlite_temp_master\"?",
"Did you mean \"test_table\"?",
"LINE 1: SELECT * FROM non_existent_table",
" ^",
}, "\n")
Expand Down
49 changes: 6 additions & 43 deletions src/query_parser_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
PG_TABLE_PG_NAMESPACE = "pg_namespace"
PG_TABLE_PG_ROLES = "pg_roles"
PG_TABLE_PG_SHDESCRIPTION = "pg_shdescription"
PG_TABLE_PG_CLASS = "pg_class"

PG_SCHEMA_INFORMATION_SCHEMA = "information_schema"
PG_TABLE_TABLES = "tables"
Expand Down Expand Up @@ -125,6 +126,11 @@ func (parser *QueryParserTable) MakePgShdescriptionNode(alias string) *pgQuery.N
return parser.utils.MakeSubselectNode(columns, [][]string{rowValues}, alias)
}

// IsPgClassTable reports whether qSchemaTable refers to the
// pg_catalog.pg_class system table.
func (parser *QueryParserTable) IsPgClassTable(qSchemaTable QuerySchemaTable) bool {
	if !parser.isPgCatalogSchema(qSchemaTable) {
		return false
	}
	return qSchemaTable.Table == PG_TABLE_PG_CLASS
}

// Other system pg_* tables
func (parser *QueryParserTable) IsTableFromPgCatalog(qSchemaTable QuerySchemaTable) bool {
return parser.isPgCatalogSchema(qSchemaTable) &&
Expand All @@ -136,34 +142,6 @@ func (parser *QueryParserTable) IsInformationSchemaTablesTable(qSchemaTable Quer
return parser.IsTableFromInformationSchema(qSchemaTable) && qSchemaTable.Table == PG_TABLE_TABLES
}

// MakeInformationSchemaTablesNode builds a VALUES(values...) t(columns...)
// subselect that stands in for information_schema.tables, emitting one row
// per Iceberg schema table.
func (parser *QueryParserTable) MakeInformationSchemaTablesNode(database string, schemaAndTables []IcebergSchemaTable, alias string) *pgQuery.Node {
	columns := PG_INFORMATION_SCHEMA_TABLES_VALUE_BY_COLUMN.Keys()
	templateValues := PG_INFORMATION_SCHEMA_TABLES_VALUE_BY_COLUMN.Values()

	var rowsValues [][]string
	for _, schemaTable := range schemaAndTables {
		// Clone the static template row, then override the per-table columns.
		rowValues := append([]string(nil), templateValues...)
		for i, column := range columns {
			switch column {
			case "table_catalog":
				rowValues[i] = database
			case "table_schema":
				rowValues[i] = schemaTable.Schema
			case "table_name":
				rowValues[i] = schemaTable.Table
			}
		}
		rowsValues = append(rowsValues, rowValues)
	}

	return parser.utils.MakeSubselectNode(columns, rowsValues, alias)
}

// Other information_schema.* tables
func (parser *QueryParserTable) IsTableFromInformationSchema(qSchemaTable QuerySchemaTable) bool {
return qSchemaTable.Schema == PG_SCHEMA_INFORMATION_SCHEMA
Expand Down Expand Up @@ -407,21 +385,6 @@ var PG_SYSTEM_VIEWS = NewSet([]string{
"pg_statio_user_sequences",
})

// Template row for the information_schema.tables substitute subselect.
// Each entry is {column name, static default value}; table_catalog,
// table_schema, and table_name are overridden per table in
// MakeInformationSchemaTablesNode. Order defines the column order.
var PG_INFORMATION_SCHEMA_TABLES_VALUE_BY_COLUMN = NewOrderedMap([][]string{
	{"table_catalog", "bemidb"},
	{"table_schema", "public"},
	{"table_name", "bemidb_table"},
	{"table_type", "BASE TABLE"},
	{"self_referencing_column_name", "NULL"},
	{"reference_generation", "NULL"},
	{"user_defined_type_catalog", "NULL"},
	{"user_defined_type_schema", "NULL"},
	{"user_defined_type_name", "NULL"},
	{"is_insertable_into", "YES"},
	{"is_typed", "NO"},
	{"commit_action", "NULL"},
})

var PG_STATIO_USER_TABLES_VALUE_BY_COLUMN = NewOrderedMap([][]string{
{"relid", "0"},
{"schemaname", "public"},
Expand Down
6 changes: 4 additions & 2 deletions src/select_remapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ type SelectRemapper struct {
remapperWhere *SelectRemapperWhere
remapperSelect *SelectRemapperSelect
icebergReader *IcebergReader
duckdb *Duckdb
config *Config
}

func NewSelectRemapper(config *Config, icebergReader *IcebergReader) *SelectRemapper {
func NewSelectRemapper(config *Config, icebergReader *IcebergReader, duckdb *Duckdb) *SelectRemapper {
return &SelectRemapper{
parserTable: NewQueryParserTable(config),
remapperTable: NewSelectRemapperTable(config, icebergReader),
remapperTable: NewSelectRemapperTable(config, icebergReader, duckdb),
remapperWhere: NewSelectRemapperWhere(config),
remapperSelect: NewSelectRemapperSelect(config),
icebergReader: icebergReader,
duckdb: duckdb,
config: config,
}
}
Expand Down
44 changes: 27 additions & 17 deletions src/select_remapper_table.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"

pgQuery "github.com/pganalyze/pg_query_go/v5"
)

Expand All @@ -12,19 +14,19 @@ type SelectRemapperTable struct {
parserTable *QueryParserTable
icebergSchemaTables []IcebergSchemaTable
icebergReader *IcebergReader
duckdb *Duckdb
config *Config
}

func NewSelectRemapperTable(config *Config, icebergReader *IcebergReader) *SelectRemapperTable {
icebergSchemaTables, err := icebergReader.SchemaTables()
PanicIfError(err)

return &SelectRemapperTable{
parserTable: NewQueryParserTable(config),
icebergSchemaTables: icebergSchemaTables,
icebergReader: icebergReader,
config: config,
func NewSelectRemapperTable(config *Config, icebergReader *IcebergReader, duckdb *Duckdb) *SelectRemapperTable {
remapper := &SelectRemapperTable{
parserTable: NewQueryParserTable(config),
icebergReader: icebergReader,
duckdb: duckdb,
config: config,
}
remapper.reloadIceberSchemaTables()
return remapper
}

// FROM / JOIN [TABLE]
Expand Down Expand Up @@ -56,22 +58,24 @@ func (remapper *SelectRemapperTable) RemapTable(node *pgQuery.Node) *pgQuery.Nod
return remapper.overrideTable(node, tableNode)
}

// pg_catalog.pg_* other system tables
// pg_catalog.pg_class -> reload Iceberg tables
if parser.IsPgClassTable(qSchemaTable) {
remapper.reloadIceberSchemaTables()
return node
}

// pg_catalog.pg_* other system tables -> return as is
if parser.IsTableFromPgCatalog(qSchemaTable) {
return node
}

// information_schema.tables -> return Iceberg tables
// information_schema.tables -> reload Iceberg tables
if parser.IsInformationSchemaTablesTable(qSchemaTable) {
remapper.reloadIceberSchemaTables()
if len(remapper.icebergSchemaTables) == 0 {
return node
}
tableNode := parser.MakeInformationSchemaTablesNode(remapper.config.Database, remapper.icebergSchemaTables, qSchemaTable.Alias)
return remapper.overrideTable(node, tableNode)
return node
}

// information_schema.* other system tables
// information_schema.* other system tables -> return as is
if parser.IsTableFromInformationSchema(qSchemaTable) {
return node
}
Expand Down Expand Up @@ -120,6 +124,12 @@ func (remapper *SelectRemapperTable) overrideTable(node *pgQuery.Node, fromClaus
// reloadIceberSchemaTables refreshes the cached list of Iceberg schema tables
// from the Iceberg reader and mirrors each one into DuckDB as an empty
// placeholder table. Panics (via PanicIfError) if the table list cannot be read.
// NOTE(review): method name is missing a "g" ("Iceberg"); rename together
// with all call sites in a follow-up.
func (remapper *SelectRemapperTable) reloadIceberSchemaTables() {
	icebergSchemaTables, err := remapper.icebergReader.SchemaTables()
	PanicIfError(err)

	ctx := context.Background()
	for _, icebergSchemaTable := range icebergSchemaTables {
		// Create a placeholder table (single dummy column, no data) —
		// presumably so the table name exists in DuckDB's catalog (e.g. for
		// "Did you mean ...?" hints); IF NOT EXISTS makes repeated reloads
		// idempotent. ExecContext's results are discarded here, so failures
		// (e.g. a missing schema) are silently skipped — TODO confirm this
		// best-effort behavior is intended.
		remapper.duckdb.ExecContext(ctx, "CREATE TABLE IF NOT EXISTS "+icebergSchemaTable.String()+" (id INT)", nil)
	}

	remapper.icebergSchemaTables = icebergSchemaTables
}

Expand Down

0 comments on commit 9afc001

Please sign in to comment.