From 5802d706734db3fc954378841295e224144b0560 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Wed, 17 Apr 2024 16:47:53 +0300 Subject: [PATCH] * Added `table/options.UseQueryServiceExecute()` option for redirect `table.Session.Execute` call to `query.Execute` --- CHANGELOG.md | 2 + balancers/balancers.go | 2 +- internal/balancer/balancer.go | 2 +- internal/balancer/connections_state.go | 6 +- internal/balancer/connections_state_test.go | 16 +- internal/balancer/ctx.go | 2 +- internal/conn/conn.go | 2 +- internal/conn/error.go | 4 +- internal/conn/pool.go | 2 +- internal/credentials/access_error.go | 4 +- internal/discovery/discovery.go | 2 +- internal/endpoint/endpoint.go | 8 +- internal/mock/conn.go | 6 +- internal/params/parameters.go | 2 +- internal/query/client.go | 4 +- internal/query/client_test.go | 2 +- internal/query/execute_query.go | 98 ++++++++----- internal/query/execute_query_test.go | 137 ++++++++++++++--- internal/query/options/execute.go | 154 +++++++++----------- internal/query/options/retry.go | 15 +- internal/query/options/trace.go | 23 +++ internal/query/result_set.go | 9 ++ internal/query/row.go | 2 + internal/query/session.go | 40 +++-- internal/query/transaction.go | 31 ++-- internal/query/transaction_test.go | 35 +++-- internal/query/tx/control.go | 18 +-- internal/table/client.go | 2 +- internal/table/client_test.go | 4 +- internal/table/data_query.go | 6 +- internal/table/session.go | 91 +++++++++--- internal/table/statement.go | 2 +- internal/table/transaction.go | 26 ++-- log/query.go | 2 - metrics/node_id.go | 4 +- query/session.go | 2 +- table/options/options.go | 146 ++++++++++++++----- table/table.go | 2 +- tests/integration/table_use_query_test.go | 99 +++++++++++++ testutil/driver.go | 2 +- trace/driver.go | 2 +- trace/query.go | 5 +- trace/query_gtrace.go | 3 +- trace/table.go | 2 +- 44 files changed, 707 insertions(+), 321 deletions(-) create mode 100644 internal/query/options/trace.go create mode 100644 tests/integration/table_use_query_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 03957b2fc..b8ebcab92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added `table/options.UseQueryServiceExecute()` option for redirect `table.Session.Execute` call to `query.Execute` + ## v3.65.1 * Updated dependency `ydb-go-genproto` * Added processing of `Ydb.StatusIds_EXTERNAL_ERROR` in `retry.Retry` diff --git a/balancers/balancers.go b/balancers/balancers.go index 2fe525a53..fcea2e3ef 100644 --- a/balancers/balancers.go +++ b/balancers/balancers.go @@ -111,7 +111,7 @@ func PreferLocationsWithFallback(balancer *balancerConfig.Config, locations ...s } type Endpoint interface { - NodeID() uint32 + NodeID() int64 Address() string Location() string diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index f69ec11e2..5c4effd77 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -46,7 +46,7 @@ type Balancer struct { onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info) } -func (b *Balancer) HasNode(id uint32) bool { +func (b *Balancer) HasNode(id int64) bool { if b.config.SingleConn { return true } diff --git a/internal/balancer/connections_state.go b/internal/balancer/connections_state.go index e9196ead7..87a6cf9b5 100644 --- a/internal/balancer/connections_state.go +++ b/internal/balancer/connections_state.go @@ -9,7 +9,7 @@ import ( ) type connectionsState struct { - connByNodeID map[uint32]conn.Conn + connByNodeID map[int64]conn.Conn prefer []conn.Conn fallback []conn.Conn @@ -115,11 +115,11 @@ func (s *connectionsState) selectRandomConnection(conns []conn.Conn, allowBanned return nil, failedConns } -func connsToNodeIDMap(conns []conn.Conn) (nodes map[uint32]conn.Conn) { +func connsToNodeIDMap(conns []conn.Conn) (nodes map[int64]conn.Conn) { if len(conns) == 0 { return nil } - nodes = make(map[uint32]conn.Conn, len(conns)) + nodes = make(map[int64]conn.Conn, len(conns)) for _, c := range conns { nodes[c.Endpoint().NodeID()] = c } diff --git a/internal/balancer/connections_state_test.go b/internal/balancer/connections_state_test.go index b052b3933..bbf60b80c 100644 --- a/internal/balancer/connections_state_test.go +++ b/internal/balancer/connections_state_test.go @@ -16,7 +16,7 @@ func TestConnsToNodeIDMap(t *testing.T) { table := []struct { name string source []conn.Conn - res map[uint32]conn.Conn + res map[int64]conn.Conn }{ { name: "Empty", @@ -28,7 +28,7 @@ func TestConnsToNodeIDMap(t *testing.T) { source: []conn.Conn{ &mock.Conn{NodeIDField: 0}, }, - res: map[uint32]conn.Conn{ + res: map[int64]conn.Conn{ 0: &mock.Conn{NodeIDField: 0}, }, }, @@ -38,7 +38,7 @@ func TestConnsToNodeIDMap(t *testing.T) { &mock.Conn{NodeIDField: 1}, &mock.Conn{NodeIDField: 10}, }, - res: map[uint32]conn.Conn{ + res: map[int64]conn.Conn{ 1: &mock.Conn{NodeIDField: 1}, 10: &mock.Conn{NodeIDField: 10}, }, @@ -50,7 +50,7 @@ func TestConnsToNodeIDMap(t *testing.T) { &mock.Conn{NodeIDField: 0}, &mock.Conn{NodeIDField: 10}, }, - res: map[uint32]conn.Conn{ + res: map[int64]conn.Conn{ 0: &mock.Conn{NodeIDField: 0}, 1: &mock.Conn{NodeIDField: 1}, 10: &mock.Conn{NodeIDField: 10}, @@ -264,7 +264,7 @@ func TestNewState(t *testing.T) { &mock.Conn{AddrField: "2", NodeIDField: 2}, }, nil, balancerConfig.Info{}, false), res: &connectionsState{ - connByNodeID: map[uint32]conn.Conn{ + connByNodeID: map[int64]conn.Conn{ 1: &mock.Conn{AddrField: "1", NodeIDField: 1}, 2: &mock.Conn{AddrField: "2", NodeIDField: 2}, }, @@ -290,7 +290,7 @@ func TestNewState(t *testing.T) { return info.SelfLocation == c.Endpoint().Location() }), balancerConfig.Info{SelfLocation: "t"}, false), res: &connectionsState{ - connByNodeID: map[uint32]conn.Conn{ + connByNodeID: map[int64]conn.Conn{ 1: &mock.Conn{AddrField: "t1", NodeIDField: 1, LocationField: "t"}, 2: &mock.Conn{AddrField: "f1", NodeIDField: 2, LocationField: "f"}, 3: &mock.Conn{AddrField: "t2", NodeIDField: 3, LocationField: "t"}, @@ -318,7 +318,7 @@ func TestNewState(t *testing.T) { return info.SelfLocation == c.Endpoint().Location() }), balancerConfig.Info{SelfLocation: "t"}, true), res: &connectionsState{ - connByNodeID: map[uint32]conn.Conn{ + connByNodeID: map[int64]conn.Conn{ 1: &mock.Conn{AddrField: "t1", NodeIDField: 1, LocationField: "t"}, 2: &mock.Conn{AddrField: "f1", NodeIDField: 2, LocationField: "f"}, 3: &mock.Conn{AddrField: "t2", NodeIDField: 3, LocationField: "t"}, @@ -351,7 +351,7 @@ func TestNewState(t *testing.T) { return info.SelfLocation == c.Endpoint().Location() }), balancerConfig.Info{SelfLocation: "t"}, true), res: &connectionsState{ - connByNodeID: map[uint32]conn.Conn{ + connByNodeID: map[int64]conn.Conn{ 1: &mock.Conn{AddrField: "t1", NodeIDField: 1, LocationField: "t"}, 2: &mock.Conn{AddrField: "f1", NodeIDField: 2, LocationField: "f"}, 3: &mock.Conn{AddrField: "t2", NodeIDField: 3, LocationField: "t"}, diff --git a/internal/balancer/ctx.go b/internal/balancer/ctx.go index 9b4aeb209..9bc6c8310 100644 --- a/internal/balancer/ctx.go +++ b/internal/balancer/ctx.go @@ -7,7 +7,7 @@ type ( ) type Endpoint interface { - NodeID() uint32 + NodeID() int64 } func WithEndpoint(ctx context.Context, endpoint Endpoint) context.Context { diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 8192e38e7..8ced21c76 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -96,7 +96,7 @@ func (c *conn) IsState(states ...State) bool { return false } -func (c *conn) NodeID() uint32 { +func (c *conn) NodeID() int64 { if c != nil { return c.endpoint.NodeID() } diff --git a/internal/conn/error.go b/internal/conn/error.go index 216ac2f7b..54d8cf195 100644 --- a/internal/conn/error.go +++ b/internal/conn/error.go @@ -3,12 +3,12 @@ package conn import "fmt" type connError struct { - nodeID uint32 + nodeID int64 endpoint string err error } -func newConnError(id uint32, endpoint string, err error) connError { +func newConnError(id int64, endpoint string, err error) connError { return connError{ nodeID: id, endpoint: endpoint, diff --git a/internal/conn/pool.go b/internal/conn/pool.go index 783b7a880..e6af8440f 100644 --- a/internal/conn/pool.go +++ b/internal/conn/pool.go @@ -20,7 +20,7 @@ import ( type connsKey struct { address string - nodeID uint32 + nodeID int64 } type Pool struct { diff --git a/internal/credentials/access_error.go b/internal/credentials/access_error.go index 777bc3d80..a07e0391d 100644 --- a/internal/credentials/access_error.go +++ b/internal/credentials/access_error.go @@ -57,14 +57,14 @@ func WithDatabase(database string) databaseAuthErrorOption { return databaseAuthErrorOption(database) } -type nodeIDAuthErrorOption uint32 +type nodeIDAuthErrorOption int64 func (id nodeIDAuthErrorOption) applyAuthErrorOption(w io.Writer) { fmt.Fprint(w, "nodeID:") fmt.Fprint(w, strconv.FormatUint(uint64(id), 10)) } -func WithNodeID(id uint32) authErrorOption { +func WithNodeID(id int64) authErrorOption { return nodeIDAuthErrorOption(id) } diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index dfda2660c..e8ddc3392 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -86,7 +86,7 @@ func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e endpoints = append(endpoints, endpoint.New( net.JoinHostPort(e.GetAddress(), strconv.Itoa(int(e.GetPort()))), endpoint.WithLocation(e.GetLocation()), - endpoint.WithID(e.GetNodeId()), + endpoint.WithID(int64(e.GetNodeId())), endpoint.WithLoadFactor(e.GetLoadFactor()), endpoint.WithLocalDC(e.GetLocation() == location), endpoint.WithServices(e.GetService()), diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index 37a889b81..5ab28a1cc 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -7,7 +7,7 @@ import ( ) type Info interface { - NodeID() uint32 + NodeID() int64 Address() string Location() string LastUpdated() time.Time @@ -30,7 +30,7 @@ type Endpoint interface { type endpoint struct { //nolint:maligned mu sync.RWMutex - id uint32 + id int64 address string location string services []string @@ -70,7 +70,7 @@ func (e *endpoint) String() string { ) } -func (e *endpoint) NodeID() uint32 { +func (e *endpoint) NodeID() int64 { e.mu.RLock() defer e.mu.RUnlock() @@ -128,7 +128,7 @@ func (e *endpoint) Touch(opts ...Option) { type Option func(e *endpoint) -func WithID(id uint32) Option { +func WithID(id int64) Option { return func(e *endpoint) { e.id = id } diff --git a/internal/mock/conn.go b/internal/mock/conn.go index b4ceb9f69..b948b7680 100644 --- a/internal/mock/conn.go +++ b/internal/mock/conn.go @@ -14,7 +14,7 @@ type Conn struct { PingErr error AddrField string LocationField string - NodeIDField uint32 + NodeIDField int64 State conn.State LocalDCField bool } @@ -80,14 +80,14 @@ func (c *Conn) Unban(ctx context.Context) conn.State { type Endpoint struct { AddrField string LocationField string - NodeIDField uint32 + NodeIDField int64 LocalDCField bool } func (e *Endpoint) Choose(bool) { } -func (e *Endpoint) NodeID() uint32 { +func (e *Endpoint) NodeID() int64 { return e.NodeIDField } diff --git a/internal/params/parameters.go b/internal/params/parameters.go index aae4e38d0..005397413 100644 --- a/internal/params/parameters.go +++ b/internal/params/parameters.go @@ -61,7 +61,7 @@ func (p *Parameters) String() string { } func (p *Parameters) ToYDB(a *allocator.Allocator) map[string]*Ydb.TypedValue { - if p == nil { + if p == nil || len(*p) == 0 { return nil } parameters := make(map[string]*Ydb.TypedValue, len(*p)) diff --git a/internal/query/client.go b/internal/query/client.go index 3019e83cd..ce5916e04 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -21,7 +21,7 @@ import ( //go:generate mockgen -destination grpc_client_mock_test.go -package query -write_package_comment=false github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1 QueryServiceClient,QueryService_AttachSessionClient,QueryService_ExecuteQueryClient type nodeChecker interface { - HasNode(id uint32) bool + HasNode(id int64) bool } type balancer interface { @@ -197,7 +197,7 @@ func New(ctx context.Context, balancer balancer, cfg *config.Config) *Client { s, err := createSession(createCtx, client.grpcClient, cfg, withSessionCheck(func(s *Session) bool { - return balancer.HasNode(uint32(s.nodeID)) + return balancer.HasNode(s.nodeID) }), ) if err != nil { diff --git a/internal/query/client_test.go b/internal/query/client_test.go index 1b7260750..8ee870a23 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -164,7 +164,7 @@ func newTestSession(id string) *Session { func newTestSessionWithClient(id string, client Ydb_Query_V1.QueryServiceClient) *Session { return &Session{ id: id, - grpcClient: client, + client: client, statusCode: statusIdle, cfg: config.New(), } diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index bf2fef314..8ea85bcc1 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -2,43 +2,29 @@ package query import ( "context" + "io" "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" - "google.golang.org/grpc" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" - "github.com/ydb-platform/ydb-go-sdk/v3/query" ) -type executeConfig interface { - ExecMode() options.ExecMode - StatsMode() options.StatsMode - TxControl() *query.TransactionControl - Syntax() options.Syntax - Params() *params.Parameters - CallOptions() []grpc.CallOption +type withAllocatorOption struct { + a *allocator.Allocator } -func executeQueryRequest(a *allocator.Allocator, sessionID, q string, cfg executeConfig) ( - *Ydb_Query.ExecuteQueryRequest, - []grpc.CallOption, -) { - request := a.QueryExecuteQueryRequest() - - request.SessionId = sessionID - request.ExecMode = Ydb_Query.ExecMode(cfg.ExecMode()) - request.TxControl = cfg.TxControl().ToYDB(a) - request.Query = queryFromText(a, q, Ydb_Query.Syntax(cfg.Syntax())) - request.Parameters = cfg.Params().ToYDB(a) - request.StatsMode = Ydb_Query.StatsMode(cfg.StatsMode()) - request.ConcurrentResultSets = false +func (a withAllocatorOption) ApplyExecuteOption(s *options.Execute) { + s.Allocator = a.a +} - return request, cfg.CallOptions() +func WithAllocator(a *allocator.Allocator) options.ExecuteOption { + return withAllocatorOption{a: a} } func queryFromText( @@ -52,22 +38,68 @@ func queryFromText( return content } -func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient, q string, cfg executeConfig) ( - _ *transaction, _ *result, finalErr error, -) { - a := allocator.New() - defer a.Free() +func ReadAll(ctx context.Context, r *result) (resultSets []*Ydb.ResultSet, stats *Ydb_TableStats.QueryStats, _ error) { + for { + resultSet, err := r.nextResultSet(ctx) + if err != nil { + if xerrors.Is(err, io.EOF) { + return resultSets, resultSet.stats(), nil + } + + return nil, nil, xerrors.WithStackTrace(err) + } + var rows []*Ydb.Value + for { + row, err := resultSet.nextRow(ctx) + if err != nil { + if xerrors.Is(err, io.EOF) { + break + } + + return nil, nil, xerrors.WithStackTrace(err) + } + + rows = append(rows, row.v) + } + + resultSets = append(resultSets, &Ydb.ResultSet{ + Columns: resultSet.columns, + Rows: rows, + }) + } +} - request, callOptions := executeQueryRequest(a, s.id, q, cfg) +func Execute[T options.ExecuteOption]( + ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID, query string, opts ...T, +) (_ *transaction, _ *result, finalErr error) { + var ( + settings = options.ExecuteSettings(opts...) + a = settings.Allocator + ) + + if a == nil { + a = allocator.New() + defer a.Free() + } + + request := a.QueryExecuteQueryRequest() + + request.SessionId = sessionID + request.ExecMode = Ydb_Query.ExecMode(settings.ExecMode) + request.TxControl = settings.TxControl.ToYDB(a) + request.Query = queryFromText(a, query, Ydb_Query.Syntax(settings.Syntax)) + request.Parameters = settings.Params.ToYDB(a) + request.StatsMode = Ydb_Query.StatsMode(settings.StatsMode) + request.ConcurrentResultSets = false executeCtx, cancelExecute := xcontext.WithCancel(xcontext.ValueOnly(ctx)) - stream, err := c.ExecuteQuery(executeCtx, request, callOptions...) + stream, err := client.ExecuteQuery(executeCtx, request, settings.GrpcCallOptions...) if err != nil { return nil, nil, xerrors.WithStackTrace(err) } - r, txID, err := newResult(ctx, stream, s.cfg.Trace(), cancelExecute) + r, txID, err := newResult(ctx, stream, settings.Trace, cancelExecute) if err != nil { cancelExecute() @@ -78,5 +110,5 @@ func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient, return nil, r, nil } - return newTransaction(txID, s), r, nil + return newTx(txID, sessionID, client, settings.Trace), r, nil } diff --git a/internal/query/execute_query_test.go b/internal/query/execute_query_test.go index c3c14d754..5fcdc89ab 100644 --- a/internal/query/execute_query_test.go +++ b/internal/query/execute_query_test.go @@ -14,7 +14,6 @@ import ( "google.golang.org/grpc/metadata" grpcStatus "google.golang.org/grpc/status" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -355,12 +354,32 @@ func TestExecute(t *testing.T) { }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) service := NewMockQueryServiceClient(ctrl) - service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil) - tx, r, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings()) + service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Cond(func(x any) bool { + request, ok := x.(*Ydb_Query.ExecuteQueryRequest) + if !ok { + return false + } + if request.GetSessionId() != "123" { + return false + } + query := request.GetQueryContent() + if query == nil { + return false + } + if query.GetText() != "SELECT 1" { + return false + } + if query.GetSyntax() != Ydb_Query.Syntax_SYNTAX_YQL_V1 { + return false + } + + return true + })).Return(stream, nil) + tx, r, err := Execute[options.ExecuteOption](ctx, service, "123", "SELECT 1") require.NoError(t, err) defer r.Close(ctx) require.EqualValues(t, "456", tx.id) - require.EqualValues(t, "123", tx.s.id) + require.EqualValues(t, "123", tx.sessionID) require.EqualValues(t, -1, r.resultSetIndex) { t.Log("nextResultSet") @@ -469,7 +488,7 @@ func TestExecute(t *testing.T) { service := NewMockQueryServiceClient(ctrl) service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) t.Log("execute") - _, _, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings()) + _, _, err := Execute[options.ExecuteOption](ctx, service, "123", "") require.Error(t, err) require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable)) }) @@ -573,11 +592,11 @@ func TestExecute(t *testing.T) { service := NewMockQueryServiceClient(ctrl) service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil) t.Log("execute") - tx, r, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings()) + tx, r, err := Execute[options.ExecuteOption](ctx, service, "123", "") require.NoError(t, err) defer r.Close(ctx) require.EqualValues(t, "456", tx.id) - require.EqualValues(t, "123", tx.s.id) + require.EqualValues(t, "123", tx.sessionID) require.EqualValues(t, -1, r.resultSetIndex) { t.Log("nextResultSet") @@ -637,7 +656,7 @@ func TestExecute(t *testing.T) { service := NewMockQueryServiceClient(ctrl) service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil) t.Log("execute") - _, _, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings()) + _, _, err := Execute[options.ExecuteOption](ctx, service, "123", "") require.Error(t, err) require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE)) }) @@ -713,11 +732,11 @@ func TestExecute(t *testing.T) { service := NewMockQueryServiceClient(ctrl) service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil) t.Log("execute") - tx, r, err := execute(ctx, newTestSession("123"), service, "", options.ExecuteSettings()) + tx, r, err := Execute[options.ExecuteOption](ctx, service, "123", "") require.NoError(t, err) defer r.Close(ctx) require.EqualValues(t, "456", tx.id) - require.EqualValues(t, "123", tx.s.id) + require.EqualValues(t, "123", tx.sessionID) require.EqualValues(t, -1, r.resultSetIndex) { t.Log("nextResultSet") @@ -757,12 +776,11 @@ func TestExecute(t *testing.T) { } func TestExecuteQueryRequest(t *testing.T) { - a := allocator.New() for _, tt := range []struct { - name string - opts []options.ExecuteOption - request *Ydb_Query.ExecuteQueryRequest - callOptions []grpc.CallOption + name string + opts []options.ExecuteOption + request *Ydb_Query.ExecuteQueryRequest + grpcCallOptions []grpc.CallOption }{ { name: "WithoutOptions", @@ -996,7 +1014,7 @@ func TestExecuteQueryRequest(t *testing.T) { StatsMode: Ydb_Query.StatsMode_STATS_MODE_NONE, ConcurrentResultSets: false, }, - callOptions: []grpc.CallOption{ + grpcCallOptions: []grpc.CallOption{ grpc.Header(&metadata.MD{ "ext-header": []string{"test"}, }), @@ -1004,9 +1022,90 @@ func TestExecuteQueryRequest(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - request, callOptions := executeQueryRequest(a, tt.name, tt.name, options.ExecuteSettings(tt.opts...)) - require.Equal(t, request.String(), tt.request.String()) - require.Equal(t, tt.callOptions, callOptions) + ctx := xtest.Context(t) + ctrl := gomock.NewController(t) + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "456", + }, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + client := NewMockQueryServiceClient(ctrl) + var args []any + if len(tt.grpcCallOptions) > 0 { + for _, grpcOpt := range tt.grpcCallOptions { + args = append(args, grpcOpt) + } + } + client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Cond(func(x any) bool { + request, ok := x.(*Ydb_Query.ExecuteQueryRequest) + if !ok { + return false + } + + return tt.request.String() == request.String() + }), args...).Return(stream, nil) + tx, _, err := Execute(ctx, client, tt.name, tt.name, tt.opts...) + require.NoError(t, err) + require.Equal(t, tt.name, tx.sessionID) + require.Equal(t, "456", tx.id) }) } } diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index 6a306c26d..b92dfb031 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -4,37 +4,43 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "google.golang.org/grpc" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) type ( - Syntax Ydb_Query.Syntax - ExecMode Ydb_Query.ExecMode - StatsMode Ydb_Query.StatsMode - CallOptions []grpc.CallOption - commonExecuteSettings struct { - syntax Syntax - params params.Parameters - execMode ExecMode - statsMode StatsMode - callOptions []grpc.CallOption + Syntax Ydb_Query.Syntax + ExecMode Ydb_Query.ExecMode + StatsMode Ydb_Query.StatsMode + GrpcOpts []grpc.CallOption + executeSettings struct { + Syntax Syntax + Params params.Parameters + ExecMode ExecMode + StatsMode StatsMode + GrpcCallOptions []grpc.CallOption + Trace *trace.Query + Allocator *allocator.Allocator } Execute struct { - commonExecuteSettings + executeSettings - txControl *tx.Control + TxControl *tx.Control } ExecuteOption interface { - applyExecuteOption(s *Execute) + ApplyExecuteOption(s *Execute) } txExecuteSettings struct { - ExecuteSettings *Execute + ExecuteOptions []ExecuteOption - commitTx bool + CommitTx bool } TxExecuteOption interface { - applyTxExecuteOption(s *txExecuteSettings) + ExecuteOption + + ApplyTxExecuteOption(s *txExecuteSettings) } txCommitOption struct{} ParametersOption params.Parameters @@ -43,20 +49,23 @@ type ( } ) -func (opt TxControlOption) applyExecuteOption(s *Execute) { - s.txControl = opt.txControl +func (t txCommitOption) ApplyExecuteOption(s *Execute) { +} + +func (opt TxControlOption) ApplyExecuteOption(s *Execute) { + s.TxControl = opt.txControl } -func (t txCommitOption) applyTxExecuteOption(s *txExecuteSettings) { - s.commitTx = true +func (t txCommitOption) ApplyTxExecuteOption(s *txExecuteSettings) { + s.CommitTx = true } -func (syntax Syntax) applyTxExecuteOption(s *txExecuteSettings) { - syntax.applyExecuteOption(s.ExecuteSettings) +func (syntax Syntax) ApplyTxExecuteOption(s *txExecuteSettings) { + s.ExecuteOptions = append(s.ExecuteOptions, syntax) } -func (syntax Syntax) applyExecuteOption(s *Execute) { - s.syntax = syntax +func (syntax Syntax) ApplyExecuteOption(s *Execute) { + s.Syntax = syntax } const ( @@ -64,36 +73,36 @@ const ( SyntaxPostgreSQL = Syntax(Ydb_Query.Syntax_SYNTAX_PG) ) -func (params ParametersOption) applyTxExecuteOption(s *txExecuteSettings) { - params.applyExecuteOption(s.ExecuteSettings) +func (params ParametersOption) ApplyTxExecuteOption(s *txExecuteSettings) { + s.ExecuteOptions = append(s.ExecuteOptions, params) } -func (params ParametersOption) applyExecuteOption(s *Execute) { - s.params = append(s.params, params...) +func (params ParametersOption) ApplyExecuteOption(s *Execute) { + s.Params = append(s.Params, params...) } -func (opts CallOptions) applyExecuteOption(s *Execute) { - s.callOptions = append(s.callOptions, opts...) +func (opts GrpcOpts) ApplyExecuteOption(s *Execute) { + s.GrpcCallOptions = append(s.GrpcCallOptions, opts...) } -func (opts CallOptions) applyTxExecuteOption(s *txExecuteSettings) { - opts.applyExecuteOption(s.ExecuteSettings) +func (opts GrpcOpts) ApplyTxExecuteOption(s *txExecuteSettings) { + s.ExecuteOptions = append(s.ExecuteOptions, opts) } -func (mode StatsMode) applyTxExecuteOption(s *txExecuteSettings) { - mode.applyExecuteOption(s.ExecuteSettings) +func (mode StatsMode) ApplyTxExecuteOption(s *txExecuteSettings) { + s.ExecuteOptions = append(s.ExecuteOptions, mode) } -func (mode StatsMode) applyExecuteOption(s *Execute) { - s.statsMode = mode +func (mode StatsMode) ApplyExecuteOption(s *Execute) { + s.StatsMode = mode } -func (mode ExecMode) applyTxExecuteOption(s *txExecuteSettings) { - mode.applyExecuteOption(s.ExecuteSettings) +func (mode ExecMode) ApplyTxExecuteOption(s *txExecuteSettings) { + s.ExecuteOptions = append(s.ExecuteOptions, mode) } -func (mode ExecMode) applyExecuteOption(s *Execute) { - s.execMode = mode +func (mode ExecMode) ApplyExecuteOption(s *Execute) { + s.ExecMode = mode } const ( @@ -110,68 +119,37 @@ const ( StatsModeProfile = StatsMode(Ydb_Query.StatsMode_STATS_MODE_PROFILE) ) -func defaultCommonExecuteSettings() commonExecuteSettings { - return commonExecuteSettings{ - syntax: SyntaxYQL, - execMode: ExecModeExecute, - statsMode: StatsModeNone, +func defaultExecuteSettings() executeSettings { + return executeSettings{ + Syntax: SyntaxYQL, + ExecMode: ExecModeExecute, + StatsMode: StatsModeNone, + Trace: &trace.Query{}, } } -func ExecuteSettings(opts ...ExecuteOption) (settings *Execute) { +func ExecuteSettings[T ExecuteOption](opts ...T) (settings *Execute) { settings = &Execute{ - commonExecuteSettings: defaultCommonExecuteSettings(), + executeSettings: defaultExecuteSettings(), } - settings.commonExecuteSettings = defaultCommonExecuteSettings() - settings.txControl = tx.DefaultTxControl() + settings.executeSettings = defaultExecuteSettings() + settings.TxControl = tx.DefaultTxControl() for _, opt := range opts { - if opt != nil { - opt.applyExecuteOption(settings) - } + opt.ApplyExecuteOption(settings) } return settings } -func (s *Execute) TxControl() *tx.Control { - return s.txControl -} - -func (s *Execute) SetTxControl(ctrl *tx.Control) { - s.txControl = ctrl -} - -func (s *commonExecuteSettings) CallOptions() []grpc.CallOption { - return s.callOptions -} - -func (s *commonExecuteSettings) Syntax() Syntax { - return s.syntax -} - -func (s *commonExecuteSettings) ExecMode() ExecMode { - return s.execMode -} - -func (s *commonExecuteSettings) StatsMode() StatsMode { - return s.statsMode -} - -func (s *commonExecuteSettings) Params() *params.Parameters { - if len(s.params) == 0 { - return nil - } - - return &s.params -} - func TxExecuteSettings(id string, opts ...TxExecuteOption) (settings *txExecuteSettings) { settings = &txExecuteSettings{ - ExecuteSettings: ExecuteSettings(WithTxControl(tx.NewControl(tx.WithTxID(id)))), + ExecuteOptions: []ExecuteOption{ + WithTxControl(tx.NewControl(tx.WithTxID(id))), + }, } for _, opt := range opts { if opt != nil { - opt.applyTxExecuteOption(settings) + opt.ApplyTxExecuteOption(settings) } } @@ -191,6 +169,8 @@ var ( _ TxExecuteOption = StatsMode(0) _ TxExecuteOption = txCommitOption{} _ ExecuteOption = TxControlOption{} + _ TxExecuteOption = traceOption{} + _ ExecuteOption = traceOption{} ) func WithCommit() txCommitOption { @@ -215,7 +195,7 @@ func WithStatsMode(mode StatsMode) StatsMode { return mode } -func WithCallOptions(opts ...grpc.CallOption) CallOptions { +func WithCallOptions(opts ...grpc.CallOption) GrpcOpts { return opts } diff --git a/internal/query/options/retry.go b/internal/query/options/retry.go index b604152e3..21bd9723f 100644 --- a/internal/query/options/retry.go +++ b/internal/query/options/retry.go @@ -36,11 +36,8 @@ type ( txSettings tx.Settings } - idempotentOption struct{} - labelOption string - traceOption struct { - t *trace.Query - } + idempotentOption struct{} + labelOption string doTxSettingsOption struct { txSettings tx.Settings } @@ -70,14 +67,6 @@ func (idempotentOption) applyDoOption(s *doSettings) { s.retryOpts = append(s.retryOpts, retry.WithIdempotent(true)) } -func (opt traceOption) applyDoOption(s *doSettings) { - s.trace = s.trace.Compose(opt.t) -} - -func (opt traceOption) applyDoTxOption(s *doTxSettings) { - s.doOpts = append(s.doOpts, opt) -} - func (opt labelOption) applyDoOption(s *doSettings) { s.retryOpts = append(s.retryOpts, retry.WithLabel(string(opt))) } diff --git a/internal/query/options/trace.go b/internal/query/options/trace.go new file mode 100644 index 000000000..3dfaef05d --- /dev/null +++ b/internal/query/options/trace.go @@ -0,0 +1,23 @@ +package options + +import "github.com/ydb-platform/ydb-go-sdk/v3/trace" + +type traceOption struct { + t *trace.Query +} + +func (opt traceOption) ApplyTxExecuteOption(s *txExecuteSettings) { + s.ExecuteOptions = append(s.ExecuteOptions, opt) +} + +func (opt traceOption) ApplyExecuteOption(s *Execute) { + s.Trace = s.Trace.Compose(opt.t) +} + +func (opt traceOption) applyDoOption(s *doSettings) { + s.trace = s.trace.Compose(opt.t) +} + +func (opt traceOption) applyDoTxOption(s *doTxSettings) { + s.doOpts = append(s.doOpts, opt) +} diff --git a/internal/query/result_set.go b/internal/query/result_set.go index 8d2ae8b71..cf2299cc9 100644 --- a/internal/query/result_set.go +++ b/internal/query/result_set.go @@ -7,6 +7,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -46,6 +47,14 @@ func newResultSet( } } +func (rs *resultSet) stats() *Ydb_TableStats.QueryStats { + if rs == nil || rs.currentPart == nil { + return nil + } + + return rs.currentPart.GetExecStats() +} + func (rs *resultSet) nextRow(ctx context.Context) (*row, error) { rs.rowIndex++ for { diff --git a/internal/query/row.go b/internal/query/row.go index 476b6aa14..1a0ce2c5c 100644 --- a/internal/query/row.go +++ b/internal/query/row.go @@ -16,6 +16,7 @@ var _ query.Row = (*row)(nil) type row struct { ctx context.Context trace *trace.Query + v *Ydb.Value indexedScanner scanner.IndexedScanner namedScanner scanner.NamedScanner @@ -28,6 +29,7 @@ func newRow(ctx context.Context, columns []*Ydb.Column, v *Ydb.Value, t *trace.Q return &row{ ctx: ctx, trace: t, + v: v, indexedScanner: scanner.Indexed(data), namedScanner: scanner.Named(data), structScanner: scanner.Struct(data), diff --git a/internal/query/session.go b/internal/query/session.go index 708b36df6..c089f1669 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -26,7 +26,7 @@ type ( cfg *config.Config id string nodeID int64 - grpcClient Ydb_Query_V1.QueryServiceClient + client Ydb_Query_V1.QueryServiceClient statusCode statusCode closeOnce func(ctx context.Context) error checks []func(s *Session) bool @@ -45,7 +45,7 @@ func createSession( ) (s *Session, finalErr error) { s = &Session{ cfg: cfg, - grpcClient: client, + client: client, statusCode: statusUnknown, checks: []func(*Session) bool{ func(s *Session) bool { @@ -68,7 +68,7 @@ func createSession( opt(s) } - onDone := trace.QueryOnSessionCreate(s.cfg.Trace(), &ctx, + onDone := trace.QueryOnSessionCreate(s.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.createSession"), ) defer func() { @@ -99,8 +99,16 @@ func createSession( return s, nil } +func (s *Session) Trace() *trace.Query { + if s == nil || s.cfg == nil { + return nil + } + + return s.cfg.Trace() +} + func (s *Session) attach(ctx context.Context) (finalErr error) { - onDone := trace.QueryOnSessionAttach(s.cfg.Trace(), &ctx, + onDone := trace.QueryOnSessionAttach(s.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).attach"), s) defer func() { onDone(finalErr) @@ -108,7 +116,7 @@ func (s *Session) attach(ctx context.Context) (finalErr error) { attachCtx, cancelAttach := xcontext.WithCancel(xcontext.ValueOnly(ctx)) - attach, err := s.grpcClient.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{ + attach, err := s.client.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{ SessionId: s.id, }) if err != nil { @@ -136,7 +144,7 @@ func (s *Session) attach(ctx context.Context) (finalErr error) { } defer cancel() - if err = deleteSession(ctx, s.grpcClient, s.id); err != nil { + if err = deleteSession(ctx, s.client, s.id); err != nil { return xerrors.WithStackTrace(err) } @@ -192,7 +200,7 @@ func (s *Session) IsAlive() bool { } func (s *Session) Close(ctx context.Context) (err error) { - onDone := trace.QueryOnSessionDelete(s.cfg.Trace(), &ctx, + onDone := trace.QueryOnSessionDelete(s.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Close"), s) defer func() { onDone(err) @@ -223,7 +231,7 @@ func begin( return nil, xerrors.WithStackTrace(err) } - return newTransaction(response.GetTxMeta().GetId(), s), nil + return newTx(response.GetTxMeta().GetId(), s.id, s.client, s.Trace()), nil } func (s *Session) Begin( @@ -234,17 +242,16 @@ func (s *Session) Begin( ) { var tx *transaction - onDone := trace.QueryOnSessionBegin(s.cfg.Trace(), &ctx, + onDone := trace.QueryOnSessionBegin(s.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Begin"), s) defer func() { onDone(err, tx) }() - tx, err = begin(ctx, s.grpcClient, s, txSettings) + tx, err = begin(ctx, s.client, s, txSettings) if err != nil { return nil, xerrors.WithStackTrace(err) } - tx.s = s return tx, nil } @@ -272,13 +279,18 @@ func (s *Session) Status() string { func (s *Session) Execute( ctx context.Context, q string, opts ...options.ExecuteOption, ) (_ query.Transaction, _ query.Result, err error) { - onDone := trace.QueryOnSessionExecute(s.cfg.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Execute"), s, q) + onDone := trace.QueryOnSessionExecute(s.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Execute"), + s, q, + ) defer func() { onDone(err) }() - tx, r, err := execute(ctx, s, s.grpcClient, q, options.ExecuteSettings(opts...)) + a := allocator.New() + defer a.Free() + + tx, r, err := Execute(ctx, s.client, s.ID(), q, append(opts, WithAllocator(a), options.WithTrace(s.Trace()))...) if err != nil { return nil, nil, xerrors.WithStackTrace(err) } diff --git a/internal/query/transaction.go b/internal/query/transaction.go index fbfcd9151..838d72291 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -6,6 +6,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -16,14 +17,19 @@ import ( var _ query.Transaction = (*transaction)(nil) type transaction struct { - id string - s *Session + id string + sessionID string + + client Ydb_Query_V1.QueryServiceClient + trace *trace.Query } -func newTransaction(id string, s *Session) *transaction { +func newTx(txID, sessionID string, client Ydb_Query_V1.QueryServiceClient, trace *trace.Query) *transaction { return &transaction{ - id: id, - s: s, + id: txID, + sessionID: sessionID, + client: client, + trace: trace, } } @@ -34,13 +40,18 @@ func (tx transaction) ID() string { func (tx transaction) Execute(ctx context.Context, q string, opts ...options.TxExecuteOption) ( r query.Result, finalErr error, ) { - onDone := trace.QueryOnTxExecute(tx.s.cfg.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.transaction.Execute"), tx.s, tx, q) + onDone := trace.QueryOnTxExecute(tx.trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.transaction.Execute"), tx, q) defer func() { onDone(finalErr) }() - _, res, err := execute(ctx, tx.s, tx.s.grpcClient, q, options.TxExecuteSettings(tx.id, opts...).ExecuteSettings) + a := allocator.New() + defer a.Free() + + settings := options.TxExecuteSettings(tx.id, opts...) + + _, res, err := Execute(ctx, tx.client, tx.sessionID, q, settings.ExecuteOptions...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -61,7 +72,7 @@ func commitTx(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi } func (tx transaction) CommitTx(ctx context.Context) (err error) { - return commitTx(ctx, tx.s.grpcClient, tx.s.id, tx.id) + return commitTx(ctx, tx.client, tx.sessionID, tx.id) } func rollback(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID, txID string) error { @@ -77,5 +88,5 @@ func rollback(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi } func (tx transaction) Rollback(ctx context.Context) (err error) { - return rollback(ctx, tx.s.grpcClient, tx.s.id, tx.id) + return rollback(ctx, tx.client, tx.sessionID, tx.id) } diff --git a/internal/query/transaction_test.go b/internal/query/transaction_test.go index 83ecdfc6c..adac6977c 100644 --- a/internal/query/transaction_test.go +++ b/internal/query/transaction_test.go @@ -132,14 +132,13 @@ func (s testExecuteSettings) CallOptions() []grpc.CallOption { return s.callOptions } -var _ executeConfig = testExecuteSettings{} - func TestTxExecuteSettings(t *testing.T) { for _, tt := range []struct { name string txID string txOpts []options.TxExecuteOption - settings executeConfig + settings testExecuteSettings + commitTx bool }{ { name: "WithTxID", @@ -218,16 +217,32 @@ func TestTxExecuteSettings(t *testing.T) { params: params.Builder{}.Param("$a").Text("A").Build(), }, }, + { + name: "WithCommit", + txID: "test", + txOpts: []options.TxExecuteOption{ + options.WithCommit(), + }, + settings: testExecuteSettings{ + execMode: options.ExecModeExecute, + statsMode: options.StatsModeNone, + txControl: query.TxControl(query.WithTxID("test")), + syntax: options.SyntaxYQL, + }, + commitTx: true, + }, } { t.Run(tt.name, func(t *testing.T) { a := allocator.New() - settings := options.TxExecuteSettings(tt.txID, tt.txOpts...).ExecuteSettings - require.Equal(t, tt.settings.Syntax(), settings.Syntax()) - require.Equal(t, tt.settings.ExecMode(), settings.ExecMode()) - require.Equal(t, tt.settings.StatsMode(), settings.StatsMode()) - require.Equal(t, tt.settings.TxControl().ToYDB(a).String(), settings.TxControl().ToYDB(a).String()) - require.Equal(t, tt.settings.Params().ToYDB(a), settings.Params().ToYDB(a)) - require.Equal(t, tt.settings.CallOptions(), settings.CallOptions()) + settings := options.TxExecuteSettings(tt.txID, tt.txOpts...) + executeSettings := options.ExecuteSettings(settings.ExecuteOptions...) + require.Equal(t, tt.settings.Syntax(), executeSettings.Syntax) + require.Equal(t, tt.settings.ExecMode(), executeSettings.ExecMode) + require.Equal(t, tt.settings.StatsMode(), executeSettings.StatsMode) + require.Equal(t, tt.settings.TxControl().ToYDB(a).String(), executeSettings.TxControl.ToYDB(a).String()) + require.Equal(t, tt.settings.Params().ToYDB(a), executeSettings.Params.ToYDB(a)) + require.Equal(t, tt.settings.CallOptions(), executeSettings.GrpcCallOptions) + require.Equal(t, tt.commitTx, settings.CommitTx) }) } } diff --git a/internal/query/tx/control.go b/internal/query/tx/control.go index a5be2fb21..922d795b7 100644 --- a/internal/query/tx/control.go +++ b/internal/query/tx/control.go @@ -21,8 +21,8 @@ type ( applyTxControlOption(txControl *Control) } Control struct { - selector Selector - commit bool + Selector Selector + Commit bool } Identifier interface { ID() string @@ -35,8 +35,8 @@ func (ctrl *Control) ToYDB(a *allocator.Allocator) *Ydb_Query.TransactionControl } txControl := a.QueryTransactionControl() - ctrl.selector.applyTxSelector(a, txControl) - txControl.CommitTx = ctrl.commit + ctrl.Selector.applyTxSelector(a, txControl) + txControl.CommitTx = ctrl.Commit return txControl } @@ -49,7 +49,7 @@ var ( type beginTxOptions []Option func (opts beginTxOptions) applyTxControlOption(txControl *Control) { - txControl.selector = opts + txControl.Selector = opts } func (opts beginTxOptions) applyTxSelector(a *allocator.Allocator, txControl *Ydb_Query.TransactionControl) { @@ -76,7 +76,7 @@ var ( type txIDTxControlOption string func (id txIDTxControlOption) applyTxControlOption(txControl *Control) { - txControl.selector = id + txControl.Selector = id } func (id txIDTxControlOption) applyTxSelector(a *allocator.Allocator, txControl *Ydb_Query.TransactionControl) { @@ -96,7 +96,7 @@ func WithTxID(txID string) txIDTxControlOption { type commitTxOption struct{} func (c commitTxOption) applyTxControlOption(txControl *Control) { - txControl.commit = true + txControl.Commit = true } // CommitTx returns commit transaction control option @@ -107,8 +107,8 @@ func CommitTx() ControlOption { // NewControl makes transaction control from given options func NewControl(opts ...ControlOption) *Control { txControl := &Control{ - selector: BeginTx(WithSerializableReadWrite()), - commit: false, + Selector: BeginTx(WithSerializableReadWrite()), + Commit: false, } for _, opt := range opts { if opt != nil { diff --git a/internal/table/client.go b/internal/table/client.go index 7d063346b..5d9e4bad9 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -26,7 +26,7 @@ import ( type sessionBuilder func(ctx context.Context) (*session, error) type nodeChecker interface { - HasNode(id uint32) bool + HasNode(id int64) bool } type balancer interface { diff --git a/internal/table/client_test.go b/internal/table/client_test.go index 82b7eb6af..e1bf06882 100644 --- a/internal/table/client_test.go +++ b/internal/table/client_test.go @@ -937,7 +937,7 @@ func TestDeadlockOnUpdateNodes(t *testing.T) { ctx, cancel := xcontext.WithTimeout(context.Background(), 1*time.Second) defer cancel() var ( - nodes = make([]uint32, 0, 3) + nodes = make([]int64, 0, 3) nodeIDCounter = uint32(0) ) balancer := testutil.NewBalancer(testutil.WithInvokeHandlers(testutil.InvokeHandlers{ @@ -980,7 +980,7 @@ func TestDeadlockOnInternalPoolGCTick(t *testing.T) { ctx, cancel := xcontext.WithTimeout(context.Background(), 1*time.Second) defer cancel() var ( - nodes = make([]uint32, 0, 3) + nodes = make([]int64, 0, 3) nodeIDCounter = uint32(0) ) balancer := testutil.NewBalancer(testutil.WithInvokeHandlers(testutil.InvokeHandlers{ diff --git a/internal/table/data_query.go b/internal/table/data_query.go index f918b7192..9b87b435e 100644 --- a/internal/table/data_query.go +++ b/internal/table/data_query.go @@ -7,7 +7,7 @@ import ( ) type ( - query interface { + queryRenameMe interface { String() string ID() string YQL() string @@ -59,11 +59,11 @@ func (q preparedDataQuery) toYDB(a *allocator.Allocator) *Ydb_Table.Query { return query } -func queryFromText(s string) query { +func queryFromText(s string) queryRenameMe { return textDataQuery(s) } -func queryPrepared(id, query string) query { +func queryPrepared(id, query string) queryRenameMe { return preparedDataQuery{ id: id, query: query, diff --git a/internal/table/session.go b/internal/table/session.go index 98e8ce4db..855ec5bae 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -9,6 +9,7 @@ import ( "sync/atomic" "time" + "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" "github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" @@ -23,6 +24,8 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query" + options2 "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/scanner" @@ -48,32 +51,33 @@ type session struct { onClose []func(s *session) id string tableService Ydb_Table_V1.TableServiceClient + queryService Ydb_Query_V1.QueryServiceClient status table.SessionStatus config *config.Config lastUsage atomic.Int64 statusMtx sync.RWMutex closeOnce sync.Once - nodeID atomic.Uint32 + nodeID atomic.Int64 } func (s *session) LastUsage() time.Time { return time.Unix(s.lastUsage.Load(), 0) } -func nodeID(sessionID string) (uint32, error) { +func nodeID(sessionID string) (int64, error) { u, err := url.Parse(sessionID) if err != nil { return 0, err } - id, err := strconv.ParseUint(u.Query().Get("node_id"), 10, 32) + id, err := strconv.ParseInt(u.Query().Get("node_id"), 10, 32) if err != nil { return 0, err } - return uint32(id), err + return id, err } -func (s *session) NodeID() uint32 { +func (s *session) NodeID() int64 { if s == nil { return 0 } @@ -163,6 +167,17 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config ), ) + s.queryService = Ydb_Query_V1.NewQueryServiceClient( + conn.WithBeforeFunc( + conn.WithContextModifier(cc, func(ctx context.Context) context.Context { + return meta.WithTrailerCallback(balancerContext.WithEndpoint(ctx, s), s.checkCloseHint) + }), + func() { + s.lastUsage.Store(time.Now().Unix()) + }, + ), + ) + return s, nil } @@ -760,15 +775,15 @@ func (s *session) Prepare(ctx context.Context, queryText string) (_ table.Statem func (s *session) Execute( ctx context.Context, txControl *table.TransactionControl, - query string, + q string, parameters *params.Parameters, opts ...options.ExecuteDataQueryOption, ) ( - txr table.Transaction, r result.Result, err error, + _ table.Transaction, r result.Result, finalErr error, ) { var ( a = allocator.New() - q = queryFromText(query) + qq = queryFromText(q) request = options.ExecuteDataQueryDesc{ ExecuteDataQueryRequest: a.TableExecuteDataQueryRequest(), IgnoreTruncated: s.config.IgnoreTruncated(), @@ -780,7 +795,7 @@ func (s *session) Execute( request.SessionId = s.id request.TxControl = txControl.Desc() request.Parameters = parameters.ToYDB(a) - request.Query = q.toYDB(a) + request.Query = qq.toYDB(a) request.QueryCachePolicy = a.TableQueryCachePolicy() request.QueryCachePolicy.KeepInCache = len(request.Parameters) > 0 request.OperationParams = operation.Params(ctx, @@ -795,22 +810,58 @@ func (s *session) Execute( } } - onDone := trace.TableOnSessionQueryExecute( - s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/table.(*session).Execute"), - s, q, parameters, - request.QueryCachePolicy.GetKeepInCache(), + var ( + onDone = trace.TableOnSessionQueryExecute( + s.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/table.(*session).Execute"), + s, qq, parameters, + request.QueryCachePolicy.GetKeepInCache(), + ) + txr *transaction ) defer func() { - onDone(txr, false, r, err) + onDone(txr, false, r, finalErr) }() - result, err := s.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...) + if !request.WithQueryService { + result, err := s.executeDataQuery(ctx, a, request.ExecuteDataQueryRequest, callOptions...) + if err != nil { + return nil, nil, xerrors.WithStackTrace(err) + } + + return s.executeQueryResult(result, request.TxControl, request.IgnoreTruncated) + } + + executeOptions := make([]options2.ExecuteOption, 0, len(opts)+1) + executeOptions = append(executeOptions, query.WithAllocator(a)) + for _, opt := range opts { + executeOptions = append(executeOptions, opt) + } + tx, res, err := query.Execute(ctx, s.queryService, s.ID(), q, executeOptions...) if err != nil { return nil, nil, xerrors.WithStackTrace(err) } + if tx != nil { + txr = &transaction{ + id: tx.ID(), + s: s, + } + if txControl.Desc().GetCommitTx() { + txr.state.Set(txStateCommitted) + } else { + txr.state.Set(txStateInitialized) + txr.control = table.TxControl(table.WithTxID(tx.ID())) + } + } - return s.executeQueryResult(result, request.TxControl, request.IgnoreTruncated) + resultSets, stats, err := query.ReadAll(ctx, res) + if err != nil { + return nil, nil, xerrors.WithStackTrace(err) + } + + return txr, scanner.NewUnary(resultSets, stats, + scanner.WithIgnoreTruncated(true), + ), nil } // executeQueryResult returns Transaction and result built from received @@ -827,9 +878,9 @@ func (s *session) executeQueryResult( s: s, } if txControl.GetCommitTx() { - tx.state.Store(txStateCommitted) + tx.state.Set(txStateCommitted) } else { - tx.state.Store(txStateInitialized) + tx.state.Set(txStateInitialized) tx.control = table.TxControl(table.WithTxID(tx.id)) } @@ -1305,7 +1356,7 @@ func (s *session) BeginTransaction( s: s, control: table.TxControl(table.WithTxID(result.GetTxMeta().GetId())), } - tx.state.Store(txStateInitialized) + tx.state.Set(txStateInitialized) return tx, nil } diff --git a/internal/table/statement.go b/internal/table/statement.go index eb797e2ef..27cdd46ed 100644 --- a/internal/table/statement.go +++ b/internal/table/statement.go @@ -20,7 +20,7 @@ import ( type statement struct { session *session - query query + query queryRenameMe params map[string]*Ydb.Type } diff --git a/internal/table/transaction.go b/internal/table/transaction.go index a07576196..81fe37500 100644 --- a/internal/table/transaction.go +++ b/internal/table/transaction.go @@ -25,15 +25,15 @@ var ( ) type txState struct { - rawVal atomic.Uint32 + atomic.Pointer[txStateEnum] } -func (s *txState) Load() txStateEnum { - return txStateEnum(s.rawVal.Load()) +func (s *txState) Get() txStateEnum { + return *s.Pointer.Load() } -func (s *txState) Store(val txStateEnum) { - s.rawVal.Store(uint32(val)) +func (s *txState) Set(state txStateEnum) { + s.Pointer.Store(&state) } type txStateEnum uint32 @@ -70,7 +70,7 @@ func (tx *transaction) Execute( onDone(r, err) }() - switch tx.state.Load() { + switch tx.state.Get() { case txStateCommitted: return nil, xerrors.WithStackTrace(errTxAlreadyCommitted) case txStateRollbacked: @@ -82,7 +82,7 @@ func (tx *transaction) Execute( } if tx.control.Desc().GetCommitTx() { - tx.state.Store(txStateCommitted) + tx.state.Set(txStateCommitted) } return r, nil @@ -107,7 +107,7 @@ func (tx *transaction) ExecuteStatement( onDone(r, err) }() - switch tx.state.Load() { + switch tx.state.Get() { case txStateCommitted: return nil, xerrors.WithStackTrace(errTxAlreadyCommitted) case txStateRollbacked: @@ -119,7 +119,7 @@ func (tx *transaction) ExecuteStatement( } if tx.control.Desc().GetCommitTx() { - tx.state.Store(txStateCommitted) + tx.state.Set(txStateCommitted) } return r, nil @@ -140,7 +140,7 @@ func (tx *transaction) CommitTx( onDone(err) }() - switch tx.state.Load() { + switch tx.state.Get() { case txStateCommitted: return nil, xerrors.WithStackTrace(errTxAlreadyCommitted) case txStateRollbacked: @@ -177,7 +177,7 @@ func (tx *transaction) CommitTx( return nil, xerrors.WithStackTrace(err) } - tx.state.Store(txStateCommitted) + tx.state.Set(txStateCommitted) return scanner.NewUnary( nil, @@ -198,7 +198,7 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) { onDone(err) }() - switch tx.state.Load() { + switch tx.state.Get() { case txStateCommitted: return nil // nop for committed tx case txStateRollbacked: @@ -220,7 +220,7 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) { return xerrors.WithStackTrace(err) } - tx.state.Store(txStateRollbacked) + tx.state.Set(txStateRollbacked) return nil } diff --git a/log/query.go b/log/query.go index e44e9e3ac..c52a68d5b 100644 --- a/log/query.go +++ b/log/query.go @@ -413,9 +413,7 @@ func internalQuery( } ctx := with(*info.Context, TRACE, "ydb", "query", "transaction", "execute") l.Log(ctx, "start", - String("SessionID", info.Session.ID()), String("TransactionID", info.Tx.ID()), - String("SessionStatus", info.Session.Status()), ) start := time.Now() diff --git a/metrics/node_id.go b/metrics/node_id.go index 176eace55..a06fdba6c 100644 --- a/metrics/node_id.go +++ b/metrics/node_id.go @@ -4,6 +4,6 @@ import ( "strconv" ) -func idToString(id uint32) string { - return strconv.FormatUint(uint64(id), 10) +func idToString(id int64) string { + return strconv.FormatInt(id, 10) } diff --git a/query/session.go b/query/session.go index 14da2f3d5..d53bd3854 100644 --- a/query/session.go +++ b/query/session.go @@ -78,6 +78,6 @@ func WithStatsMode(mode options.StatsMode) options.StatsModeOption { return options.WithStatsMode(mode) } -func WithCallOptions(opts ...grpc.CallOption) options.CallOptions { +func WithCallOptions(opts ...grpc.CallOption) options.GrpcOpts { return options.WithCallOptions(opts...) } diff --git a/table/options/options.go b/table/options/options.go index 922333a17..0a74c07fd 100644 --- a/table/options/options.go +++ b/table/options/options.go @@ -2,10 +2,12 @@ package options import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" "google.golang.org/grpc" "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/types" "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" ) @@ -834,22 +836,16 @@ type ( ExecuteDataQueryDesc struct { *Ydb_Table.ExecuteDataQueryRequest - IgnoreTruncated bool + IgnoreTruncated bool + WithQueryService bool } ExecuteDataQueryOption interface { + options.ExecuteOption + ApplyExecuteDataQueryOption(d *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption } - executeDataQueryOptionFunc func(d *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption ) -func (f executeDataQueryOptionFunc) ApplyExecuteDataQueryOption( - d *ExecuteDataQueryDesc, a *allocator.Allocator, -) []grpc.CallOption { - return f(d, a) -} - -var _ ExecuteDataQueryOption = executeDataQueryOptionFunc(nil) - type ( CommitTransactionDesc Ydb_Table.CommitTransactionRequest CommitTransactionOption func(*CommitTransactionDesc) @@ -871,6 +867,10 @@ func WithKeepInCache(keepInCache bool) ExecuteDataQueryOption { type withCallOptions []grpc.CallOption +func (opts withCallOptions) ApplyExecuteOption(s *options.Execute) { + s.GrpcCallOptions = append(s.GrpcCallOptions, opts...) +} + func (opts withCallOptions) ApplyExecuteScanQueryOption(d *ExecuteScanQueryDesc) []grpc.CallOption { return opts } @@ -890,22 +890,59 @@ func WithCallOptions(opts ...grpc.CallOption) withCallOptions { return opts } +type withCommitExecuteDataQueryOption struct{} + +func (withCommitExecuteDataQueryOption) ApplyExecuteOption(s *options.Execute) { + s.TxControl.Commit = true +} + +func (withCommitExecuteDataQueryOption) ApplyExecuteDataQueryOption( + d *ExecuteDataQueryDesc, a *allocator.Allocator, +) []grpc.CallOption { + d.TxControl.CommitTx = true + + return nil +} + // WithCommit appends flag of commit transaction with executing query func WithCommit() ExecuteDataQueryOption { - return executeDataQueryOptionFunc(func(desc *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption { - desc.TxControl.CommitTx = true + return withCommitExecuteDataQueryOption{} +} - return nil - }) +type useQueryServiceExecuteOption struct{} + +func (useQueryServiceExecuteOption) ApplyExecuteOption(s *options.Execute) { +} + +func (u useQueryServiceExecuteOption) ApplyExecuteDataQueryOption( + d *ExecuteDataQueryDesc, a *allocator.Allocator, +) []grpc.CallOption { + d.WithQueryService = true + + return nil +} + +// WithQueryService redirects request to query service client +func WithQueryService() ExecuteDataQueryOption { + return useQueryServiceExecuteOption{} +} + +type withIgnoreTruncatedOption struct{} + +func (withIgnoreTruncatedOption) ApplyExecuteOption(s *options.Execute) { +} + +func (w withIgnoreTruncatedOption) ApplyExecuteDataQueryOption( + d *ExecuteDataQueryDesc, a *allocator.Allocator, +) []grpc.CallOption { + d.IgnoreTruncated = true + + return nil } // WithIgnoreTruncated mark truncated result as good (without error) func WithIgnoreTruncated() ExecuteDataQueryOption { - return executeDataQueryOptionFunc(func(desc *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption { - desc.IgnoreTruncated = true - - return nil - }) + return withIgnoreTruncatedOption{} } // WithQueryCachePolicyKeepInCache manages keep-in-cache policy @@ -932,20 +969,29 @@ func WithQueryCachePolicy(opts ...QueryCachePolicyOption) ExecuteDataQueryOption return withQueryCachePolicy(opts...) } -func withQueryCachePolicy(opts ...QueryCachePolicyOption) ExecuteDataQueryOption { - return executeDataQueryOptionFunc(func(d *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption { - if d.QueryCachePolicy == nil { - d.QueryCachePolicy = a.TableQueryCachePolicy() - d.QueryCachePolicy.KeepInCache = true - } - for _, opt := range opts { - if opt != nil { - opt((*queryCachePolicy)(d.QueryCachePolicy), a) - } +type withQueryCachePolicyOption []QueryCachePolicyOption + +func (opts withQueryCachePolicyOption) ApplyExecuteOption(s *options.Execute) { +} + +func (opts withQueryCachePolicyOption) ApplyExecuteDataQueryOption( + d *ExecuteDataQueryDesc, a *allocator.Allocator, +) []grpc.CallOption { + if d.QueryCachePolicy == nil { + d.QueryCachePolicy = a.TableQueryCachePolicy() + d.QueryCachePolicy.KeepInCache = true + } + for _, opt := range opts { + if opt != nil { + opt((*queryCachePolicy)(d.QueryCachePolicy), a) } + } - return nil - }) + return nil +} + +func withQueryCachePolicy(opts ...QueryCachePolicyOption) ExecuteDataQueryOption { + return withQueryCachePolicyOption(opts) } func WithCommitCollectStatsModeNone() CommitTransactionOption { @@ -960,20 +1006,40 @@ func WithCommitCollectStatsModeBasic() CommitTransactionOption { } } +type withCollectStatsModeNoneOption struct{} + +func (withCollectStatsModeNoneOption) ApplyExecuteOption(s *options.Execute) { + s.StatsMode = options.StatsMode(Ydb_Query.StatsMode_STATS_MODE_NONE) +} + +func (withCollectStatsModeNoneOption) ApplyExecuteDataQueryOption( + d *ExecuteDataQueryDesc, a *allocator.Allocator, +) []grpc.CallOption { + d.CollectStats = Ydb_Table.QueryStatsCollection_STATS_COLLECTION_NONE + + return nil +} + func WithCollectStatsModeNone() ExecuteDataQueryOption { - return executeDataQueryOptionFunc(func(d *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption { - d.CollectStats = Ydb_Table.QueryStatsCollection_STATS_COLLECTION_NONE + return withCollectStatsModeNoneOption{} +} - return nil - }) +type withCollectStatsModeBasicOption struct{} + +func (withCollectStatsModeBasicOption) ApplyExecuteOption(s *options.Execute) { + s.StatsMode = options.StatsMode(Ydb_Query.StatsMode_STATS_MODE_BASIC) } -func WithCollectStatsModeBasic() ExecuteDataQueryOption { - return executeDataQueryOptionFunc(func(d *ExecuteDataQueryDesc, a *allocator.Allocator) []grpc.CallOption { - d.CollectStats = Ydb_Table.QueryStatsCollection_STATS_COLLECTION_BASIC +func (withCollectStatsModeBasicOption) ApplyExecuteDataQueryOption( + d *ExecuteDataQueryDesc, a *allocator.Allocator, +) []grpc.CallOption { + d.CollectStats = Ydb_Table.QueryStatsCollection_STATS_COLLECTION_BASIC - return nil - }) + return nil +} + +func WithCollectStatsModeBasic() ExecuteDataQueryOption { + return withCollectStatsModeBasicOption{} } type ( diff --git a/table/table.go b/table/table.go index 1cb1ed8e0..4cf3b0a1a 100644 --- a/table/table.go +++ b/table/table.go @@ -80,7 +80,7 @@ const ( type SessionInfo interface { ID() string - NodeID() uint32 + NodeID() int64 Status() SessionStatus LastUsage() time.Time } diff --git a/tests/integration/table_use_query_test.go b/tests/integration/table_use_query_test.go new file mode 100644 index 000000000..0a961dd24 --- /dev/null +++ b/tests/integration/table_use_query_test.go @@ -0,0 +1,99 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" +) + +func TestWithQueryService(t *testing.T) { + if version.Lt(os.Getenv("YDB_VERSION"), "24.1") { + t.Skip("query service not allowed in YDB version '" + os.Getenv("YDB_VERSION") + "'") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := ydb.Open(ctx, + os.Getenv("YDB_CONNECTION_STRING"), + ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")), + ) + require.NoError(t, err) + t.Run("table.Session.Execute", func(t *testing.T) { + var abc, def int32 + err = db.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, res, err := s.Execute(ctx, table.DefaultTxControl(), + `SELECT 123 as abc, 456 as def;`, nil, + options.WithQueryService(), + ) + if err != nil { + return err + } + err = res.NextResultSetErr(ctx) + if err != nil { + return err + } + if !res.NextRow() { + if err = res.Err(); err != nil { + return err + } + return fmt.Errorf("unexpected empty result set") + } + var abc, def int32 + err = res.ScanNamed( + named.Required("abc", &abc), + named.Required("def", &def), + ) + if err != nil { + return err + } + t.Log(abc, def) + return res.Err() + }, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly()))) + require.NoError(t, err) + require.EqualValues(t, 123, abc) + require.EqualValues(t, 456, def) + }) + t.Run("table.Transaction.Execute", func(t *testing.T) { + err = db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) (err error) { + res, err := tx.Execute(ctx, `SELECT 1 as abc, 2 as def;`, nil) + if err != nil { + return err + } + err = res.NextResultSetErr(ctx) + if err != nil { + return err + } + if !res.NextRow() { + if err = res.Err(); err != nil { + return err + } + return fmt.Errorf("unexpected empty result set") + } + var abc, def int32 + err = res.ScanNamed( + named.Required("abc", &abc), + named.Required("ghi", &def), + ) + if err != nil { + return err + } + t.Log(abc, def) + return res.Err() + }, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly()))) + require.Error(t, err) + require.ErrorContains(t, err, "not found column 'ghi'") + }) +} diff --git a/testutil/driver.go b/testutil/driver.go index 48b82d4ca..e55cd6975 100644 --- a/testutil/driver.go +++ b/testutil/driver.go @@ -138,7 +138,7 @@ type balancerStub struct { ) (grpc.ClientStream, error) } -func (b *balancerStub) HasNode(id uint32) bool { +func (b *balancerStub) HasNode(id int64) bool { return true } diff --git a/trace/driver.go b/trace/driver.go index 8cf5dedc3..042d50579 100644 --- a/trace/driver.go +++ b/trace/driver.go @@ -139,7 +139,7 @@ type ConnState interface { type EndpointInfo interface { fmt.Stringer - NodeID() uint32 + NodeID() int64 Address() string Location() string LoadFactor() float32 diff --git a/trace/query.go b/trace/query.go index 3fbaef522..adca7dee9 100644 --- a/trace/query.go +++ b/trace/query.go @@ -135,9 +135,8 @@ type ( Context *context.Context Call call - Session querySessionInfo - Tx queryTransactionInfo - Query string + Tx queryTransactionInfo + Query string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals QueryTxExecuteDoneInfo struct { diff --git a/trace/query_gtrace.go b/trace/query_gtrace.go index 2bbc46f3c..0bf3cc7aa 100644 --- a/trace/query_gtrace.go +++ b/trace/query_gtrace.go @@ -1460,11 +1460,10 @@ func QueryOnSessionBegin(t *Query, c *context.Context, call call, session queryS } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func QueryOnTxExecute(t *Query, c *context.Context, call call, session querySessionInfo, tx queryTransactionInfo, query string) func(error) { +func QueryOnTxExecute(t *Query, c *context.Context, call call, tx queryTransactionInfo, query string) func(error) { var p QueryTxExecuteStartInfo p.Context = c p.Call = call - p.Session = session p.Tx = tx p.Query = query res := t.onTxExecute(p) diff --git a/trace/table.go b/trace/table.go index d8f0622e7..26de5a29c 100644 --- a/trace/table.go +++ b/trace/table.go @@ -96,7 +96,7 @@ type ( } tableSessionInfo interface { ID() string - NodeID() uint32 + NodeID() int64 Status() string LastUsage() time.Time }