diff --git a/internal/pool/pool.go b/internal/pool/pool.go index be7177fe6..17be47081 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -17,12 +17,15 @@ import ( ) type ( - Item[T any] interface { - *T + Item interface { IsAlive() bool Close(ctx context.Context) error } - Config[PT Item[T], T any] struct { + ItemConstraint[T any] interface { + *T + Item + } + Config[PT ItemConstraint[T], T any] struct { trace *Trace clock clockwork.Clock limit int @@ -32,15 +35,15 @@ type ( closeItem func(ctx context.Context, item PT) idleThreshold time.Duration } - itemInfo[PT Item[T], T any] struct { + itemInfo[PT ItemConstraint[T], T any] struct { idle *xlist.Element[PT] touched time.Time } - waitChPool[PT Item[T], T any] interface { + waitChPool[PT ItemConstraint[T], T any] interface { GetOrNew() *chan PT Put(t *chan PT) } - Pool[PT Item[T], T any] struct { + Pool[PT ItemConstraint[T], T any] struct { config Config[PT, T] createItem func(ctx context.Context) (PT, error) @@ -55,16 +58,16 @@ type ( done chan struct{} } - option[PT Item[T], T any] func(c *Config[PT, T]) + Option[PT ItemConstraint[T], T any] func(c *Config[PT, T]) ) -func WithCreateItemFunc[PT Item[T], T any](f func(ctx context.Context) (PT, error)) option[PT, T] { +func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context) (PT, error)) Option[PT, T] { return func(c *Config[PT, T]) { c.createItem = f } } -func WithSyncCloseItem[PT Item[T], T any]() option[PT, T] { +func WithSyncCloseItem[PT ItemConstraint[T], T any]() Option[PT, T] { return func(c *Config[PT, T]) { c.closeItem = func(ctx context.Context, item PT) { _ = item.Close(ctx) @@ -72,45 +75,45 @@ func WithSyncCloseItem[PT Item[T], T any]() option[PT, T] { } } -func WithCreateItemTimeout[PT Item[T], T any](t time.Duration) option[PT, T] { +func WithCreateItemTimeout[PT ItemConstraint[T], T any](t time.Duration) Option[PT, T] { return func(c *Config[PT, T]) { c.createTimeout = t } } -func WithCloseItemTimeout[PT Item[T], T any](t time.Duration) option[PT, T] { +func WithCloseItemTimeout[PT ItemConstraint[T], T any](t time.Duration) Option[PT, T] { return func(c *Config[PT, T]) { c.closeTimeout = t } } -func WithLimit[PT Item[T], T any](size int) option[PT, T] { +func WithLimit[PT ItemConstraint[T], T any](size int) Option[PT, T] { return func(c *Config[PT, T]) { c.limit = size } } -func WithTrace[PT Item[T], T any](t *Trace) option[PT, T] { +func WithTrace[PT ItemConstraint[T], T any](t *Trace) Option[PT, T] { return func(c *Config[PT, T]) { c.trace = t } } -func WithIdleThreshold[PT Item[T], T any](idleThreshold time.Duration) option[PT, T] { +func WithIdleThreshold[PT ItemConstraint[T], T any](idleThreshold time.Duration) Option[PT, T] { return func(c *Config[PT, T]) { c.idleThreshold = idleThreshold } } -func WithClock[PT Item[T], T any](clock clockwork.Clock) option[PT, T] { +func WithClock[PT ItemConstraint[T], T any](clock clockwork.Clock) Option[PT, T] { return func(c *Config[PT, T]) { c.clock = clock } } -func New[PT Item[T], T any]( +func New[PT ItemConstraint[T], T any]( ctx context.Context, - opts ...option[PT, T], + opts ...Option[PT, T], ) *Pool[PT, T] { p := &Pool[PT, T]{ config: Config[PT, T]{ @@ -162,14 +165,14 @@ func New[PT Item[T], T any]( } // defaultCreateItem returns a new item -func defaultCreateItem[T any, PT Item[T]](context.Context) (PT, error) { +func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) { var item T return &item, nil } // makeAsyncCreateItemFunc wraps the createItem function with timeout handling -func makeAsyncCreateItemFunc[PT Item[T], T any]( //nolint:funlen +func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen p *Pool[PT, T], ) func(ctx context.Context) (PT, error) { return func(ctx context.Context) (PT, error) { @@ -277,7 +280,7 @@ func (p *Pool[PT, T]) Stats() Stats { return p.stats() } -func makeAsyncCloseItemFunc[PT Item[T], T any]( +func makeAsyncCloseItemFunc[PT ItemConstraint[T], T any]( p *Pool[PT, T], ) func(ctx context.Context, item PT) { return func(ctx context.Context, item PT) { diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index 87fa38570..d22c48a66 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -135,7 +135,7 @@ func caller() string { return fmt.Sprintf("%s:%d", path.Base(file), line) } -func mustGetItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T]) PT { +func mustGetItem[PT ItemConstraint[T], T any](t testing.TB, p *Pool[PT, T]) PT { s, err := p.getItem(context.Background()) if err != nil { t.Helper() @@ -145,7 +145,7 @@ func mustGetItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T]) PT { return s } -func mustPutItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T], item PT) { +func mustPutItem[PT ItemConstraint[T], T any](t testing.TB, p *Pool[PT, T], item PT) { if err := p.putItem(context.Background(), item); err != nil { t.Helper() t.Fatalf("%s: %v", caller(), err) diff --git a/internal/query/client.go b/internal/query/client.go index 3d56dcae8..5f9f04104 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -16,6 +16,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/types" @@ -41,9 +42,9 @@ type ( With(ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option) error } Client struct { - config *config.Config - queryServiceClient Ydb_Query_V1.QueryServiceClient - pool sessionPool + config *config.Config + client Ydb_Query_V1.QueryServiceClient + pool sessionPool done chan struct{} } @@ -100,7 +101,7 @@ func (c *Client) FetchScriptResults(ctx context.Context, opID string, opts ...options.FetchScriptOption, ) (*options.FetchScriptResult, error) { r, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*options.FetchScriptResult, error) { - r, err := fetchScriptResults(ctx, c.queryServiceClient, opID, + r, err := fetchScriptResults(ctx, c.client, opID, append(opts, func(request *options.FetchScriptResultsRequest) { request.Trace = c.config.Trace() })..., @@ -175,7 +176,7 @@ func (c *Client) ExecuteScript( request, grpcOpts := executeQueryScriptRequest(a, q, settings) - op, err = executeScript(ctx, c.queryServiceClient, request, grpcOpts...) + op, err = executeScript(ctx, c.client, request, grpcOpts...) if err != nil { return op, xerrors.WithStackTrace(err) } @@ -200,18 +201,18 @@ func do( opts ...retry.Option, ) (finalErr error) { err := pool.With(ctx, func(ctx context.Context, s *Session) error { - s.setStatus(statusInUse) + s.SetStatus(session.StatusInUse) err := op(ctx, s) if err != nil { if xerrors.IsOperationError(err) { - s.setStatus(statusClosed) + s.SetStatus(session.StatusClosed) } return xerrors.WithStackTrace(err) } - s.setStatus(statusIdle) + s.SetStatus(session.StatusIdle) return nil }, opts...) @@ -337,7 +338,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) { settings := options.ExecuteSettings(opts...) err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) { - _, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, withTrace(s.cfg.Trace())) + _, r, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace)) if err != nil { return xerrors.WithStackTrace(err) } @@ -381,8 +382,8 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option ) { settings := options.ExecuteSettings(opts...) err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) { - _, streamResult, err := execute(ctx, s.id, s.queryServiceClient, q, - options.ExecuteSettings(opts...), withTrace(s.cfg.Trace()), + _, streamResult, err := execute(ctx, s.ID(), s.client, q, + options.ExecuteSettings(opts...), withTrace(s.trace), ) if err != nil { return xerrors.WithStackTrace(err) @@ -433,7 +434,7 @@ func clientQueryResultSet( ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption, ) (rs result.ClosableResultSet, finalErr error) { err := do(ctx, pool, func(ctx context.Context, s *Session) error { - _, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...) + _, r, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...) if err != nil { return xerrors.WithStackTrace(err) } @@ -512,18 +513,18 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options return nil } -func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Config) *Client { +func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *Client { onDone := trace.QueryOnNew(cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"), ) defer onDone() - grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer) + client := Ydb_Query_V1.NewQueryServiceClient(cc) - client := &Client{ - config: cfg, - queryServiceClient: grpcClient, - done: make(chan struct{}), + return &Client{ + config: cfg, + client: client, + done: make(chan struct{}), pool: pool.New(ctx, pool.WithLimit[*Session, Session](cfg.PoolLimit()), pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())), @@ -541,7 +542,11 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con } defer cancelCreate() - s, err := createSession(createCtx, grpcClient, cfg) + s, err := createSession(createCtx, client, + session.WithConn(cc), + session.WithDeleteTimeout(cfg.SessionDeleteTimeout()), + session.WithTrace(cfg.Trace()), + ) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -550,8 +555,6 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con }), ), } - - return client } func poolTrace(t *trace.Query) *pool.Trace { diff --git a/internal/query/client_test.go b/internal/query/client_test.go index adfc303ac..eb6d494e1 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -18,8 +18,8 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" @@ -29,24 +29,24 @@ import ( func TestClient(t *testing.T) { ctx := xtest.Context(t) - t.Run("CreateSession", func(t *testing.T) { + t.Run("createSession", func(t *testing.T) { t.Run("HappyWay", func(t *testing.T) { ctrl := gomock.NewController(t) attachStream := NewMockQueryService_AttachSessionClient(ctrl) attachStream.EXPECT().Recv().Return(&Ydb_Query.SessionState{ Status: Ydb.StatusIds_SUCCESS, }, nil).AnyTimes() - service := NewMockQueryServiceClient(ctrl) - service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{ + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{ Status: Ydb.StatusIds_SUCCESS, SessionId: "test", }, nil) - service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil) - service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{ + client.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil) + client.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{ Status: Ydb.StatusIds_SUCCESS, }, nil) attached := 0 - s, err := createSession(ctx, service, config.New(config.WithTrace( + s, err := createSession(ctx, client, session.WithTrace( &trace.Query{ OnSessionAttach: func(info trace.QuerySessionAttachStartInfo) func(info trace.QuerySessionAttachDoneInfo) { return func(info trace.QuerySessionAttachDoneInfo) { @@ -61,9 +61,9 @@ func TestClient(t *testing.T) { return nil }, }, - ))) + )) require.NoError(t, err) - require.EqualValues(t, "test", s.id) + require.EqualValues(t, "test", s.ID()) require.EqualValues(t, 1, attached) err = s.Close(ctx) require.NoError(t, err) @@ -72,22 +72,22 @@ func TestClient(t *testing.T) { t.Run("TransportError", func(t *testing.T) { t.Run("OnCall", func(t *testing.T) { ctrl := gomock.NewController(t) - service := NewMockQueryServiceClient(ctrl) - service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) - _, err := createSession(ctx, service, config.New()) + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) + _, err := createSession(ctx, client) require.Error(t, err) require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable)) }) t.Run("OnAttach", func(t *testing.T) { ctrl := gomock.NewController(t) - service := NewMockQueryServiceClient(ctrl) - service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{ + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{ Status: Ydb.StatusIds_SUCCESS, SessionId: "test", }, nil) - service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) - service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) - _, err := createSession(ctx, service, config.New()) + client.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) + client.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) + _, err := createSession(ctx, client) require.Error(t, err) require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable)) }) @@ -95,16 +95,16 @@ func TestClient(t *testing.T) { ctrl := gomock.NewController(t) attachStream := NewMockQueryService_AttachSessionClient(ctrl) attachStream.EXPECT().Recv().Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")).AnyTimes() - service := NewMockQueryServiceClient(ctrl) - service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{ + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{ Status: Ydb.StatusIds_SUCCESS, SessionId: "test", }, nil) - service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil) - service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{ + client.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil) + client.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{ Status: Ydb.StatusIds_SUCCESS, }, nil) - _, err := createSession(ctx, service, config.New()) + _, err := createSession(ctx, client) require.Error(t, err) require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable)) }) @@ -112,11 +112,11 @@ func TestClient(t *testing.T) { t.Run("OperationError", func(t *testing.T) { t.Run("OnCall", func(t *testing.T) { ctrl := gomock.NewController(t) - service := NewMockQueryServiceClient(ctrl) - service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAVAILABLE)), ) - _, err := createSession(ctx, service, config.New()) + _, err := createSession(ctx, client) require.Error(t, err) require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE)) }) @@ -126,16 +126,16 @@ func TestClient(t *testing.T) { attachStream.EXPECT().Recv().Return(nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAVAILABLE)), ) - service := NewMockQueryServiceClient(ctrl) - service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{ + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{ Status: Ydb.StatusIds_SUCCESS, SessionId: "test", }, nil) - service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil) - service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{ + client.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil) + client.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{ Status: Ydb.StatusIds_SUCCESS, }, nil) - _, err := createSession(ctx, service, config.New()) + _, err := createSession(ctx, client) require.Error(t, err) require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE)) }) @@ -1145,20 +1145,52 @@ func TestClient(t *testing.T) { }) } +type sessionControllerMock struct { + id string + status session.Status +} + +func (s *sessionControllerMock) IsAlive() bool { + switch s.status { + case session.StatusClosed, session.StatusClosing: + return false + default: + return true + } +} + +func (s *sessionControllerMock) Close(ctx context.Context) error { + return nil +} + +func (s *sessionControllerMock) SetStatus(status session.Status) { + s.status = status +} + +func (s *sessionControllerMock) ID() string { + return s.id +} + +func (s *sessionControllerMock) NodeID() uint32 { + return 0 +} + +func (s sessionControllerMock) Status() string { + return s.status.String() +} + func newTestSession(id string) *Session { return &Session{ - id: id, - statusCode: statusIdle, - cfg: config.New(), + Core: &sessionControllerMock{id: id}, + trace: &trace.Query{}, } } func newTestSessionWithClient(id string, client Ydb_Query_V1.QueryServiceClient) *Session { return &Session{ - id: id, - queryServiceClient: client, - statusCode: statusIdle, - cfg: config.New(), + Core: &sessionControllerMock{id: id}, + client: client, + trace: &trace.Query{}, } } @@ -1176,8 +1208,8 @@ func TestQueryScript(t *testing.T) { ctx := xtest.Context(t) t.Run("HappyWay", func(t *testing.T) { ctrl := gomock.NewController(t) - service := NewMockQueryServiceClient(ctrl) - service.EXPECT().ExecuteScript(gomock.Any(), gomock.Any()).Return(&Ydb_Operations.Operation{ + client := NewMockQueryServiceClient(ctrl) + client.EXPECT().ExecuteScript(gomock.Any(), gomock.Any()).Return(&Ydb_Operations.Operation{ Id: "123", Ready: true, Status: Ydb.StatusIds_SUCCESS, @@ -1219,7 +1251,7 @@ func TestQueryScript(t *testing.T) { })), CostInfo: nil, }, nil) - service.EXPECT().FetchScriptResults(gomock.Any(), gomock.Any()).Return(&Ydb_Query.FetchScriptResultsResponse{ + client.EXPECT().FetchScriptResults(gomock.Any(), gomock.Any()).Return(&Ydb_Query.FetchScriptResultsResponse{ Status: Ydb.StatusIds_SUCCESS, ResultSetIndex: 0, ResultSet: &Ydb.ResultSet{ @@ -1259,10 +1291,10 @@ func TestQueryScript(t *testing.T) { }, NextFetchToken: "456", }, nil) - op, err := executeScript(ctx, service, &Ydb_Query.ExecuteScriptRequest{}) + op, err := executeScript(ctx, client, &Ydb_Query.ExecuteScriptRequest{}) require.NoError(t, err) require.EqualValues(t, "123", op.ID) - r, err := fetchScriptResults(ctx, service, op.ID) + r, err := fetchScriptResults(ctx, client, op.ID) require.NoError(t, err) require.EqualValues(t, 0, r.ResultSetIndex) require.Equal(t, "456", r.NextToken) diff --git a/internal/query/session.go b/internal/query/session.go index b0d41e837..bba66c3b7 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -2,45 +2,40 @@ package query import ( "context" - "sync/atomic" "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/query" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) var _ query.Session = (*Session)(nil) -type Session struct { - cfg *config.Config - id string - queryServiceClient Ydb_Query_V1.QueryServiceClient - nodeID uint32 - statusCode statusCode - closeOnce func(ctx context.Context) error - checks []func(s *Session) bool -} +type ( + Session struct { + session.Core + + client Ydb_Query_V1.QueryServiceClient + trace *trace.Query + } +) func (s *Session) QueryResultSet( ctx context.Context, q string, opts ...options.Execute, ) (rs result.ClosableResultSet, finalErr error) { - onDone := trace.QueryOnSessionQueryResultSet(s.cfg.Trace(), &ctx, + onDone := trace.QueryOnSessionQueryResultSet(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Session).QueryResultSet"), s, q) defer func() { onDone(finalErr) }() - _, r, err := execute(ctx, s.id, s.queryServiceClient, q, options.ExecuteSettings(opts...), withTrace(s.cfg.Trace())) + _, r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -56,7 +51,7 @@ func (s *Session) QueryResultSet( func (s *Session) queryRow( ctx context.Context, q string, settings executeSettings, resultOpts ...resultOption, ) (row query.Row, finalErr error) { - _, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...) + _, r, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -70,13 +65,13 @@ func (s *Session) queryRow( } func (s *Session) QueryRow(ctx context.Context, q string, opts ...options.Execute) (_ query.Row, finalErr error) { - onDone := trace.QueryOnSessionQueryRow(s.cfg.Trace(), &ctx, + onDone := trace.QueryOnSessionQueryRow(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Session).QueryRow"), s, q) defer func() { onDone(finalErr) }() - row, err := s.queryRow(ctx, q, options.ExecuteSettings(opts...), withTrace(s.cfg.Trace())) + row, err := s.queryRow(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -84,165 +79,19 @@ func (s *Session) QueryRow(ctx context.Context, q string, opts ...options.Execut return row, nil } -func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, cfg *config.Config) ( - s *Session, finalErr error, -) { - s = &Session{ - cfg: cfg, - queryServiceClient: client, - statusCode: statusUnknown, - checks: []func(s *Session) bool{ - func(s *Session) bool { - switch s.status() { - case statusClosed, statusClosing: - return false - default: - return true - } - }, - }, - } - - onDone := trace.QueryOnSessionCreate(s.cfg.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.createSession"), - ) - defer func() { - if s == nil && finalErr == nil { - panic("abnormal result: both nil") - } - if s != nil && finalErr != nil { - panic("abnormal result: both not nil") - } - - if finalErr != nil { - onDone(nil, finalErr) - } else if s != nil { - onDone(s, nil) - } - }() - - response, err := client.CreateSession(ctx, &Ydb_Query.CreateSessionRequest{}) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - - s.id = response.GetSessionId() - s.nodeID = uint32(response.GetNodeId()) - - err = s.attach(ctx) +func createSession( + ctx context.Context, client Ydb_Query_V1.QueryServiceClient, opts ...session.Option, +) (*Session, error) { + core, err := session.Open(ctx, client, opts...) if err != nil { - _ = deleteSession(ctx, client, response.GetSessionId()) - return nil, xerrors.WithStackTrace(err) } - s.setStatus(statusIdle) - - return s, nil -} - -func (s *Session) attach(ctx context.Context) (finalErr error) { - onDone := trace.QueryOnSessionAttach(s.cfg.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Session).attach"), s) - defer func() { - onDone(finalErr) - }() - - attachCtx, cancelAttach := xcontext.WithCancel(xcontext.ValueOnly(ctx)) - defer func() { - if finalErr != nil { - cancelAttach() - } - }() - - attach, err := s.queryServiceClient.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{ - SessionId: s.id, - }) - if err != nil { - return xerrors.WithStackTrace(err) - } - - _, err = attach.Recv() - if err != nil { - return xerrors.WithStackTrace(err) - } - - s.closeOnce = xsync.OnceFunc(s.closeAndDeleteSession(cancelAttach)) - - go func() { - defer func() { - _ = s.closeOnce(xcontext.ValueOnly(ctx)) - }() - - for func() bool { - _, recvErr := attach.Recv() - - return recvErr == nil - }() { - } - }() - - return nil -} - -func (s *Session) closeAndDeleteSession(cancelAttach context.CancelFunc) func(ctx context.Context) (err error) { - return func(ctx context.Context) (err error) { - defer cancelAttach() - - s.setStatus(statusClosing) - defer s.setStatus(statusClosed) - - var cancel context.CancelFunc - if d := s.cfg.SessionDeleteTimeout(); d > 0 { - ctx, cancel = xcontext.WithTimeout(ctx, d) - } else { - ctx, cancel = xcontext.WithCancel(ctx) - } - defer cancel() - - if err = deleteSession(ctx, s.queryServiceClient, s.id); err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - } -} - -func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string) error { - _, err := client.DeleteSession(ctx, - &Ydb_Query.DeleteSessionRequest{ - SessionId: sessionID, - }, - ) - if err != nil { - return xerrors.WithStackTrace(err) - } - - return nil -} - -func (s *Session) IsAlive() bool { - for _, check := range s.checks { - if !check(s) { - return false - } - } - - return true -} - -func (s *Session) Close(ctx context.Context) (err error) { - onDone := trace.QueryOnSessionDelete(s.cfg.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Session).Close"), s) - defer func() { - onDone(err) - }() - - if s.closeOnce != nil { - return s.closeOnce(ctx) - } - - return nil + return &Session{ + Core: core, + trace: core.Trace, + client: core.Client, + }, nil } func (s *Session) Begin( @@ -251,7 +100,7 @@ func (s *Session) Begin( ) ( _ query.Transaction, err error, ) { - onDone := trace.QueryOnSessionBegin(s.cfg.Trace(), &ctx, + onDone := trace.QueryOnSessionBegin(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Session).Begin"), s) defer func() { onDone(err, tx.ID("lazy")) @@ -263,36 +112,16 @@ func (s *Session) Begin( }, nil } -func (s *Session) ID() string { - return s.id -} - -func (s *Session) NodeID() uint32 { - return s.nodeID -} - -func (s *Session) status() statusCode { - return statusCode(atomic.LoadUint32((*uint32)(&s.statusCode))) -} - -func (s *Session) setStatus(code statusCode) { - atomic.StoreUint32((*uint32)(&s.statusCode), uint32(code)) -} - -func (s *Session) Status() string { - return s.status().String() -} - func (s *Session) Exec( ctx context.Context, q string, opts ...options.Execute, ) (finalErr error) { - onDone := trace.QueryOnSessionExec(s.cfg.Trace(), &ctx, + onDone := trace.QueryOnSessionExec(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Session).Exec"), s, q) defer func() { onDone(finalErr) }() - _, r, err := execute(ctx, s.id, s.queryServiceClient, q, options.ExecuteSettings(opts...), withTrace(s.cfg.Trace())) + _, r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return xerrors.WithStackTrace(err) } @@ -308,13 +137,13 @@ func (s *Session) Exec( func (s *Session) Query( ctx context.Context, q string, opts ...options.Execute, ) (_ query.Result, finalErr error) { - onDone := trace.QueryOnSessionQuery(s.cfg.Trace(), &ctx, + onDone := trace.QueryOnSessionQuery(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Session).Query"), s, q) defer func() { onDone(finalErr) }() - _, r, err := execute(ctx, s.id, s.queryServiceClient, q, options.ExecuteSettings(opts...), withTrace(s.cfg.Trace())) + _, r, err := execute(ctx, s.ID(), s.client, q, options.ExecuteSettings(opts...), withTrace(s.trace)) if err != nil { return nil, xerrors.WithStackTrace(err) } diff --git a/internal/query/session/session.go b/internal/query/session/session.go new file mode 100644 index 000000000..142728953 --- /dev/null +++ b/internal/query/session/session.go @@ -0,0 +1,254 @@ +package session + +import ( + "context" + "sync/atomic" + "time" + + "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + "google.golang.org/grpc" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn" + balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" + "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +type ( + Core interface { + query.SessionInfo + pool.Item + + SetStatus(code Status) + } + core struct { + cc grpc.ClientConnInterface + Client Ydb_Query_V1.QueryServiceClient + Trace *trace.Query + + deleteTimeout time.Duration + id string + nodeID uint32 + status Status + closeOnce func(ctx context.Context) error + checks []func(s *core) bool + } +) + +func (c *core) ID() string { + return c.id +} + +func (c *core) NodeID() uint32 { + return c.nodeID +} + +func (c *core) statusCode() Status { + return Status(atomic.LoadUint32((*uint32)(&c.status))) +} + +func (c *core) SetStatus(status Status) { + atomic.StoreUint32((*uint32)(&c.status), uint32(status)) +} + +func (c *core) Status() string { + return c.statusCode().String() +} + +type Option func(*core) + +func WithConn(cc grpc.ClientConnInterface) Option { + return func(c *core) { + c.cc = cc + } +} + +func WithDeleteTimeout(deleteTimeout time.Duration) Option { + return func(c *core) { + c.deleteTimeout = deleteTimeout + } +} + +func WithTrace(t *trace.Query) Option { + return func(c *core) { + c.Trace = c.Trace.Compose(t) + } +} + +func Open( //nolint:funlen + ctx context.Context, client Ydb_Query_V1.QueryServiceClient, opts ...Option, +) (_ *core, finalErr error) { + core := &core{ + Client: client, + Trace: &trace.Query{}, + status: statusUnknown, + checks: []func(s *core) bool{ + func(s *core) bool { + switch s.statusCode() { + case StatusClosed, StatusClosing: + return false + default: + return true + } + }, + }, + } + + for _, opt := range opts { + if opt != nil { + opt(core) + } + } + + onDone := trace.QueryOnSessionCreate(core.Trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session.Open"), + ) + defer func() { + if finalErr == nil { + onDone(core, nil) + } else { + onDone(nil, finalErr) + } + }() + + response, err := client.CreateSession(ctx, &Ydb_Query.CreateSessionRequest{}) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + if core.cc != nil { + core.Client = Ydb_Query_V1.NewQueryServiceClient( + conn.WithContextModifier(core.cc, func(ctx context.Context) context.Context { + return balancerContext.WithNodeID(ctx, core.NodeID()) + }), + ) + } + + core.id = response.GetSessionId() + core.nodeID = uint32(response.GetNodeId()) + + err = core.attach(ctx) + if err != nil { + _ = core.deleteSession(ctx) + + return nil, xerrors.WithStackTrace(err) + } + + core.SetStatus(StatusIdle) + + return core, nil +} + +func (c *core) attach(ctx context.Context) (finalErr error) { + onDone := trace.QueryOnSessionAttach(c.Trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session.(*core).attach"), + c, + ) + defer func() { + onDone(finalErr) + }() + + attachCtx, cancelAttach := xcontext.WithCancel(xcontext.ValueOnly(ctx)) + defer func() { + if finalErr != nil { + cancelAttach() + } + }() + + attach, err := c.Client.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{ + SessionId: c.id, + }) + if err != nil { + return xerrors.WithStackTrace(err) + } + + _, err = attach.Recv() + if err != nil { + return xerrors.WithStackTrace(err) + } + + c.closeOnce = xsync.OnceFunc(c.closeAndDelete(cancelAttach)) + + go func() { + defer func() { + _ = c.closeOnce(xcontext.ValueOnly(ctx)) + }() + + for func() bool { + _, recvErr := attach.Recv() + + return recvErr == nil + }() { + } + }() + + return nil +} + +func (c *core) closeAndDelete(cancelAttach context.CancelFunc) func(ctx context.Context) (err error) { + return func(ctx context.Context) (err error) { + defer cancelAttach() + + c.SetStatus(StatusClosing) + defer c.SetStatus(StatusClosed) + + var cancel context.CancelFunc + if d := c.deleteTimeout; d > 0 { + ctx, cancel = xcontext.WithTimeout(ctx, d) + } else { + ctx, cancel = xcontext.WithCancel(ctx) + } + defer cancel() + + if err = c.deleteSession(ctx); err != nil { + return xerrors.WithStackTrace(err) + } + + return nil + } +} + +func (c *core) deleteSession(ctx context.Context) (finalErr error) { + onDone := trace.QueryOnSessionDelete(c.Trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session.(*core).deleteSession"), + c, + ) + defer func() { + onDone(finalErr) + }() + + _, err := c.Client.DeleteSession(ctx, + &Ydb_Query.DeleteSessionRequest{ + SessionId: c.id, + }, + ) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil +} + +func (c *core) IsAlive() bool { + for _, check := range c.checks { + if !check(c) { + return false + } + } + + return true +} + +func (c *core) Close(ctx context.Context) (err error) { + if c.closeOnce != nil { + return c.closeOnce(ctx) + } + + return nil +} diff --git a/internal/query/session/status.go b/internal/query/session/status.go new file mode 100644 index 000000000..e8123dee0 --- /dev/null +++ b/internal/query/session/status.go @@ -0,0 +1,32 @@ +package session + +import ( + "fmt" +) + +type Status uint32 + +const ( + statusUnknown = Status(iota) + StatusIdle + StatusInUse + StatusClosing + StatusClosed +) + +func (s Status) String() string { + switch s { + case statusUnknown: + return "Unknown" + case StatusIdle: + return "Idle" + case StatusInUse: + return "InUse" + case StatusClosing: + return "Closing" + case StatusClosed: + return "Closed" + default: + return fmt.Sprintf("Unknown%d", s) + } +} diff --git a/internal/query/session_fixtures_test.go b/internal/query/session_fixtures_test.go index 86911cc06..b1884a3ed 100644 --- a/internal/query/session_fixtures_test.go +++ b/internal/query/session_fixtures_test.go @@ -10,7 +10,7 @@ import ( func SessionOverGrpcMock(e fixenv.Env) *Session { f := func() (*fixenv.GenericResult[*Session], error) { s := newTestSession(fmt.Sprintf("test-session-id-%v", e.T().Name())) - s.queryServiceClient = QueryGrpcMock(e) + s.client = QueryGrpcMock(e) return fixenv.NewGenericResult(s), nil } diff --git a/internal/query/session_status.go b/internal/query/session_status.go deleted file mode 100644 index 37ae90f2b..000000000 --- a/internal/query/session_status.go +++ /dev/null @@ -1,34 +0,0 @@ -package query - -import ( - "fmt" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/session" -) - -type statusCode uint32 - -const ( - statusUnknown = statusCode(iota) - statusIdle - statusInUse - statusClosing - statusClosed -) - -func (s statusCode) String() string { - switch s { - case statusUnknown: - return session.StatusUnknown - case statusIdle: - return session.StatusIdle - case statusInUse: - return session.StatusInUse - case statusClosing: - return session.StatusClosing - case statusClosed: - return session.StatusClosed - default: - return fmt.Sprintf("Unknown%d", s) - } -} diff --git a/internal/query/session_test.go b/internal/query/session_test.go index 6fdbf8522..0274e0c3e 100644 --- a/internal/query/session_test.go +++ b/internal/query/session_test.go @@ -10,7 +10,7 @@ import ( grpcCodes "google.golang.org/grpc/codes" grpcStatus "google.golang.org/grpc/status" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -41,9 +41,8 @@ func TestCreateSession(t *testing.T) { client.EXPECT().AttachSession(gomock.Any(), &Ydb_Query.AttachSessionRequest{ SessionId: "123", }).Return(attachStream, nil) - t.Log("createSession") require.NotPanics(t, func() { - s, err := createSession(ctx, client, config.New(config.WithTrace(trace))) + s, err := createSession(ctx, client, session.WithTrace(trace)) require.NoError(t, err) require.NotNil(t, s) require.Equal(t, "123", s.ID()) @@ -57,9 +56,8 @@ func TestCreateSession(t *testing.T) { client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, xerrors.Transport(grpcStatus.Error(grpcCodes.Unavailable, "test")), ) - t.Log("createSession") require.NotPanics(t, func() { - s, err := createSession(ctx, client, config.New(config.WithTrace(trace))) + s, err := createSession(ctx, client, session.WithTrace(trace)) require.Error(t, err) require.Nil(t, s) }) @@ -80,9 +78,8 @@ func TestCreateSession(t *testing.T) { }).Return(&Ydb_Query.DeleteSessionResponse{ Status: Ydb.StatusIds_SUCCESS, }, nil) - t.Log("createSession") require.NotPanics(t, func() { - s, err := createSession(ctx, client, config.New(config.WithTrace(trace))) + s, err := createSession(ctx, client, session.WithTrace(trace)) require.Error(t, err) require.Nil(t, s) }) @@ -96,9 +93,8 @@ func TestCreateSession(t *testing.T) { client.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAVAILABLE)), ) - t.Log("createSession") require.NotPanics(t, func() { - s, err := createSession(ctx, client, config.New(config.WithTrace(trace))) + s, err := createSession(ctx, client, session.WithTrace(trace)) require.Error(t, err) require.Nil(t, s) }) @@ -119,9 +115,8 @@ func TestCreateSession(t *testing.T) { }).Return(&Ydb_Query.DeleteSessionResponse{ Status: Ydb.StatusIds_SUCCESS, }, nil) - t.Log("createSession") require.NotPanics(t, func() { - s, err := createSession(ctx, client, config.New(config.WithTrace(trace))) + s, err := createSession(ctx, client, session.WithTrace(trace)) require.Error(t, err) require.Nil(t, s) }) diff --git a/internal/query/transaction.go b/internal/query/transaction.go index abcf2af44..50e10a403 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -11,6 +11,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session" queryTx "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" baseTx "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" @@ -52,7 +53,7 @@ func begin( defer a.Free() response, err := client.BeginTransaction(ctx, &Ydb_Query.BeginTransactionRequest{ - SessionId: s.id, + SessionId: s.ID(), TxSettings: txSettings.ToYDB(a), }, ) @@ -68,7 +69,7 @@ func (tx *Transaction) UnLazy(ctx context.Context) (err error) { return nil } - tx.Identifier, err = begin(ctx, tx.s.queryServiceClient, tx.s, tx.txSettings) + tx.Identifier, err = begin(ctx, tx.s.client, tx.s, tx.txSettings) if err != nil { return xerrors.WithStackTrace(err) } @@ -79,7 +80,7 @@ func (tx *Transaction) UnLazy(ctx context.Context) (err error) { func (tx *Transaction) QueryResultSet( ctx context.Context, q string, opts ...options.Execute, ) (rs result.ClosableResultSet, finalErr error) { - onDone := trace.QueryOnTxQueryResultSet(tx.s.cfg.Trace(), &ctx, + onDone := trace.QueryOnTxQueryResultSet(tx.s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Transaction).QueryResultSet"), tx, q) defer func() { onDone(finalErr) @@ -95,7 +96,7 @@ func (tx *Transaction) QueryResultSet( } resultOpts := []resultOption{ - withTrace(tx.s.cfg.Trace()), + withTrace(tx.s.trace), } if settings.TxControl().Commit { // notification about complete transaction must be sended for any error or for successfully read all result if @@ -106,7 +107,7 @@ func (tx *Transaction) QueryResultSet( }), ) } - txID, r, err := execute(ctx, tx.s.id, tx.s.queryServiceClient, q, settings, resultOpts...) + txID, r, err := execute(ctx, tx.s.ID(), tx.s.client, q, settings, resultOpts...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -134,7 +135,7 @@ func (tx *Transaction) QueryResultSet( func (tx *Transaction) QueryRow( ctx context.Context, q string, opts ...options.Execute, ) (row query.Row, finalErr error) { - onDone := trace.QueryOnTxQueryRow(tx.s.cfg.Trace(), &ctx, + onDone := trace.QueryOnTxQueryRow(tx.s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Transaction).QueryRow"), tx, q) defer func() { onDone(finalErr) @@ -148,7 +149,7 @@ func (tx *Transaction) QueryRow( ) resultOpts := []resultOption{ - withTrace(tx.s.cfg.Trace()), + withTrace(tx.s.trace), } if settings.TxControl().Commit { // notification about complete transaction must be sended for any error or for successfully read all result if @@ -159,7 +160,7 @@ func (tx *Transaction) QueryRow( }), ) } - txID, r, err := execute(ctx, tx.s.id, tx.s.queryServiceClient, q, settings, resultOpts...) + txID, r, err := execute(ctx, tx.s.ID(), tx.s.client, q, settings, resultOpts...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -201,7 +202,7 @@ func (tx *Transaction) ID() string { func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execute) ( finalErr error, ) { - onDone := trace.QueryOnTxExec(tx.s.cfg.Trace(), &ctx, + onDone := trace.QueryOnTxExec(tx.s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Transaction).Exec"), tx.s, tx, q) defer func() { onDone(finalErr) @@ -217,7 +218,7 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu } resultOpts := []resultOption{ - withTrace(tx.s.cfg.Trace()), + withTrace(tx.s.trace), } if settings.TxControl().Commit { // notification about complete transaction must be sended for any error or for successfully read all result if @@ -229,7 +230,7 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu ) } - txID, r, err := execute(ctx, tx.s.id, tx.s.queryServiceClient, q, settings, resultOpts...) + txID, r, err := execute(ctx, tx.s.ID(), tx.s.client, q, settings, resultOpts...) if err != nil { return xerrors.WithStackTrace(err) } @@ -288,7 +289,7 @@ func (tx *Transaction) executeSettings(opts ...options.Execute) (_ executeSettin func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Execute) ( _ query.Result, finalErr error, ) { - onDone := trace.QueryOnTxQuery(tx.s.cfg.Trace(), &ctx, + onDone := trace.QueryOnTxQuery(tx.s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Transaction).Query"), tx.s, tx, q) defer func() { onDone(finalErr) @@ -304,7 +305,7 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec } resultOpts := []resultOption{ - withTrace(tx.s.cfg.Trace()), + withTrace(tx.s.trace), } if settings.TxControl().Commit { // notification about complete transaction must be sended for any error or for successfully read all result if @@ -315,7 +316,7 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec }), ) } - txID, r, err := execute(ctx, tx.s.id, tx.s.queryServiceClient, q, settings, resultOpts...) + txID, r, err := execute(ctx, tx.s.ID(), tx.s.client, q, settings, resultOpts...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -359,10 +360,10 @@ func (tx *Transaction) CommitTx(ctx context.Context) (err error) { return nil } - err = commitTx(ctx, tx.s.queryServiceClient, tx.s.id, tx.ID()) + err = commitTx(ctx, tx.s.client, tx.s.ID(), tx.ID()) if err != nil { if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) { - tx.s.setStatus(statusClosed) + tx.s.SetStatus(session.StatusClosed) } return xerrors.WithStackTrace(err) @@ -392,10 +393,10 @@ func (tx *Transaction) Rollback(ctx context.Context) error { tx.notifyOnCompleted(ErrTransactionRollingBack) - err := rollback(ctx, tx.s.queryServiceClient, tx.s.id, tx.ID()) + err := rollback(ctx, tx.s.client, tx.s.ID(), tx.ID()) if err != nil { if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) { - tx.s.setStatus(statusClosed) + tx.s.SetStatus(session.StatusClosed) } return xerrors.WithStackTrace(err) diff --git a/internal/query/transaction_test.go b/internal/query/transaction_test.go index 2f51b8f25..3cc8f3a23 100644 --- a/internal/query/transaction_test.go +++ b/internal/query/transaction_test.go @@ -45,7 +45,7 @@ func TestBegin(t *testing.T) { }, }, nil) t.Log("begin") - tx, err := begin(ctx, client, &Session{id: "123"}, query.TxSettings()) + tx, err := begin(ctx, client, &Session{Core: &sessionControllerMock{id: "123"}}, query.TxSettings()) require.NoError(t, err) require.Equal(t, "123", tx.ID()) }) @@ -55,7 +55,7 @@ func TestBegin(t *testing.T) { client := NewMockQueryServiceClient(ctrl) client.EXPECT().BeginTransaction(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) t.Log("begin") - _, err := begin(ctx, client, &Session{id: "123"}, query.TxSettings()) + _, err := begin(ctx, client, &Session{Core: &sessionControllerMock{id: "123"}}, query.TxSettings()) require.Error(t, err) require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable)) }) @@ -67,7 +67,7 @@ func TestBegin(t *testing.T) { xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAVAILABLE)), ) t.Log("begin") - _, err := begin(ctx, client, &Session{id: "123"}, query.TxSettings()) + _, err := begin(ctx, client, &Session{Core: &sessionControllerMock{id: "123"}}, query.TxSettings()) require.Error(t, err) require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE)) }) diff --git a/internal/session/status.go b/internal/session/status.go deleted file mode 100644 index 49fbe7425..000000000 --- a/internal/session/status.go +++ /dev/null @@ -1,12 +0,0 @@ -package session - -type Status = string - -const ( - StatusUnknown = Status("Unknown") - StatusIdle = Status("Idle") - StatusInUse = Status("InUse") - StatusClosing = Status("Closing") - StatusClosed = Status("Closed") - StatusError = Status("Error") -)