diff --git a/internal/query/client.go b/internal/query/client.go index 99b9055d9..556aea07d 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -186,7 +186,7 @@ func (c *Client) ExecuteScript( return op, nil } -func (p *poolStub) Close(ctx context.Context) error { +func (p *poolStub) Close(context.Context) error { return nil } diff --git a/internal/query/result_range_test.go b/internal/query/result_range_test.go deleted file mode 100644 index 34024a5e9..000000000 --- a/internal/query/result_range_test.go +++ /dev/null @@ -1,362 +0,0 @@ -//go:build go1.23 - -package query - -import ( - "context" - "io" - "testing" - - "github.com/stretchr/testify/require" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" - "go.uber.org/mock/gomock" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" -) - -func TestResultRange(t *testing.T) { - ctx, cancel := context.WithCancel(xtest.Context(t)) - defer cancel() - ctrl := gomock.NewController(t) - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 4, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "4", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 5, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "5", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 1, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "c", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "d", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - { - Name: "e", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_BOOL, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: true, - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: false, - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 1, - ResultSet: &Ydb.ResultSet{ - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: true, - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 4, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "4", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: false, - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 5, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "5", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: false, - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 2, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "c", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "d", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - { - Name: "e", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_BOOL, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: true, - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: false, - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 2, - ResultSet: &Ydb.ResultSet{ - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: true, - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 4, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "4", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: false, - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 5, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "5", - }, - }, { - Value: &Ydb.Value_BoolValue{ - BoolValue: false, - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(nil, io.EOF) - r, _, err := newResult(ctx, stream, nil) - require.NoError(t, err) - defer r.Close(ctx) - rsCount := 0 - for rs, err := range r.ResultSets(ctx) { - require.NoError(t, err) - rowsCount := 0 - for _, err := range rs.Rows(ctx) { - require.NoError(t, err) - rowsCount++ - } - require.EqualValues(t, 5, rowsCount) - rsCount++ - } - require.EqualValues(t, 3, rsCount) -} diff --git a/internal/query/result_set_range_test.go b/internal/query/result_set_range_test.go deleted file mode 100644 index a1c349235..000000000 --- a/internal/query/result_set_range_test.go +++ /dev/null @@ -1,938 +0,0 @@ -//go:build go1.23 - -package query - -import ( - "context" - "fmt" - "io" - "testing" - - "github.com/stretchr/testify/require" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" - "go.uber.org/mock/gomock" - grpcCodes "google.golang.org/grpc/codes" - grpcStatus "google.golang.org/grpc/status" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" -) - -func TestResultSetRange(t *testing.T) { - ctx := xtest.Context(t) - ctrl := gomock.NewController(t) - t.Run("EmptyResultSet", func(t *testing.T) { - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{}, - }, - }, nil) - stream.EXPECT().Recv().Return(nil, io.EOF) - recv, err := stream.Recv() - require.NoError(t, err) - rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { - part, err := stream.Recv() - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - - return part, nil - }, recv) - require.EqualValues(t, 0, rs.index) - count := 0 - for _, err := range rs.Rows(ctx) { - require.NoError(t, err) - count++ - } - require.EqualValues(t, 0, count) - }) - t.Run("SecondResultSetEmpty", func(t *testing.T) { - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Rows: []*Ydb.Value{}, - }, - }, nil) - stream.EXPECT().Recv().Return(nil, io.EOF) - recv, err := stream.Recv() - require.NoError(t, err) - rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { - part, err := stream.Recv() - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - - return part, nil - }, recv) - require.EqualValues(t, 0, rs.index) - count := 0 - for row, err := range rs.Rows(ctx) { - require.NoError(t, err) - require.EqualValues(t, count, rs.rowIndex) - var ( - a uint64 - b string - ) - err := row.Scan(&a, &b) - require.NoError(t, err) - count++ - require.EqualValues(t, count, a) - require.EqualValues(t, fmt.Sprintf("%v", count), b) - } - require.EqualValues(t, count, 3) - }) - t.Run("BreakIterate", func(t *testing.T) { - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - recv, err := stream.Recv() - require.NoError(t, err) - rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { - part, err := stream.Recv() - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - - return part, nil - }, recv) - require.EqualValues(t, 0, rs.index) - count := 0 - for _, err := range rs.Rows(ctx) { - require.NoError(t, err) - require.EqualValues(t, count, rs.rowIndex) - if count > 0 { - break - } - count++ - } - require.EqualValues(t, count, 1) - }) - t.Run("IntermediateResultSetEmpty", func(t *testing.T) { - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Rows: []*Ydb.Value{}, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 4, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "4", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 5, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "5", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 6, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "6", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(nil, io.EOF) - recv, err := stream.Recv() - require.NoError(t, err) - rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { - part, err := stream.Recv() - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - - return part, nil - }, recv) - require.EqualValues(t, 0, rs.index) - count := 0 - for row, err := range rs.Rows(ctx) { - require.NoError(t, err) - var ( - a uint64 - b string - ) - err := row.Scan(&a, &b) - require.NoError(t, err) - count++ - require.EqualValues(t, count, a) - require.EqualValues(t, fmt.Sprintf("%v", count), b) - } - require.EqualValues(t, count, 6) - }) - t.Run("OverTwoParts", func(t *testing.T) { - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 4, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "4", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 5, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "5", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(nil, io.EOF) - recv, err := stream.Recv() - require.NoError(t, err) - rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { - part, err := stream.Recv() - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - - return part, nil - }, recv) - require.EqualValues(t, 0, rs.index) - count := 0 - for row, err := range rs.Rows(ctx) { - require.NoError(t, err) - var ( - a uint64 - b string - ) - err := row.Scan(&a, &b) - require.NoError(t, err) - count++ - require.EqualValues(t, count, a) - require.EqualValues(t, fmt.Sprintf("%v", count), b) - } - require.EqualValues(t, count, 5) - }) - t.Run("CanceledContext", func(t *testing.T) { - childCtx, cancel := context.WithCancel(xtest.Context(t)) - defer cancel() - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - recv, err := stream.Recv() - require.NoError(t, err) - rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { - part, err := stream.Recv() - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - - return part, nil - }, recv) - require.EqualValues(t, 0, rs.index) - var ( - count = 0 - cancelled = false - ) - for _, err := range rs.Rows(childCtx) { - count++ - if !cancelled { - require.NoError(t, err) - cancel() - cancelled = true - } else { - require.ErrorIs(t, err, context.Canceled) - } - } - require.EqualValues(t, count, 2) - }) - t.Run("OperationError", func(t *testing.T) { - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(nil, xerrors.Operation(xerrors.WithStatusCode( - Ydb.StatusIds_OVERLOADED, - ))) - recv, err := stream.Recv() - require.NoError(t, err) - rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { - part, err := nextPart(stream) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - if resultSetIndex := part.GetResultSetIndex(); resultSetIndex != 0 { - return nil, xerrors.WithStackTrace(fmt.Errorf( - "%w: %d != %d", - errWrongNextResultSetIndex, - resultSetIndex, 0, - )) - } - - return part, nil - }, recv) - require.EqualValues(t, 0, rs.index) - count := 0 - for _, err := range rs.Rows(ctx) { - if count < 3 { - require.NoError(t, err) - } else { - require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_OVERLOADED)) - } - count++ - } - require.EqualValues(t, count, 4) - }) - t.Run("TransportError", func(t *testing.T) { - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) - recv, err := stream.Recv() - require.NoError(t, err) - rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { - part, err := nextPart(stream) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - if resultSetIndex := part.GetResultSetIndex(); resultSetIndex != 0 { - return nil, xerrors.WithStackTrace(fmt.Errorf( - "%w: %d != %d", - errWrongNextResultSetIndex, - resultSetIndex, 0, - )) - } - - return part, nil - }, recv) - require.EqualValues(t, 0, rs.index) - count := 0 - for _, err := range rs.Rows(ctx) { - if count < 3 { - require.NoError(t, err) - } else { - require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable)) - } - count++ - } - require.EqualValues(t, count, 4) - }) - t.Run("WrongResultSetIndex", func(t *testing.T) { - stream := NewMockQueryService_ExecuteQueryClient(ctrl) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 0, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ - Status: Ydb.StatusIds_SUCCESS, - ResultSetIndex: 1, - ResultSet: &Ydb.ResultSet{ - Columns: []*Ydb.Column{ - { - Name: "a", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UINT64, - }, - }, - }, - { - Name: "b", - Type: &Ydb.Type{ - Type: &Ydb.Type_TypeId{ - TypeId: Ydb.Type_UTF8, - }, - }, - }, - }, - Rows: []*Ydb.Value{ - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 1, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "1", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 2, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "2", - }, - }}, - }, - { - Items: []*Ydb.Value{{ - Value: &Ydb.Value_Uint64Value{ - Uint64Value: 3, - }, - }, { - Value: &Ydb.Value_TextValue{ - TextValue: "3", - }, - }}, - }, - }, - }, - }, nil) - recv, err := stream.Recv() - require.NoError(t, err) - rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { - part, err := nextPart(stream) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - - return part, nil - }, recv) - require.EqualValues(t, 0, rs.index) - count := 0 - for _, err := range rs.Rows(ctx) { - if count < 3 { - require.NoError(t, err) - } else { - require.ErrorIs(t, err, errWrongResultSetIndex) - } - count++ - } - require.EqualValues(t, count, 4) - }) -} diff --git a/internal/query/result_set_test.go b/internal/query/result_set_test.go index 0d21f5ada..8fc9d9ef0 100644 --- a/internal/query/result_set_test.go +++ b/internal/query/result_set_test.go @@ -960,3 +960,921 @@ func TestResultSetNext(t *testing.T) { require.EqualValues(t, []string{"Uint64", "Utf8"}, types) }) } + +func TestResultSetRange(t *testing.T) { + ctx := xtest.Context(t) + ctrl := gomock.NewController(t) + t.Run("EmptyResultSet", func(t *testing.T) { + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{}, + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF) + recv, err := stream.Recv() + require.NoError(t, err) + rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + part, err := stream.Recv() + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return part, nil + }, recv) + require.EqualValues(t, 0, rs.index) + count := 0 + for _, err := range rs.Rows(ctx) { + require.NoError(t, err) + count++ + } + require.EqualValues(t, 0, count) + }) + t.Run("SecondResultSetEmpty", func(t *testing.T) { + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Rows: []*Ydb.Value{}, + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF) + recv, err := stream.Recv() + require.NoError(t, err) + rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + part, err := stream.Recv() + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return part, nil + }, recv) + require.EqualValues(t, 0, rs.index) + count := 0 + for row, err := range rs.Rows(ctx) { + require.NoError(t, err) + require.EqualValues(t, count, rs.rowIndex) + var ( + a uint64 + b string + ) + err := row.Scan(&a, &b) + require.NoError(t, err) + count++ + require.EqualValues(t, count, a) + require.EqualValues(t, fmt.Sprintf("%v", count), b) + } + require.EqualValues(t, count, 3) + }) + t.Run("BreakIterate", func(t *testing.T) { + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + recv, err := stream.Recv() + require.NoError(t, err) + rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + part, err := stream.Recv() + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return part, nil + }, recv) + require.EqualValues(t, 0, rs.index) + count := 0 + for _, err := range rs.Rows(ctx) { + require.NoError(t, err) + require.EqualValues(t, count, rs.rowIndex) + if count > 0 { + break + } + count++ + } + require.EqualValues(t, count, 1) + }) + t.Run("IntermediateResultSetEmpty", func(t *testing.T) { + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Rows: []*Ydb.Value{}, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 6, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "6", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF) + recv, err := stream.Recv() + require.NoError(t, err) + rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + part, err := stream.Recv() + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return part, nil + }, recv) + require.EqualValues(t, 0, rs.index) + count := 0 + for row, err := range rs.Rows(ctx) { + require.NoError(t, err) + var ( + a uint64 + b string + ) + err := row.Scan(&a, &b) + require.NoError(t, err) + count++ + require.EqualValues(t, count, a) + require.EqualValues(t, fmt.Sprintf("%v", count), b) + } + require.EqualValues(t, count, 6) + }) + t.Run("OverTwoParts", func(t *testing.T) { + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF) + recv, err := stream.Recv() + require.NoError(t, err) + rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + part, err := stream.Recv() + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return part, nil + }, recv) + require.EqualValues(t, 0, rs.index) + count := 0 + for row, err := range rs.Rows(ctx) { + require.NoError(t, err) + var ( + a uint64 + b string + ) + err := row.Scan(&a, &b) + require.NoError(t, err) + count++ + require.EqualValues(t, count, a) + require.EqualValues(t, fmt.Sprintf("%v", count), b) + } + require.EqualValues(t, count, 5) + }) + t.Run("CanceledContext", func(t *testing.T) { + childCtx, cancel := context.WithCancel(xtest.Context(t)) + defer cancel() + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + recv, err := stream.Recv() + require.NoError(t, err) + rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + part, err := stream.Recv() + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return part, nil + }, recv) + require.EqualValues(t, 0, rs.index) + var ( + count = 0 + cancelled = false + ) + for _, err := range rs.Rows(childCtx) { + count++ + if !cancelled { + require.NoError(t, err) + cancel() + cancelled = true + } else { + require.ErrorIs(t, err, context.Canceled) + } + } + require.EqualValues(t, count, 2) + }) + t.Run("OperationError", func(t *testing.T) { + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(nil, xerrors.Operation(xerrors.WithStatusCode( + Ydb.StatusIds_OVERLOADED, + ))) + recv, err := stream.Recv() + require.NoError(t, err) + rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + part, err := nextPart(stream) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + if resultSetIndex := part.GetResultSetIndex(); resultSetIndex != 0 { + return nil, xerrors.WithStackTrace(fmt.Errorf( + "%w: %d != %d", + errWrongNextResultSetIndex, + resultSetIndex, 0, + )) + } + + return part, nil + }, recv) + require.EqualValues(t, 0, rs.index) + count := 0 + for _, err := range rs.Rows(ctx) { + if count < 3 { + require.NoError(t, err) + } else { + require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_OVERLOADED)) + } + count++ + } + require.EqualValues(t, count, 4) + }) + t.Run("TransportError", func(t *testing.T) { + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) + recv, err := stream.Recv() + require.NoError(t, err) + rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + part, err := nextPart(stream) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + if resultSetIndex := part.GetResultSetIndex(); resultSetIndex != 0 { + return nil, xerrors.WithStackTrace(fmt.Errorf( + "%w: %d != %d", + errWrongNextResultSetIndex, + resultSetIndex, 0, + )) + } + + return part, nil + }, recv) + require.EqualValues(t, 0, rs.index) + count := 0 + for _, err := range rs.Rows(ctx) { + if count < 3 { + require.NoError(t, err) + } else { + require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable)) + } + count++ + } + require.EqualValues(t, count, 4) + }) + t.Run("WrongResultSetIndex", func(t *testing.T) { + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 1, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + recv, err := stream.Recv() + require.NoError(t, err) + rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { + part, err := nextPart(stream) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return part, nil + }, recv) + require.EqualValues(t, 0, rs.index) + count := 0 + for _, err := range rs.Rows(ctx) { + if count < 3 { + require.NoError(t, err) + } else { + require.ErrorIs(t, err, errWrongResultSetIndex) + } + count++ + } + require.EqualValues(t, count, 4) + }) +} diff --git a/internal/query/result_test.go b/internal/query/result_test.go index eab0712a0..eda60808b 100644 --- a/internal/query/result_test.go +++ b/internal/query/result_test.go @@ -18,6 +18,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) func TestResultNextResultSet(t *testing.T) { @@ -1435,6 +1436,153 @@ func TestExactlyOneResultSetFromResult(t *testing.T) { }) } +func TestCloseResultOnCloseClosableResultSet(t *testing.T) { + ctx := xtest.Context(t) + ctrl := gomock.NewController(t) + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "456", + }, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "456", + }, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF) + var closed bool + r, _, err := newResult(ctx, stream, withTrace(&trace.Query{ + OnResultClose: func(info trace.QueryResultCloseStartInfo) func(info trace.QueryResultCloseDoneInfo) { + require.False(t, closed) + closed = true + + return nil + }, + })) + + require.NoError(t, err) + + rs, err := readResultSet(ctx, r) + require.NoError(t, err) + var ( + a uint64 + b string + ) + r1, err1 := rs.NextRow(ctx) + require.NoError(t, err1) + require.NotNil(t, r1) + scanErr1 := r1.Scan(&a, &b) + require.NoError(t, scanErr1) + require.EqualValues(t, 1, a) + require.EqualValues(t, "1", b) + r2, err2 := rs.NextRow(ctx) + require.NoError(t, err2) + require.NotNil(t, r2) + scanErr2 := r2.Scan(&a, &b) + require.NoError(t, scanErr2) + require.EqualValues(t, 2, a) + require.EqualValues(t, "2", b) + r3, err3 := rs.NextRow(ctx) + require.ErrorIs(t, err3, io.EOF) + require.Nil(t, r3) + err = rs.Close(ctx) + require.NoError(t, err) + require.True(t, closed) +} + func TestResultStats(t *testing.T) { t.Run("Stats", func(t *testing.T) { t.Run("Never", func(t *testing.T) { @@ -4286,3 +4434,349 @@ func TestMaterializedResultStats(t *testing.T) { }) }) } + +func TestResultRange(t *testing.T) { + ctx, cancel := context.WithCancel(xtest.Context(t)) + defer cancel() + ctrl := gomock.NewController(t) + stream := NewMockQueryService_ExecuteQueryClient(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 1, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "c", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "d", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + { + Name: "e", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_BOOL, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: true, + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: false, + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 1, + ResultSet: &Ydb.ResultSet{ + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: true, + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: false, + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: false, + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 2, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "c", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "d", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + { + Name: "e", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_BOOL, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: true, + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: false, + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 2, + ResultSet: &Ydb.ResultSet{ + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: true, + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: false, + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }, { + Value: &Ydb.Value_BoolValue{ + BoolValue: false, + }, + }}, + }, + }, + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF) + r, _, err := newResult(ctx, stream, nil) + require.NoError(t, err) + defer r.Close(ctx) + rsCount := 0 + for rs, err := range r.ResultSets(ctx) { + require.NoError(t, err) + rowsCount := 0 + for _, err := range rs.Rows(ctx) { + require.NoError(t, err) + rowsCount++ + } + require.EqualValues(t, 5, rowsCount) + rsCount++ + } + require.EqualValues(t, 3, rsCount) +} diff --git a/internal/xiter/xiter_experiment_go1.23.go b/internal/xiter/xiter_go1.23.go similarity index 100% rename from internal/xiter/xiter_experiment_go1.23.go rename to internal/xiter/xiter_go1.23.go diff --git a/query/client.go b/query/client.go index 3b2549eaa..c5366ebad 100644 --- a/query/client.go +++ b/query/client.go @@ -31,7 +31,7 @@ type ( // // Exec used by default: // - DefaultTxControl - QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (resultSetWithClose, error) + QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (ClosableResultSet, error) // QueryRow execute query and take the exactly single row from exactly single result set from result // @@ -87,7 +87,7 @@ type ( // Warning: the large result set from query will be materialized and can happened to "OOM killed" problem // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental - QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (resultSetWithClose, error) + QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (ClosableResultSet, error) // QueryRow is a helper which read only one row from first result set in result // diff --git a/query/result.go b/query/result.go index 8a4869ff4..9990fa8f0 100644 --- a/query/result.go +++ b/query/result.go @@ -7,13 +7,13 @@ import ( ) type ( - Result = result.Result - ResultSet = result.Set - resultSetWithClose = result.ClosableResultSet - Row = result.Row - Type = types.Type - NamedDestination = scanner.NamedDestination - ScanStructOption = scanner.ScanStructOption + Result = result.Result + ResultSet = result.Set + ClosableResultSet = result.ClosableResultSet + Row = result.Row + Type = types.Type + NamedDestination = scanner.NamedDestination + ScanStructOption = scanner.ScanStructOption ) func Named(columnName string, destinationValueReference interface{}) (dst NamedDestination) {