From d5c65a485cb582e2a90df72646388c2e1a07f156 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 15 Jan 2025 18:06:38 +0300 Subject: [PATCH 1/6] handle session error on server response instead of callback error --- internal/query/client.go | 6 ------ internal/query/session.go | 20 ++++++++++++++++++++ internal/query/transaction.go | 11 ++--------- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/internal/query/client.go b/internal/query/client.go index c7155342f..2d720bf09 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -212,8 +212,6 @@ func do( err := op(ctx, s) if err != nil { - s.SetStatus(session.StatusError) - return xerrors.WithStackTrace(err) } @@ -276,10 +274,6 @@ func doTx( defer func() { _ = tx.Rollback(ctx) - - if opErr != nil { - s.SetStatus(session.StatusError) - } }() err = op(ctx, tx) diff --git a/internal/query/session.go b/internal/query/session.go index 99d3e2100..436d5c2f0 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -4,6 +4,7 @@ import ( "context" "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" @@ -38,6 +39,8 @@ func (s *Session) QueryResultSet( r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { + s.handleSessionErrorStatus(err) + return nil, xerrors.WithStackTrace(err) } @@ -54,6 +57,8 @@ func (s *Session) queryRow( ) (row query.Row, finalErr error) { r, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...) if err != nil { + s.handleSessionErrorStatus(err) + return nil, xerrors.WithStackTrace(err) } @@ -120,6 +125,8 @@ func (s *Session) Begin( txID, err := begin(ctx, s.client, s.ID(), txSettings) if err != nil { + s.handleSessionErrorStatus(err) + return nil, xerrors.WithStackTrace(err) } @@ -140,6 +147,8 @@ func (s *Session) Exec( r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { + s.handleSessionErrorStatus(err) + return xerrors.WithStackTrace(err) } @@ -162,8 +171,19 @@ func (s *Session) Query( r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { + s.handleSessionErrorStatus(err) + return nil, xerrors.WithStackTrace(err) } return r, nil } + +func (s *Session) handleSessionErrorStatus(err error) { + switch { + case xerrors.IsTransportError(err) || xerrors.IsOperationError(err, Ydb.StatusIds_SESSION_BUSY, Ydb.StatusIds_BAD_SESSION): + s.SetStatus(session.StatusError) + case xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION): + s.SetStatus(session.StatusClosed) + } +} diff --git a/internal/query/transaction.go b/internal/query/transaction.go index 99cb96c52..f7cc25dcc 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -5,13 +5,11 @@ import ( "fmt" "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session" queryTx "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" baseTx "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" @@ -338,10 +336,7 @@ func (tx *Transaction) CommitTx(ctx context.Context) (finalErr error) { err = commitTx(ctx, tx.s.client, tx.s.ID(), tx.ID()) if err != nil { - if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) { - tx.s.SetStatus(session.StatusClosed) - } - + tx.s.handleSessionErrorStatus(err) return xerrors.WithStackTrace(err) } @@ -376,9 +371,7 @@ func (tx *Transaction) Rollback(ctx context.Context) (finalErr error) { err := rollback(ctx, tx.s.client, tx.s.ID(), tx.ID()) if err != nil { - if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) { - tx.s.SetStatus(session.StatusClosed) - } + tx.s.handleSessionErrorStatus(err) return xerrors.WithStackTrace(err) } From c0e1abe72eea0124f6d98ca5b0972d65f0816ade Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 15 Jan 2025 18:22:03 +0300 Subject: [PATCH 2/6] add regression test --- CHANGELOG.md | 2 ++ internal/query/session.go | 15 ++++++++------- internal/query/transaction.go | 5 +++-- tests/integration/query_regression_test.go | 19 +++++++++++++++++++ 4 files changed, 32 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00c93592a..73b84fa7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Fixed drop session from pool unnecessary in query service + ## v3.95.6 * Fixed panic on span reporting in `xsql/Tx` diff --git a/internal/query/session.go b/internal/query/session.go index 436d5c2f0..566878c84 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -39,7 +39,7 @@ func (s *Session) QueryResultSet( r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { - s.handleSessionErrorStatus(err) + s.setStatusFromError(err) return nil, xerrors.WithStackTrace(err) } @@ -57,7 +57,7 @@ func (s *Session) queryRow( ) (row query.Row, finalErr error) { r, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...) if err != nil { - s.handleSessionErrorStatus(err) + s.setStatusFromError(err) return nil, xerrors.WithStackTrace(err) } @@ -125,7 +125,7 @@ func (s *Session) Begin( txID, err := begin(ctx, s.client, s.ID(), txSettings) if err != nil { - s.handleSessionErrorStatus(err) + s.setStatusFromError(err) return nil, xerrors.WithStackTrace(err) } @@ -147,7 +147,7 @@ func (s *Session) Exec( r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { - s.handleSessionErrorStatus(err) + s.setStatusFromError(err) return xerrors.WithStackTrace(err) } @@ -171,7 +171,7 @@ func (s *Session) Query( r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { - s.handleSessionErrorStatus(err) + s.setStatusFromError(err) return nil, xerrors.WithStackTrace(err) } @@ -179,9 +179,10 @@ func (s *Session) Query( return r, nil } -func (s *Session) handleSessionErrorStatus(err error) { +func (s *Session) setStatusFromError(err error) { switch { - case xerrors.IsTransportError(err) || xerrors.IsOperationError(err, Ydb.StatusIds_SESSION_BUSY, Ydb.StatusIds_BAD_SESSION): + case xerrors.IsTransportError(err) || + xerrors.IsOperationError(err, Ydb.StatusIds_SESSION_BUSY, Ydb.StatusIds_BAD_SESSION): s.SetStatus(session.StatusError) case xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION): s.SetStatus(session.StatusClosed) diff --git a/internal/query/transaction.go b/internal/query/transaction.go index f7cc25dcc..8f510a339 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -336,7 +336,8 @@ func (tx *Transaction) CommitTx(ctx context.Context) (finalErr error) { err = commitTx(ctx, tx.s.client, tx.s.ID(), tx.ID()) if err != nil { - tx.s.handleSessionErrorStatus(err) + tx.s.setStatusFromError(err) + return xerrors.WithStackTrace(err) } @@ -371,7 +372,7 @@ func (tx *Transaction) Rollback(ctx context.Context) (finalErr error) { err := rollback(ctx, tx.s.client, tx.s.ID(), tx.ID()) if err != nil { - tx.s.handleSessionErrorStatus(err) + tx.s.setStatusFromError(err) return xerrors.WithStackTrace(err) } diff --git a/tests/integration/query_regression_test.go b/tests/integration/query_regression_test.go index 2293d792d..d7a287430 100644 --- a/tests/integration/query_regression_test.go +++ b/tests/integration/query_regression_test.go @@ -376,3 +376,22 @@ func TestReadTwoPartsIntoMemoryIssue1559(t *testing.T) { require.Equal(t, targetCount, len(rows)) require.Greater(t, partReaded, 1) } + +// https://github.com/ydb-platform/ydb-go-sdk/issues/1607 +func TestCloseSessionOnCustomerErrorsIssue1607(t *testing.T) { + scope := newScope(t) + + sessionID1 := "" + _ = scope.Driver().Query().Do(scope.Ctx, func(ctx context.Context, s query.Session) error { + sessionID1 = s.ID() + return errors.New("test") + }) + + sessionID2 := "" + _ = scope.Driver().Query().Do(scope.Ctx, func(ctx context.Context, s query.Session) error { + sessionID2 = s.ID() + return nil + }) + + scope.Require.Equal(sessionID1, sessionID2) +} From c978f54efa9a569fb64c53b6317fcd4d70229520 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 15 Jan 2025 18:32:12 +0300 Subject: [PATCH 3/6] added handle error places --- internal/query/transaction.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/query/transaction.go b/internal/query/transaction.go index 8f510a339..216f9d95e 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -114,6 +114,8 @@ func (tx *Transaction) QueryResultSet( } r, err := execute(ctx, tx.s.ID(), tx.s.client, q, settings, resultOpts...) if err != nil { + tx.s.setStatusFromError(err) + return nil, xerrors.WithStackTrace(err) } @@ -163,6 +165,8 @@ func (tx *Transaction) QueryRow( } r, err := execute(ctx, tx.s.ID(), tx.s.client, q, settings, resultOpts...) if err != nil { + tx.s.setStatusFromError(err) + return nil, xerrors.WithStackTrace(err) } @@ -229,6 +233,7 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu r, err := execute(ctx, tx.s.ID(), tx.s.client, q, settings, resultOpts...) if err != nil { + tx.s.setStatusFromError(err) return xerrors.WithStackTrace(err) } @@ -297,6 +302,8 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec } r, err := execute(ctx, tx.s.ID(), tx.s.client, q, settings, resultOpts...) if err != nil { + tx.s.setStatusFromError(err) + return nil, xerrors.WithStackTrace(err) } From 8fd431787752ae77a36c4d33d372396c75f66444 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 15 Jan 2025 19:10:29 +0300 Subject: [PATCH 4/6] fix style --- internal/query/transaction.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/query/transaction.go b/internal/query/transaction.go index 216f9d95e..c670cae52 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -234,6 +234,7 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu r, err := execute(ctx, tx.s.ID(), tx.s.client, q, settings, resultOpts...) if err != nil { tx.s.setStatusFromError(err) + return xerrors.WithStackTrace(err) } From 2c0380eda682ea13bdc0227b7e2937d20ca1fa36 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 15 Jan 2025 20:18:54 +0300 Subject: [PATCH 5/6] style --- internal/query/session.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/query/session.go b/internal/query/session.go index 566878c84..3d77fe6cf 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -181,8 +181,9 @@ func (s *Session) Query( func (s *Session) setStatusFromError(err error) { switch { - case xerrors.IsTransportError(err) || - xerrors.IsOperationError(err, Ydb.StatusIds_SESSION_BUSY, Ydb.StatusIds_BAD_SESSION): + case xerrors.IsTransportError(err): + s.SetStatus(session.StatusError) + case xerrors.IsOperationError(err, Ydb.StatusIds_SESSION_BUSY, Ydb.StatusIds_BAD_SESSION): s.SetStatus(session.StatusError) case xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION): s.SetStatus(session.StatusClosed) From ebc674883cc6291f8f61d6c908b18a737dce3ba8 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 15 Jan 2025 20:19:56 +0300 Subject: [PATCH 6/6] typo --- internal/query/client.go | 4 ++-- internal/query/client_test.go | 2 +- internal/query/session.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/query/client.go b/internal/query/client.go index 2d720bf09..aa6727ed9 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -533,7 +533,7 @@ func CreateSession(ctx context.Context, c *Client) (*Session, error) { return nil, xerrors.WithStackTrace(err) } - s.laztTx = c.config.LazyTx() + s.lazyTx = c.config.LazyTx() return s, nil }) @@ -584,7 +584,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) * return nil, xerrors.WithStackTrace(err) } - s.laztTx = cfg.LazyTx() + s.lazyTx = cfg.LazyTx() return s, nil }), diff --git a/internal/query/client_test.go b/internal/query/client_test.go index 1a1933ab4..2006b378f 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -1580,7 +1580,7 @@ func newTestSessionWithClient(id string, client Ydb_Query_V1.QueryServiceClient, Core: &sessionControllerMock{id: id}, client: client, trace: &trace.Query{}, - laztTx: lazyTx, + lazyTx: lazyTx, } } diff --git a/internal/query/session.go b/internal/query/session.go index 3d77fe6cf..138cb7ca0 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -24,7 +24,7 @@ type ( client Ydb_Query_V1.QueryServiceClient trace *trace.Query - laztTx bool + lazyTx bool } ) @@ -116,7 +116,7 @@ func (s *Session) Begin( } }() - if s.laztTx { + if s.lazyTx { return &Transaction{ s: s, txSettings: txSettings,