Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgsql wire #1852

Merged
merged 23 commits into from
Nov 7, 2023
Merged
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b7b6a0f
chore(embedded/sql): wip emulate pg_type system table
jeroiraz Oct 24, 2023
36ddf89
chore(pkg/pgsql): uuid and float types conversion
jeroiraz Oct 24, 2023
e485f01
chore(pkg/pgsql): handle deallocate prepared stmt
jeroiraz Oct 24, 2023
a48a86c
chore(pkg/pgsql): decouple error from ready to query messages
jeroiraz Oct 24, 2023
b0e42fb
chore(pkg/pgsql): pgsql write protocol improvements
jeroiraz Oct 24, 2023
5644dac
chore(pkg/pgsql): transactional query machine
jeroiraz Oct 26, 2023
cb413ea
chore(cmd/immudb): upgrade to new pgsql changes
jeroiraz Oct 26, 2023
ce9ec82
chore(pkg/client): possibility to retrieve session id
jeroiraz Oct 26, 2023
9e5886c
chore(pkg/server): upgrade to transactional pgsql server
jeroiraz Oct 26, 2023
f9fcb17
chore(pkg/pgsql): proper handling of queries with empty resultsets
jeroiraz Oct 26, 2023
5be79ff
chore(pkg/pgsql): support multiple-statements in simple-query mode
jeroiraz Oct 27, 2023
81c9794
chore(pkg/pgsql): comment describing pgsql wire protocol constraints
jeroiraz Oct 27, 2023
5da2ae7
feat(embedded/sql): show databases/tables stmt
jeroiraz Oct 27, 2023
8f4e1db
chore(pkg/server): set dynamic immudb server port in pgsql server
jeroiraz Oct 27, 2023
8ef4251
chore(pkg/pgsql): protocol enhancements
jeroiraz Oct 27, 2023
e724e98
test(pkg/pgsql): integration tests
jeroiraz Oct 27, 2023
4ddfa7b
chore(embedded/sql): continue to support databases and tables datasou…
jeroiraz Oct 27, 2023
e5a6265
chore(pkg/server): pgsql server creation only when enabled
jeroiraz Oct 28, 2023
59212aa
chore(pkg/pgsql): tls support
jeroiraz Nov 3, 2023
bab0935
chore(pkg/pgsql): single command complete message
jeroiraz Nov 3, 2023
e786e56
test(pkg/pgsql): query machine protocol upgrade
jeroiraz Nov 6, 2023
ed6903a
test(embedded/sql): cover pg_type reserved table
jeroiraz Nov 7, 2023
feefb30
test(pkg/pgsql): unit testing for deallocate stmt
jeroiraz Nov 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore(pkg/pgsql): pgsql write protocol improvements
Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>
jeroiraz committed Nov 3, 2023
commit b0e42fb51b1591aac70998ea86bd0c4c6953e933
183 changes: 103 additions & 80 deletions pkg/pgsql/server/query_machine.go
Original file line number Diff line number Diff line change
@@ -46,13 +46,14 @@ func (s *session) QueriesMachine(ctx context.Context) (err error) {
for {
msg, extQueryMode, err := s.nextMessage()
if err != nil {
if err == io.EOF {
if errors.Is(err, io.EOF) {
s.log.Warningf("connection is closed")
return nil
}
s.ErrorHandle(err)
continue
}

// When an error is detected while processing any extended-query message, the backend issues ErrorResponse,
// then reads and discards messages until a Sync is reached, then issues ReadyForQuery and returns to normal
// message processing. (But note that no skipping occurs if an error is detected while processing Sync — this
@@ -67,34 +68,71 @@ func (s *session) QueriesMachine(ctx context.Context) (err error) {
case fm.TerminateMsg:
return s.mr.CloseConnection()
case fm.QueryMsg:
if err = s.fetchAndWriteResults(ctx, v.GetStatements(), nil, nil, false); err != nil {
s.ErrorHandle(err)
continue
}
if _, err = s.writeMessage(bm.CommandComplete([]byte(`ok`))); err != nil {
err := s.fetchAndWriteResults(ctx, v.GetStatements(), nil, nil, false)
if err != nil {
waitForSync = extQueryMode
s.ErrorHandle(err)
continue
}

if _, err = s.writeMessage(bm.ReadyForQuery()); err != nil {
s.ErrorHandle(err)
continue
waitForSync = extQueryMode
}
case fm.ParseMsg:
_, ok := s.statements[v.DestPreparedStatementName]
// unnamed prepared statement overrides previous
if ok && v.DestPreparedStatementName != "" {
waitForSync = extQueryMode
s.ErrorHandle(fmt.Errorf("statement '%s' already present", v.DestPreparedStatementName))
continue
}

var paramCols []*schema.Column
var resCols []*schema.Column
var stmt sql.SQLStmt
if !s.isInBlackList(v.Statements) {
if paramCols, resCols, err = s.inferParamAndResultCols(ctx, v.Statements); err != nil {
s.ErrorHandle(err)
waitForSync = true
continue

if s.isInBlackList(v.Statements) {
_, err := s.writeMessage(bm.ParseComplete())
if err != nil {
waitForSync = extQueryMode
}
continue
}
_, ok := s.statements[v.DestPreparedStatementName]
// unnamed prepared statement overrides previous
if ok && v.DestPreparedStatementName != "" {
s.ErrorHandle(errors.New("statement already present"))
waitForSync = true

// todo @Michele The query string contained in a Parse message cannot include more than one SQL statement;
// else a syntax error is reported. This restriction does not exist in the simple-query protocol,
// but it does exist in the extended protocol, because allowing prepared statements or portals to contain
// multiple commands would complicate the protocol unduly.
stmts, err := sql.Parse(strings.NewReader(v.Statements))
if err != nil {
waitForSync = extQueryMode
s.ErrorHandle(err)
continue
}

// The query string contained in a Parse message cannot include more than one SQL statement;
// else a syntax error is reported. This restriction does not exist in the simple-query protocol, but it does exist
// in the extended protocol, because allowing prepared statements or portals to contain multiple commands would
// complicate the protocol unduly.
if len(stmts) > 1 {
waitForSync = extQueryMode
s.ErrorHandle(pserr.ErrMaxStmtNumberExceeded)
continue
}
if len(stmts) == 0 {
waitForSync = extQueryMode
s.ErrorHandle(pserr.ErrNoStatementFound)
continue
}

if paramCols, resCols, err = s.inferParamAndResultCols(ctx, stmts[0]); err != nil {
waitForSync = extQueryMode
s.ErrorHandle(err)
continue
}

_, err = s.writeMessage(bm.ParseComplete())
if err != nil {
waitForSync = extQueryMode
continue
}

@@ -109,11 +147,6 @@ func (s *session) QueriesMachine(ctx context.Context) (err error) {

s.statements[v.DestPreparedStatementName] = newStatement

if _, err = s.writeMessage(bm.ParseComplete()); err != nil {
s.ErrorHandle(err)
waitForSync = true
continue
}
case fm.DescribeMsg:
// The Describe message (statement variant) specifies the name of an existing prepared statement
// (or an empty string for the unnamed prepared statement). The response is a ParameterDescription
@@ -126,18 +159,18 @@ func (s *session) QueriesMachine(ctx context.Context) (err error) {
if v.DescType == "S" {
st, ok := s.statements[v.Name]
if !ok {
s.ErrorHandle(errors.New("statement not found"))
waitForSync = true
waitForSync = extQueryMode
s.ErrorHandle(fmt.Errorf("statement '%s' not found", v.Name))
continue
}

if _, err = s.writeMessage(bm.ParameterDescription(st.Params)); err != nil {
s.ErrorHandle(err)
waitForSync = true
waitForSync = extQueryMode
continue
}

if _, err := s.writeMessage(bm.RowDescription(st.Results, nil)); err != nil {
s.ErrorHandle(err)
waitForSync = true
waitForSync = extQueryMode
continue
}
}
@@ -146,42 +179,46 @@ func (s *session) QueriesMachine(ctx context.Context) (err error) {
// returned by executing the portal; or a NoData message if the portal does not contain a query that
// will return rows; or ErrorResponse if there is no such portal.
if v.DescType == "P" {
st, ok := s.portals[v.Name]
portal, ok := s.portals[v.Name]
if !ok {
s.ErrorHandle(fmt.Errorf("portal %s not found", v.Name))
waitForSync = true
waitForSync = extQueryMode
s.ErrorHandle(fmt.Errorf("portal '%s' not found", v.Name))
continue
}
if _, err = s.writeMessage(bm.RowDescription(st.Statement.Results, st.ResultColumnFormatCodes)); err != nil {
s.ErrorHandle(err)
waitForSync = true

if _, err = s.writeMessage(bm.RowDescription(portal.Statement.Results, portal.ResultColumnFormatCodes)); err != nil {
waitForSync = extQueryMode
continue
}
}
case fm.SyncMsg:
if _, err = s.writeMessage(bm.ReadyForQuery()); err != nil {
s.ErrorHandle(err)
}
waitForSync = false
s.writeMessage(bm.ReadyForQuery())
case fm.BindMsg:
_, ok := s.portals[v.DestPortalName]
// unnamed portal overrides previous
if ok && v.DestPortalName != "" {
s.ErrorHandle(fmt.Errorf("portal %s already present", v.DestPortalName))
waitForSync = true
waitForSync = extQueryMode
s.ErrorHandle(fmt.Errorf("portal '%s' already present", v.DestPortalName))
continue
}

st, ok := s.statements[v.PreparedStatementName]
if !ok {
s.ErrorHandle(fmt.Errorf("statement %s not found", v.PreparedStatementName))
waitForSync = true
waitForSync = extQueryMode
s.ErrorHandle(fmt.Errorf("statement '%s' not found", v.PreparedStatementName))
continue
}

encodedParams, err := buildNamedParams(st.Params, v.ParamVals)
if err != nil {
waitForSync = extQueryMode
s.ErrorHandle(err)
waitForSync = true
continue
}

if _, err = s.writeMessage(bm.BindComplete()); err != nil {
waitForSync = extQueryMode
continue
}

@@ -191,32 +228,36 @@ func (s *session) QueriesMachine(ctx context.Context) (err error) {
Parameters: encodedParams,
ResultColumnFormatCodes: v.ResultColumnFormatCodes,
}
s.portals[v.DestPortalName] = newPortal

if _, err = s.writeMessage(bm.BindComplete()); err != nil {
s.ErrorHandle(err)
waitForSync = true
continue
}
s.portals[v.DestPortalName] = newPortal
case fm.Execute:
//query execution
if err = s.fetchAndWriteResults(ctx, s.portals[v.PortalName].Statement.SQLStatement,
s.portals[v.PortalName].Parameters,
s.portals[v.PortalName].ResultColumnFormatCodes,
portal, ok := s.portals[v.PortalName]
if !ok {
waitForSync = extQueryMode
s.ErrorHandle(fmt.Errorf("portal '%s' not found", v.PortalName))
continue
}

delete(s.portals, v.PortalName)

if err = s.fetchAndWriteResults(ctx, portal.Statement.SQLStatement,
portal.Parameters,
portal.ResultColumnFormatCodes,
true); err != nil {
waitForSync = extQueryMode
s.ErrorHandle(err)
waitForSync = true
continue
}
if _, err := s.writeMessage(bm.CommandComplete([]byte(`ok`))); err != nil {
s.ErrorHandle(err)
waitForSync = true

if _, err := s.writeMessage(bm.CommandComplete([]byte("ok"))); err != nil {
waitForSync = extQueryMode
}
case fm.FlushMsg:
// there is no buffer to be flushed
default:
waitForSync = extQueryMode
s.ErrorHandle(pserr.ErrUnknowMessageType)
continue
}
}
}
@@ -225,17 +266,18 @@ func (s *session) fetchAndWriteResults(ctx context.Context, statements string, p
if s.isInBlackList(statements) {
return nil
}

if i := s.isEmulableInternally(statements); i != nil {
if err := s.tryToHandleInternally(i); err != nil && err != pserr.ErrMessageCannotBeHandledInternally {
return err
}
return nil
}

stmts, err := sql.Parse(strings.NewReader(statements))
if err != nil {
return err
}

for _, stmt := range stmts {
switch st := stmt.(type) {
case *sql.UseDatabaseStmt:
@@ -256,6 +298,7 @@ func (s *session) fetchAndWriteResults(ctx context.Context, statements string, p
}
}
}

return nil
}

@@ -310,27 +353,7 @@ type statement struct {
Results []*schema.Column
}

func (s *session) inferParamAndResultCols(ctx context.Context, statement string) ([]*schema.Column, []*schema.Column, error) {
// todo @Michele The query string contained in a Parse message cannot include more than one SQL statement;
// else a syntax error is reported. This restriction does not exist in the simple-query protocol,
// but it does exist in the extended protocol, because allowing prepared statements or portals to contain
// multiple commands would complicate the protocol unduly.
stmts, err := sql.Parse(strings.NewReader(statement))
if err != nil {
return nil, nil, err
}
// The query string contained in a Parse message cannot include more than one SQL statement;
// else a syntax error is reported. This restriction does not exist in the simple-query protocol, but it does exist
// in the extended protocol, because allowing prepared statements or portals to contain multiple commands would
// complicate the protocol unduly.
if len(stmts) > 1 {
return nil, nil, pserr.ErrMaxStmtNumberExceeded
}
if len(stmts) == 0 {
return nil, nil, pserr.ErrNoStatementFound
}
stmt := stmts[0]

func (s *session) inferParamAndResultCols(ctx context.Context, stmt sql.SQLStmt) ([]*schema.Column, []*schema.Column, error) {
resCols := make([]*schema.Column, 0)

sel, ok := stmt.(*sql.SelectStmt)