diff --git a/CHANGELOG.md b/CHANGELOG.md index b8ebcab92..4ca62809b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ -* Added `table/options.UseQueryServiceExecute()` option for redirect `table.Session.Execute` call to `query.Execute` +* Added `table/options.WithQueryService()` option for redirect `/Ydb.Table.V1.TableService/ExecuteDataQuery` call to `/Ydb.Query.V1.QueryService/ExecuteQuery` + +## v3.65.2 +* Fixed data race using `log.WithNames` ## v3.65.1 * Updated dependency `ydb-go-genproto` diff --git a/internal/query/errors.go b/internal/query/errors.go index 923ef8ed8..ef29fcd9f 100644 --- a/internal/query/errors.go +++ b/internal/query/errors.go @@ -7,6 +7,7 @@ import ( var ( ErrNotImplemented = errors.New("not implemented yet") errWrongNextResultSetIndex = errors.New("wrong result set index") + errNilResult = errors.New("nil result") errClosedResult = errors.New("result closed early") errClosedClient = errors.New("query client closed early") errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index") diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index 8ea85bcc1..990bc6c62 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -39,6 +39,9 @@ func queryFromText( } func ReadAll(ctx context.Context, r *result) (resultSets []*Ydb.ResultSet, stats *Ydb_TableStats.QueryStats, _ error) { + if r == nil { + return nil, nil, xerrors.WithStackTrace(errNilResult) + } for { resultSet, err := r.nextResultSet(ctx) if err != nil { diff --git a/internal/query/execute_query_test.go b/internal/query/execute_query_test.go index 5fcdc89ab..89d2f2963 100644 --- a/internal/query/execute_query_test.go +++ b/internal/query/execute_query_test.go @@ -1109,3 +1109,1041 @@ func TestExecuteQueryRequest(t *testing.T) { }) } } + +func TestReadAll(t *testing.T) { + ctx := xtest.Context(t) + ctrl := gomock.NewController(t) + t.Run("NilResult", func(t *testing.T) { + resultSets, stats, err := ReadAll(ctx, nil) + require.ErrorIs(t, err, errNilResult) + require.Nil(t, resultSets) + require.Nil(t, stats) + }) + t.Run("io.EOF", func(t *testing.T) { + stream := NewMockResultStream(ctrl) + stream.EXPECT().Recv().Return(nil, io.EOF) + r, txID, err := newResult(ctx, stream, nil, nil) + require.NoError(t, err) + require.Equal(t, "", txID) + resultSets, stats, err := ReadAll(ctx, r) + require.ErrorIs(t, err, errNilResult) + require.Nil(t, resultSets) + require.Nil(t, stats) + }) + t.Run("EmptyResult", func(t *testing.T) { + stream := NewMockResultStream(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: nil, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF).AnyTimes() + r, txID, err := newResult(ctx, stream, nil, nil) + require.NoError(t, err) + require.Equal(t, "123", txID) + resultSets, stats, err := ReadAll(ctx, r) + require.NoError(t, err) + require.EqualValues(t, []*Ydb.ResultSet{ + {}, + }, resultSets) + require.Nil(t, stats) + }) + t.Run("SingleResultSet", func(t *testing.T) { + t.Run("SinglePart", func(t *testing.T) { + stream := NewMockResultStream(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF).AnyTimes() + r, txID, err := newResult(ctx, stream, nil, nil) + require.NoError(t, err) + require.Equal(t, "123", txID) + resultSets, stats, err := ReadAll(ctx, r) + require.NoError(t, err) + require.EqualValues(t, []*Ydb.ResultSet{ + { + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, resultSets) + require.Nil(t, stats) + }) + t.Run("TwoParts", func(t *testing.T) { + stream := NewMockResultStream(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 6, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "6", + }, + }}, + }, + }, + }, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF).AnyTimes() + r, txID, err := newResult(ctx, stream, nil, nil) + require.NoError(t, err) + require.Equal(t, "123", txID) + resultSets, stats, err := ReadAll(ctx, r) + require.NoError(t, err) + require.EqualValues(t, []*Ydb.ResultSet{ + { + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 6, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "6", + }, + }}, + }, + }, + }, + }, resultSets) + require.Nil(t, stats) + }) + }) + t.Run("TwoResultSets", func(t *testing.T) { + t.Run("SinglePart", func(t *testing.T) { + stream := NewMockResultStream(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 1, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "c", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "d", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF).AnyTimes() + r, txID, err := newResult(ctx, stream, nil, nil) + require.NoError(t, err) + require.Equal(t, "123", txID) + resultSets, stats, err := ReadAll(ctx, r) + require.NoError(t, err) + require.EqualValues(t, []*Ydb.ResultSet{ + { + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + { + Columns: []*Ydb.Column{ + { + Name: "c", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "d", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + }, resultSets) + require.Nil(t, stats) + }) + t.Run("TwoParts", func(t *testing.T) { + stream := NewMockResultStream(ctrl) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 0, + ResultSet: &Ydb.ResultSet{ + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 6, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "6", + }, + }}, + }, + }, + }, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 1, + ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "c", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "d", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + }, + }, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{ + Status: Ydb.StatusIds_SUCCESS, + ResultSetIndex: 1, + ResultSet: &Ydb.ResultSet{ + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 6, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "6", + }, + }}, + }, + }, + }, + ExecStats: nil, + TxMeta: &Ydb_Query.TransactionMeta{ + Id: "123", + }, + }, nil) + stream.EXPECT().Recv().Return(nil, io.EOF).AnyTimes() + r, txID, err := newResult(ctx, stream, nil, nil) + require.NoError(t, err) + require.Equal(t, "123", txID) + resultSets, stats, err := ReadAll(ctx, r) + require.NoError(t, err) + require.EqualValues(t, []*Ydb.ResultSet{ + { + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 6, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "6", + }, + }}, + }, + }, + }, + { + Columns: []*Ydb.Column{ + { + Name: "c", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "d", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, + Rows: []*Ydb.Value{ + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 1, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "1", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 2, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "2", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 3, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "3", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 4, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "4", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 5, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "5", + }, + }}, + }, + { + Items: []*Ydb.Value{{ + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 6, + }, + }, { + Value: &Ydb.Value_TextValue{ + TextValue: "6", + }, + }}, + }, + }, + }, + }, resultSets) + require.Nil(t, stats) + }) + }) +} diff --git a/internal/query/result.go b/internal/query/result.go index 78478610d..32e8b767f 100644 --- a/internal/query/result.go +++ b/internal/query/result.go @@ -5,7 +5,6 @@ import ( "fmt" "io" - "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" @@ -15,21 +14,28 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) +//go:generate mockgen -destination result_stream_mock_test.go -package query -write_package_comment=false . ResultStream + var _ query.Result = (*result)(nil) -type result struct { - stream Ydb_Query_V1.QueryService_ExecuteQueryClient - closeOnce func(ctx context.Context) error - lastPart *Ydb_Query.ExecuteQueryResponsePart - resultSetIndex int64 - errs []error - closed chan struct{} - trace *trace.Query -} +type ( + ResultStream interface { + Recv() (*Ydb_Query.ExecuteQueryResponsePart, error) + } + result struct { + stream ResultStream + closeOnce func(ctx context.Context) error + lastPart *Ydb_Query.ExecuteQueryResponsePart + resultSetIndex int64 + errs []error + closed chan struct{} + trace *trace.Query + } +) func newResult( ctx context.Context, - stream Ydb_Query_V1.QueryService_ExecuteQueryClient, + stream ResultStream, t *trace.Query, closeResult context.CancelFunc, ) (_ *result, txID string, err error) { @@ -53,6 +59,10 @@ func newResult( default: part, err := nextPart(ctx, stream, t) if err != nil { + if xerrors.Is(err, io.EOF) { + return nil, txID, nil + } + return nil, txID, xerrors.WithStackTrace(err) } var ( @@ -81,7 +91,7 @@ func newResult( func nextPart( ctx context.Context, - stream Ydb_Query_V1.QueryService_ExecuteQueryClient, + stream ResultStream, t *trace.Query, ) (_ *Ydb_Query.ExecuteQueryResponsePart, finalErr error) { if t == nil { @@ -130,7 +140,7 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) { case <-ctx.Done(): return nil, xerrors.WithStackTrace(ctx.Err()) default: - if resultSetIndex := r.lastPart.GetResultSetIndex(); resultSetIndex >= nextResultSetIndex { //nolint:nestif + if resultSetIndex := r.lastPart.GetResultSetIndex(); resultSetIndex >= nextResultSetIndex { r.resultSetIndex = resultSetIndex return newResultSet(func() (_ *Ydb_Query.ExecuteQueryResponsePart, err error) { @@ -147,10 +157,6 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) { default: part, err := nextPart(ctx, r.stream, r.trace) if err != nil { - if xerrors.Is(err, io.EOF) { - _ = r.closeOnce(ctx) - } - return nil, xerrors.WithStackTrace(err) } r.lastPart = part diff --git a/internal/query/result_stream_mock_test.go b/internal/query/result_stream_mock_test.go new file mode 100644 index 000000000..4a31558b8 --- /dev/null +++ b/internal/query/result_stream_mock_test.go @@ -0,0 +1,52 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ydb-platform/ydb-go-sdk/v3/internal/query (interfaces: ResultStream) +// +// Generated by this command: +// +// mockgen -destination result_stream_mock_test.go -package query -write_package_comment=false . ResultStream +package query + +import ( + reflect "reflect" + + Ydb_Query "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + gomock "go.uber.org/mock/gomock" +) + +// MockResultStream is a mock of ResultStream interface. +type MockResultStream struct { + ctrl *gomock.Controller + recorder *MockResultStreamMockRecorder +} + +// MockResultStreamMockRecorder is the mock recorder for MockResultStream. +type MockResultStreamMockRecorder struct { + mock *MockResultStream +} + +// NewMockResultStream creates a new mock instance. +func NewMockResultStream(ctrl *gomock.Controller) *MockResultStream { + mock := &MockResultStream{ctrl: ctrl} + mock.recorder = &MockResultStreamMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockResultStream) EXPECT() *MockResultStreamMockRecorder { + return m.recorder +} + +// Recv mocks base method. +func (m *MockResultStream) Recv() (*Ydb_Query.ExecuteQueryResponsePart, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*Ydb_Query.ExecuteQueryResponsePart) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockResultStreamMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockResultStream)(nil).Recv)) +} diff --git a/internal/table/session.go b/internal/table/session.go index 855ec5bae..80670b302 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -48,16 +48,16 @@ import ( // Note that after session is no longer needed it should be destroyed by // Close() call. type session struct { - onClose []func(s *session) - id string - tableService Ydb_Table_V1.TableServiceClient - queryService Ydb_Query_V1.QueryServiceClient - status table.SessionStatus - config *config.Config - lastUsage atomic.Int64 - statusMtx sync.RWMutex - closeOnce sync.Once - nodeID atomic.Int64 + onClose []func(s *session) + id string + client Ydb_Table_V1.TableServiceClient + queryClient Ydb_Query_V1.QueryServiceClient + status table.SessionStatus + config *config.Config + lastUsage atomic.Int64 + statusMtx sync.RWMutex + closeOnce sync.Once + nodeID atomic.Int64 } func (s *session) LastUsage() time.Time { @@ -156,7 +156,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config } s.lastUsage.Store(time.Now().Unix()) - s.tableService = Ydb_Table_V1.NewTableServiceClient( + s.client = Ydb_Table_V1.NewTableServiceClient( conn.WithBeforeFunc( conn.WithContextModifier(cc, func(ctx context.Context) context.Context { return meta.WithTrailerCallback(balancerContext.WithEndpoint(ctx, s), s.checkCloseHint) @@ -167,7 +167,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config ), ) - s.queryService = Ydb_Query_V1.NewQueryServiceClient( + s.queryClient = Ydb_Query_V1.NewQueryServiceClient( conn.WithBeforeFunc( conn.WithContextModifier(cc, func(ctx context.Context) context.Context { return meta.WithTrailerCallback(balancerContext.WithEndpoint(ctx, s), s.checkCloseHint) @@ -205,7 +205,7 @@ func (s *session) Close(ctx context.Context) (err error) { }() if time.Since(s.LastUsage()) < s.config.IdleThreshold() { - _, err = s.tableService.DeleteSession(ctx, + _, err = s.client.DeleteSession(ctx, &Ydb_Table.DeleteSessionRequest{ SessionId: s.id, OperationParams: operation.Params(ctx, @@ -256,7 +256,7 @@ func (s *session) KeepAlive(ctx context.Context) (err error) { onDone(err) }() - resp, err := s.tableService.KeepAlive(ctx, + resp, err := s.client.KeepAlive(ctx, &Ydb_Table.KeepAliveRequest{ SessionId: s.id, OperationParams: operation.Params( @@ -311,7 +311,7 @@ func (s *session) CreateTable( opt.ApplyCreateTableOption((*options.CreateTableDesc)(&request), a) } } - _, err = s.tableService.CreateTable(ctx, &request) + _, err = s.client.CreateTable(ctx, &request) if err != nil { return xerrors.WithStackTrace(err) } @@ -344,7 +344,7 @@ func (s *session) DescribeTable( opt((*options.DescribeTableDesc)(&request)) } } - response, err = s.tableService.DescribeTable(ctx, &request) + response, err = s.client.DescribeTable(ctx, &request) if err != nil { return desc, xerrors.WithStackTrace(err) } @@ -493,7 +493,7 @@ func (s *session) DropTable( opt.ApplyDropTableOption((*options.DropTableDesc)(&request)) } } - _, err = s.tableService.DropTable(ctx, &request) + _, err = s.client.DropTable(ctx, &request) return xerrors.WithStackTrace(err) } @@ -532,7 +532,7 @@ func (s *session) AlterTable( opt.ApplyAlterTableOption((*options.AlterTableDesc)(&request), a) } } - _, err = s.tableService.AlterTable(ctx, &request) + _, err = s.client.AlterTable(ctx, &request) return xerrors.WithStackTrace(err) } @@ -559,7 +559,7 @@ func (s *session) CopyTable( opt((*options.CopyTableDesc)(&request)) } } - _, err = s.tableService.CopyTable(ctx, &request) + _, err = s.client.CopyTable(ctx, &request) if err != nil { return xerrors.WithStackTrace(err) } @@ -609,7 +609,7 @@ func (s *session) CopyTables( ctx context.Context, opts ...options.CopyTablesOption, ) (err error) { - err = copyTables(ctx, s.id, s.config.OperationTimeout(), s.config.OperationCancelAfter(), s.tableService, opts...) + err = copyTables(ctx, s.id, s.config.OperationTimeout(), s.config.OperationCancelAfter(), s.client, opts...) if err != nil { return xerrors.WithStackTrace(err) } @@ -659,7 +659,7 @@ func (s *session) RenameTables( ctx context.Context, opts ...options.RenameTablesOption, ) (err error) { - err = renameTables(ctx, s.id, s.config.OperationTimeout(), s.config.OperationCancelAfter(), s.tableService, opts...) + err = renameTables(ctx, s.id, s.config.OperationTimeout(), s.config.OperationCancelAfter(), s.client, opts...) if err != nil { return xerrors.WithStackTrace(err) } @@ -692,7 +692,7 @@ func (s *session) Explain( } }() - response, err = s.tableService.ExplainDataQuery(ctx, + response, err = s.client.ExplainDataQuery(ctx, &Ydb_Table.ExplainDataQueryRequest{ SessionId: s.id, YqlText: query, @@ -741,7 +741,7 @@ func (s *session) Prepare(ctx context.Context, queryText string) (_ table.Statem } }() - response, err = s.tableService.PrepareDataQuery(ctx, + response, err = s.client.PrepareDataQuery(ctx, &Ydb_Table.PrepareDataQueryRequest{ SessionId: s.id, YqlText: queryText, @@ -837,7 +837,7 @@ func (s *session) Execute( for _, opt := range opts { executeOptions = append(executeOptions, opt) } - tx, res, err := query.Execute(ctx, s.queryService, s.ID(), q, executeOptions...) + tx, res, err := query.Execute(ctx, s.queryClient, s.ID(), q, executeOptions...) if err != nil { return nil, nil, xerrors.WithStackTrace(err) } @@ -904,7 +904,7 @@ func (s *session) executeDataQuery( response *Ydb_Table.ExecuteDataQueryResponse ) - response, err = s.tableService.ExecuteDataQuery(ctx, request, callOptions...) + response, err = s.client.ExecuteDataQuery(ctx, request, callOptions...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -938,7 +938,7 @@ func (s *session) ExecuteSchemeQuery( opt((*options.ExecuteSchemeQueryDesc)(&request)) } } - _, err = s.tableService.ExecuteSchemeQuery(ctx, &request) + _, err = s.client.ExecuteSchemeQuery(ctx, &request) return xerrors.WithStackTrace(err) } @@ -960,7 +960,7 @@ func (s *session) DescribeTableOptions(ctx context.Context) ( operation.ModeSync, ), } - response, err = s.tableService.DescribeTableOptions(ctx, &request) + response, err = s.client.DescribeTableOptions(ctx, &request) if err != nil { return desc, xerrors.WithStackTrace(err) } @@ -1111,7 +1111,7 @@ func (s *session) StreamReadTable( ctx, cancel := xcontext.WithCancel(ctx) - stream, err = s.tableService.StreamReadTable(ctx, &request) + stream, err = s.client.StreamReadTable(ctx, &request) if err != nil { cancel() @@ -1173,7 +1173,7 @@ func (s *session) ReadRows( } } - response, err = s.tableService.ReadRows(ctx, &request) + response, err = s.client.ReadRows(ctx, &request) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -1231,7 +1231,7 @@ func (s *session) StreamExecuteScanQuery( ctx, cancel := xcontext.WithCancel(ctx) - stream, err = s.tableService.StreamExecuteScanQuery(ctx, &request, callOptions...) + stream, err = s.client.StreamExecuteScanQuery(ctx, &request, callOptions...) if err != nil { cancel() @@ -1293,7 +1293,7 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value } } - _, err = s.tableService.BulkUpsert(ctx, + _, err = s.client.BulkUpsert(ctx, &Ydb_Table.BulkUpsertRequest{ Table: table, Rows: value.ToYDB(rows, a), @@ -1332,7 +1332,7 @@ func (s *session) BeginTransaction( onDone(x, err) }() - response, err = s.tableService.BeginTransaction(ctx, + response, err = s.client.BeginTransaction(ctx, &Ydb_Table.BeginTransactionRequest{ SessionId: s.id, TxSettings: txSettings.Settings(), diff --git a/internal/table/session_test.go b/internal/table/session_test.go index 710bd57b6..91c3ea9d8 100644 --- a/internal/table/session_test.go +++ b/internal/table/session_test.go @@ -240,8 +240,8 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { method: testutil.TableExecuteDataQuery, do: func(t *testing.T, ctx context.Context, c *Client) { s := &session{ - tableService: Ydb_Table_V1.NewTableServiceClient(c.cc), - config: config.New(), + client: Ydb_Table_V1.NewTableServiceClient(c.cc), + config: config.New(), } _, _, err := s.Execute(ctx, table.TxControl(), "", table.NewQueryParameters()) require.NoError(t, err) @@ -251,8 +251,8 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { method: testutil.TableExplainDataQuery, do: func(t *testing.T, ctx context.Context, c *Client) { s := &session{ - tableService: Ydb_Table_V1.NewTableServiceClient(c.cc), - config: config.New(), + client: Ydb_Table_V1.NewTableServiceClient(c.cc), + config: config.New(), } _, err := s.Explain(ctx, "") require.NoError(t, err) @@ -262,8 +262,8 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { method: testutil.TablePrepareDataQuery, do: func(t *testing.T, ctx context.Context, c *Client) { s := &session{ - tableService: Ydb_Table_V1.NewTableServiceClient(c.cc), - config: config.New(), + client: Ydb_Table_V1.NewTableServiceClient(c.cc), + config: config.New(), } _, err := s.Prepare(ctx, "") require.NoError(t, err) @@ -280,8 +280,8 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { method: testutil.TableDeleteSession, do: func(t *testing.T, ctx context.Context, c *Client) { s := &session{ - tableService: Ydb_Table_V1.NewTableServiceClient(c.cc), - config: config.New(), + client: Ydb_Table_V1.NewTableServiceClient(c.cc), + config: config.New(), } require.NoError(t, s.Close(ctx)) }, @@ -290,8 +290,8 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { method: testutil.TableBeginTransaction, do: func(t *testing.T, ctx context.Context, c *Client) { s := &session{ - tableService: Ydb_Table_V1.NewTableServiceClient(c.cc), - config: config.New(), + client: Ydb_Table_V1.NewTableServiceClient(c.cc), + config: config.New(), } _, err := s.BeginTransaction(ctx, table.TxSettings()) require.NoError(t, err) @@ -302,8 +302,8 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { do: func(t *testing.T, ctx context.Context, c *Client) { tx := &transaction{ s: &session{ - tableService: Ydb_Table_V1.NewTableServiceClient(c.cc), - config: config.New(), + client: Ydb_Table_V1.NewTableServiceClient(c.cc), + config: config.New(), }, } _, err := tx.CommitTx(ctx) @@ -315,8 +315,8 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { do: func(t *testing.T, ctx context.Context, c *Client) { tx := &transaction{ s: &session{ - tableService: Ydb_Table_V1.NewTableServiceClient(c.cc), - config: config.New(), + client: Ydb_Table_V1.NewTableServiceClient(c.cc), + config: config.New(), }, } err := tx.Rollback(ctx) @@ -327,8 +327,8 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { method: testutil.TableKeepAlive, do: func(t *testing.T, ctx context.Context, c *Client) { s := &session{ - tableService: Ydb_Table_V1.NewTableServiceClient(c.cc), - config: config.New(), + client: Ydb_Table_V1.NewTableServiceClient(c.cc), + config: config.New(), } require.NoError(t, s.KeepAlive(ctx)) }, diff --git a/internal/table/transaction.go b/internal/table/transaction.go index 81fe37500..35bdfd9b0 100644 --- a/internal/table/transaction.go +++ b/internal/table/transaction.go @@ -29,7 +29,12 @@ type txState struct { } func (s *txState) Get() txStateEnum { - return *s.Pointer.Load() + ptr := s.Pointer.Load() + if ptr == nil { + return txStateInitialized + } + + return *ptr } func (s *txState) Set(state txStateEnum) { @@ -52,6 +57,10 @@ type transaction struct { } func (tx *transaction) ID() string { + if tx == nil { + return "" + } + return tx.id } @@ -167,7 +176,7 @@ func (tx *transaction) CommitTx( } } - response, err = tx.s.tableService.CommitTransaction(ctx, request) + response, err = tx.s.client.CommitTransaction(ctx, request) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -204,7 +213,7 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) { case txStateRollbacked: return xerrors.WithStackTrace(errTxRollbackedEarly) default: - _, err = tx.s.tableService.RollbackTransaction(ctx, + _, err = tx.s.client.RollbackTransaction(ctx, &Ydb_Table.RollbackTransactionRequest{ SessionId: tx.s.id, TxId: tx.id, diff --git a/internal/version/version.go b/internal/version/version.go index 0238c9656..1238601f4 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -3,7 +3,7 @@ package version const ( Major = "3" Minor = "65" - Patch = "1" + Patch = "2" Prefix = "ydb-go-sdk" ) diff --git a/log/context.go b/log/context.go index 81231df01..b724e5ec3 100644 --- a/log/context.go +++ b/log/context.go @@ -1,6 +1,8 @@ package log -import "context" +import ( + "context" +) type ( ctxLevelKey struct{} @@ -18,7 +20,11 @@ func LevelFromContext(ctx context.Context) Level { } func WithNames(ctx context.Context, names ...string) context.Context { - return context.WithValue(ctx, ctxNamesKey{}, append(NamesFromContext(ctx), names...)) + // trim capacity for force allocate new memory while append and prevent data race + oldNames := NamesFromContext(ctx) + oldNames = oldNames[:len(oldNames):len(oldNames)] + + return context.WithValue(ctx, ctxNamesKey{}, append(oldNames, names...)) } func NamesFromContext(ctx context.Context) []string { @@ -27,7 +33,7 @@ func NamesFromContext(ctx context.Context) []string { return []string{} } - return v + return v[:len(v):len(v)] // prevent re } func with(ctx context.Context, lvl Level, names ...string) context.Context { diff --git a/log/context_test.go b/log/context_test.go index 3d8c05067..92882b887 100644 --- a/log/context_test.go +++ b/log/context_test.go @@ -2,9 +2,13 @@ package log import ( "context" + "strconv" "testing" + "time" "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" ) func TestLevelFromContext(t *testing.T) { @@ -54,3 +58,35 @@ func TestNamesFromContext(t *testing.T) { }) } } + +func TestWithNamesRaceRegression(t *testing.T) { + count := 100 + xtest.TestManyTimes(t, func(t testing.TB) { + ctx := WithNames(context.Background(), "test") + ctx = WithNames(ctx, "test") + ctx = WithNames(ctx, "test") + res := make([]context.Context, count) + + start := make(chan bool) + finished := make(chan bool) + for i := 0; i < count; i++ { + go func(index int) { + <-start + res[index] = WithNames(ctx, strconv.Itoa(index)) + finished <- true + }(i) + } + + time.Sleep(time.Microsecond) + close(start) + + for i := 0; i < count; i++ { + <-finished + } + + for i := 0; i < count; i++ { + expected := []string{"test", "test", "test", strconv.Itoa(i)} + require.Equal(t, expected, NamesFromContext(res[i])) + } + }) +} diff --git a/tests/integration/table_use_query_test.go b/tests/integration/table_with_query_service_test.go similarity index 88% rename from tests/integration/table_use_query_test.go rename to tests/integration/table_with_query_service_test.go index 0a961dd24..90eaeac7e 100644 --- a/tests/integration/table_use_query_test.go +++ b/tests/integration/table_with_query_service_test.go @@ -18,7 +18,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" ) -func TestWithQueryService(t *testing.T) { +func TestTableWithQueryService(t *testing.T) { if version.Lt(os.Getenv("YDB_VERSION"), "24.1") { t.Skip("query service not allowed in YDB version '" + os.Getenv("YDB_VERSION") + "'") } @@ -51,7 +51,6 @@ func TestWithQueryService(t *testing.T) { } return fmt.Errorf("unexpected empty result set") } - var abc, def int32 err = res.ScanNamed( named.Required("abc", &abc), named.Required("def", &def), @@ -67,8 +66,12 @@ func TestWithQueryService(t *testing.T) { require.EqualValues(t, 456, def) }) t.Run("table.Transaction.Execute", func(t *testing.T) { + var abc, def int32 err = db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) (err error) { - res, err := tx.Execute(ctx, `SELECT 1 as abc, 2 as def;`, nil) + res, err := tx.Execute(ctx, + `SELECT 123 as abc, 456 as def;`, nil, + options.WithQueryService(), + ) if err != nil { return err } @@ -82,10 +85,9 @@ func TestWithQueryService(t *testing.T) { } return fmt.Errorf("unexpected empty result set") } - var abc, def int32 err = res.ScanNamed( named.Required("abc", &abc), - named.Required("ghi", &def), + named.Required("def", &def), ) if err != nil { return err @@ -93,7 +95,8 @@ func TestWithQueryService(t *testing.T) { t.Log(abc, def) return res.Err() }, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly()))) - require.Error(t, err) - require.ErrorContains(t, err, "not found column 'ghi'") + require.NoError(t, err) + require.EqualValues(t, 123, abc) + require.EqualValues(t, 456, def) }) }