Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgsql wire #1852

Merged
merged 23 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b7b6a0f
chore(embedded/sql): wip emulate pg_type system table
jeroiraz Oct 24, 2023
36ddf89
chore(pkg/pgsql): uuid and float types conversion
jeroiraz Oct 24, 2023
e485f01
chore(pkg/pgsql): handle deallocate prepared stmt
jeroiraz Oct 24, 2023
a48a86c
chore(pkg/pgsql): decouple error from ready to query messages
jeroiraz Oct 24, 2023
b0e42fb
chore(pkg/pgsql): pgsql write protocol improvements
jeroiraz Oct 24, 2023
5644dac
chore(pkg/pgsql): transactional query machine
jeroiraz Oct 26, 2023
cb413ea
chore(cmd/immudb): upgrade to new pgsql changes
jeroiraz Oct 26, 2023
ce9ec82
chore(pkg/client): possibility to retrieve session id
jeroiraz Oct 26, 2023
9e5886c
chore(pkg/server): upgrade to transactional pgsql server
jeroiraz Oct 26, 2023
f9fcb17
chore(pkg/pgsql): proper handling of queries with empty resultsets
jeroiraz Oct 26, 2023
5be79ff
chore(pkg/pgsql): support multiple-statements in simple-query mode
jeroiraz Oct 27, 2023
81c9794
chore(pkg/pgsql): comment describing pgsql wire protocol constraints
jeroiraz Oct 27, 2023
5da2ae7
feat(embedded/sql): show databases/tables stmt
jeroiraz Oct 27, 2023
8f4e1db
chore(pkg/server): set dynamic immudb server port in pgsql server
jeroiraz Oct 27, 2023
8ef4251
chore(pkg/pgsql): protocol enhancements
jeroiraz Oct 27, 2023
e724e98
test(pkg/pgsql): integration tests
jeroiraz Oct 27, 2023
4ddfa7b
chore(embedded/sql): continue to support databases and tables datasou…
jeroiraz Oct 27, 2023
e5a6265
chore(pkg/server): pgsql server creation only when enabled
jeroiraz Oct 28, 2023
59212aa
chore(pkg/pgsql): tls support
jeroiraz Nov 3, 2023
bab0935
chore(pkg/pgsql): single command complete message
jeroiraz Nov 3, 2023
e786e56
test(pkg/pgsql): query machine protocol upgrade
jeroiraz Nov 6, 2023
ed6903a
test(embedded/sql): cover pg_type reserved table
jeroiraz Nov 7, 2023
feefb30
test(pkg/pgsql): unit testing for deallocate stmt
jeroiraz Nov 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/immudb/command/immudbcmdtest/immuServerMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ type ImmuServerMock struct {
Logger logger.Logger
StateSigner server.StateSigner
Ssf stream.ServiceFactory
PgsqlSrv pgsqlsrv.Server
PgsqlSrv pgsqlsrv.PGSQLServer
DbList database.DatabaseList
}

func (s *ImmuServerMock) WithPgsqlServer(psrv pgsqlsrv.Server) server.ImmuServerIf {
func (s *ImmuServerMock) WithPgsqlServer(psrv pgsqlsrv.PGSQLServer) server.ImmuServerIf {
s.PgsqlSrv = psrv
return s
}
Expand Down
47 changes: 46 additions & 1 deletion embedded/sql/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,56 @@ type Column struct {
}

func newCatalog(enginePrefix []byte) *Catalog {
return &Catalog{
ctlg := &Catalog{
enginePrefix: enginePrefix,
tablesByID: make(map[uint32]*Table),
tablesByName: make(map[string]*Table),
}

pgTypeTable := &Table{
catalog: ctlg,
name: "pg_type",
cols: []*Column{
{
colName: "oid",
colType: VarcharType,
maxLen: 10,
},
{
colName: "typbasetype",
colType: VarcharType,
maxLen: 10,
},
{
colName: "typname",
colType: VarcharType,
maxLen: 50,
},
},
}

pgTypeTable.colsByName = make(map[string]*Column, len(pgTypeTable.cols))

for _, col := range pgTypeTable.cols {
pgTypeTable.colsByName[col.colName] = col
}

pgTypeTable.indexes = []*Index{
{
unique: true,
cols: []*Column{
pgTypeTable.colsByName["oid"],
},
colsByID: map[uint32]*Column{
0: pgTypeTable.colsByName["oid"],
},
},
}

pgTypeTable.primaryIndex = pgTypeTable.indexes[0]
ctlg.tablesByName[pgTypeTable.name] = pgTypeTable

return ctlg
}

func (catlg *Catalog) ExistTable(table string) bool {
Expand Down
44 changes: 44 additions & 0 deletions embedded/sql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ func TestUseDatabaseWithoutMultiDBHandler(t *testing.T) {
_, err := engine.Query(context.Background(), nil, "SELECT * FROM DATABASES()", nil)
require.ErrorIs(t, err, ErrUnspecifiedMultiDBHandler)
})

r, err := engine.Query(context.Background(), nil, "SELECT ts FROM pg_type WHERE ts < 1 + NOW()", nil)
require.NoError(t, err)
defer r.Close()

_, err = r.Read(context.Background())
require.ErrorIs(t, err, ErrNoMoreRows)
}

func TestCreateTable(t *testing.T) {
Expand Down Expand Up @@ -6017,6 +6024,25 @@ func TestMultiDBCatalogQueries(t *testing.T) {
require.NoError(t, err)
})

t.Run("show databases", func(t *testing.T) {
r, err := engine.Query(context.Background(), nil, "SHOW DATABASES", nil)
require.NoError(t, err)

for _, db := range dbs {
row, err := r.Read(context.Background())
require.NoError(t, err)
require.NotNil(t, row)
require.NotNil(t, row)
require.Equal(t, db, row.ValuesBySelector["(databases.name)"].RawValue())
}

_, err = r.Read(context.Background())
require.ErrorIs(t, err, ErrNoMoreRows)

err = r.Close()
require.NoError(t, err)
})

t.Run("query databases using conditions with table and column aliasing", func(t *testing.T) {
r, err := engine.Query(context.Background(), nil, "SELECT dbs.name as dbname FROM DATABASES() as dbs WHERE name LIKE 'db*'", nil)
require.NoError(t, err)
Expand Down Expand Up @@ -6136,6 +6162,24 @@ func TestSingleDBCatalogQueries(t *testing.T) {
require.ErrorIs(t, err, ErrNoMoreRows)
})

t.Run("show tables", func(t *testing.T) {
r, err := engine.Query(context.Background(), tx, "SHOW TABLES", nil)
require.NoError(t, err)

defer r.Close()

row, err := r.Read(context.Background())
require.NoError(t, err)
require.Equal(t, "mytable1", row.ValuesBySelector["(tables.name)"].RawValue())

row, err = r.Read(context.Background())
require.NoError(t, err)
require.Equal(t, "mytable2", row.ValuesBySelector["(tables.name)"].RawValue())

_, err = r.Read(context.Background())
require.ErrorIs(t, err, ErrNoMoreRows)
})

t.Run("unconditional index query should return all the indexes of mytable1", func(t *testing.T) {
params := map[string]interface{}{
"tableName": "mytable1",
Expand Down
3 changes: 3 additions & 0 deletions embedded/sql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ var reservedWords = map[string]int{
"IS": IS,
"CAST": CAST,
"::": SCAST,
"SHOW": SHOW,
"DATABASES": DATABASES,
"TABLES": TABLES,
}

var joinTypes = map[string]JoinType{
Expand Down
31 changes: 28 additions & 3 deletions embedded/sql/row_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,25 @@ func (d *ColDescriptor) Selector() string {
return EncodeSelector(d.AggFn, d.Table, d.Column)
}

type emptyKeyReader struct {
}

func (r emptyKeyReader) Read(ctx context.Context) (key []byte, val store.ValueRef, err error) {
return nil, nil, store.ErrNoMoreEntries
}

func (r emptyKeyReader) ReadBetween(ctx context.Context, initialTxID uint64, finalTxID uint64) (key []byte, val store.ValueRef, err error) {
return nil, nil, store.ErrNoMoreEntries
}

func (r emptyKeyReader) Reset() error {
return nil
}

func (r emptyKeyReader) Close() error {
return nil
}

func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, period period, tableAlias string, scanSpecs *ScanSpecs) (*rawRowReader, error) {
if table == nil || scanSpecs == nil || scanSpecs.Index == nil {
return nil, ErrIllegalArguments
Expand All @@ -155,9 +174,15 @@ func newRawRowReader(tx *SQLTx, params map[string]interface{}, table *Table, per
return nil, err
}

r, err := tx.newKeyReader(*rSpec)
if err != nil {
return nil, err
var r store.KeyReader

if table.name == "pg_type" {
r = &emptyKeyReader{}
} else {
r, err = tx.newKeyReader(*rSpec)
if err != nil {
return nil, err
}
}

if tableAlias == "" {
Expand Down
25 changes: 25 additions & 0 deletions embedded/sql/sql_grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func setResult(l yyLexer, stmts []SQLStmt) {
%token SELECT DISTINCT FROM JOIN HAVING WHERE GROUP BY LIMIT OFFSET ORDER ASC DESC AS UNION ALL
%token NOT LIKE IF EXISTS IN IS
%token AUTO_INCREMENT NULL CAST SCAST
%token SHOW DATABASES TABLES
%token <id> NPARAM
%token <pparam> PPARAM
%token <joinType> JOINTYPE
Expand Down Expand Up @@ -523,6 +524,20 @@ dqlstmt:
right: $4.(DataSource),
}
}
|
SHOW DATABASES
{
$$ = &SelectStmt{
ds: &FnDataSourceStmt{fnCall: &FnCall{fn: "databases"}},
}
}
|
SHOW TABLES
{
$$ = &SelectStmt{
ds: &FnDataSourceStmt{fnCall: &FnCall{fn: "tables"}},
}
}

select_stmt: SELECT opt_distinct opt_selectors FROM ds opt_indexon opt_joins opt_where opt_groupby opt_having opt_orderby opt_limit opt_offset
{
Expand Down Expand Up @@ -625,6 +640,16 @@ ds:
$2.(*SelectStmt).as = $4
$$ = $2.(DataSource)
}
|
DATABASES '(' ')' opt_as
{
$$ = &FnDataSourceStmt{fnCall: &FnCall{fn: "databases"}, as: $4}
}
|
TABLES '(' ')' opt_as
{
$$ = &FnDataSourceStmt{fnCall: &FnCall{fn: "tables"}, as: $4}
}
|
fnCall opt_as
{
Expand Down
Loading