Skip to content

Commit

Permalink
refactored lazy tx
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Sep 13, 2024
1 parent 10bd41a commit 428430c
Show file tree
Hide file tree
Showing 15 changed files with 371 additions and 246 deletions.
10 changes: 5 additions & 5 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,12 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
settings := options.ExecuteSettings(opts...)
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
_, r, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
if err != nil {
return xerrors.WithStackTrace(err)
}

err = readAll(ctx, r)
err = readAll(ctx, streamResult)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -380,7 +380,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
) {
settings := options.ExecuteSettings(opts...)
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
_, streamResult, err := execute(ctx, s.ID(), s.client, q,
streamResult, err := execute(ctx, s.ID(), s.client, q,
options.ExecuteSettings(opts...), withTrace(s.trace),
)
if err != nil {
Expand Down Expand Up @@ -432,12 +432,12 @@ func clientQueryResultSet(
ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption,
) (rs result.ClosableResultSet, finalErr error) {
err := do(ctx, pool, func(ctx context.Context, s *Session) error {
_, r, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...)
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...)
if err != nil {
return xerrors.WithStackTrace(err)
}

rs, err = readMaterializedResultSet(ctx, r)
rs, err = readMaterializedResultSet(ctx, streamResult)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down
157 changes: 81 additions & 76 deletions internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,76 +175,80 @@ func TestClient(t *testing.T) {
t.Run("DoTx", func(t *testing.T) {
t.Run("HappyWay", func(t *testing.T) {
ctrl := gomock.NewController(t)
client := NewMockQueryServiceClient(ctrl)
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
Status: Ydb.StatusIds_SUCCESS,
TxMeta: &Ydb_Query.TransactionMeta{
Id: "456",
},
ResultSetIndex: 0,
ResultSet: &Ydb.ResultSet{
Columns: []*Ydb.Column{
{
Name: "a",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UINT64,
},
},
},
{
Name: "b",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UTF8,
},
},
err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
client := NewMockQueryServiceClient(ctrl)
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{
Status: Ydb.StatusIds_SUCCESS,
}, nil)

return &Ydb_Query.ExecuteQueryResponsePart{
Status: Ydb.StatusIds_SUCCESS,
TxMeta: &Ydb_Query.TransactionMeta{
Id: "456",
},
},
Rows: []*Ydb.Value{
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 1,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "1",
ResultSetIndex: 0,
ResultSet: &Ydb.ResultSet{
Columns: []*Ydb.Column{
{
Name: "a",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UINT64,
},
},
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 2,
{
Name: "b",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UTF8,
},
},
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "2",
},
Rows: []*Ydb.Value{
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 1,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "1",
},
}},
},
}},
},
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 3,
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 2,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "2",
},
}},
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "3",
{
Items: []*Ydb.Value{{
Value: &Ydb.Value_Uint64Value{
Uint64Value: 3,
},
}, {
Value: &Ydb.Value_TextValue{
TextValue: "3",
},
}},
},
}},
},
},
},
},
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{
Status: Ydb.StatusIds_SUCCESS,
}, nil)
err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
}, nil
})
stream.EXPECT().Recv().Return(nil, io.EOF)
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)

return newTestSessionWithClient("123", client), nil
}), func(ctx context.Context, tx query.TxActor) error {
defer func() {
Expand Down Expand Up @@ -283,7 +287,7 @@ func TestClient(t *testing.T) {
})
t.Run("TxLeak", func(t *testing.T) {
t.Run("OnExec", func(t *testing.T) {
t.Run("WithExplicitCommit", func(t *testing.T) {
t.Run("WithoutCommit", func(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
txInFlight := 0
ctrl := gomock.NewController(t)
Expand All @@ -297,12 +301,11 @@ func TestClient(t *testing.T) {
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
}

txInFlight++

stream := NewMockQueryService_ExecuteQueryClient(ctrl)
stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
txInFlight++

stream.EXPECT().Recv().Return(nil, io.EOF)

client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, request *Ydb_Query.CommitTransactionRequest, option ...grpc.CallOption) (
*Ydb_Query.CommitTransactionResponse, error,
Expand Down Expand Up @@ -334,7 +337,7 @@ func TestClient(t *testing.T) {
require.Zero(t, txInFlight)
})
})
t.Run("WithLazyCommit", func(t *testing.T) {
t.Run("WithCommit", func(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
ctrl := gomock.NewController(t)
txInFlight := 0
Expand All @@ -350,14 +353,16 @@ func TestClient(t *testing.T) {
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
}

txInFlight++

stream := NewMockQueryService_ExecuteQueryClient(ctrl)
stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
if rand.Int31n(100) < 50 {
txInFlight--

return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
}

txInFlight++

stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
txInFlight--

Expand Down Expand Up @@ -386,7 +391,7 @@ func TestClient(t *testing.T) {
})
})
t.Run("OnSecondExec", func(t *testing.T) {
t.Run("WithExplicitCommit", func(t *testing.T) {
t.Run("WithoutCommit", func(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
ctrl := gomock.NewController(t)
txInFlight := 0
Expand All @@ -400,10 +405,10 @@ func TestClient(t *testing.T) {
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
}

txInFlight++

firstStream := NewMockQueryService_ExecuteQueryClient(ctrl)
firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
txInFlight++

firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) (
Expand Down Expand Up @@ -476,7 +481,7 @@ func TestClient(t *testing.T) {
require.NoError(t, err)
})
})
t.Run("WithLazyCommit", func(t *testing.T) {
t.Run("WithCommit", func(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
ctrl := gomock.NewController(t)
txInFlight := 0
Expand All @@ -490,10 +495,10 @@ func TestClient(t *testing.T) {
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
}

txInFlight++

firstStream := NewMockQueryService_ExecuteQueryClient(ctrl)
firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
txInFlight++

firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) (
Expand Down
19 changes: 9 additions & 10 deletions internal/query/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import (
)

var (
ErrTransactionRollingBack = xerrors.Wrap(errors.New("ydb: the transaction is rolling back"))
errWrongNextResultSetIndex = errors.New("wrong result set index")
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
errMoreThanOneRow = errors.New("unexpected more than one row in result set")
errMoreThanOneResultSet = errors.New("unexpected more than one result set")
errNoResultSets = errors.New("no result sets")
errUnexpectedTxIDOnCommitFlag = errors.New("unexpected transaction ID on commit flag")
errExpectedTxID = errors.New("expected transaction ID but nil")
ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction")
errExecuteOnCompletedTx = errors.New("execute on completed transaction")
ErrTransactionRollingBack = xerrors.Wrap(errors.New("ydb: the transaction is rolling back"))
errWrongNextResultSetIndex = errors.New("wrong result set index")
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
errMoreThanOneRow = errors.New("unexpected more than one row in result set")
errMoreThanOneResultSet = errors.New("unexpected more than one result set")
errNoResultSets = errors.New("no result sets")
errNilOption = errors.New("nil option")
ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction")
errExecuteOnCompletedTx = errors.New("execute on completed transaction")
)
15 changes: 5 additions & 10 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
Expand Down Expand Up @@ -100,7 +99,7 @@ func execute(
ctx context.Context, sessionID string, c Ydb_Query_V1.QueryServiceClient,
q string, settings executeSettings, opts ...resultOption,
) (
_ tx.Identifier, _ *streamResult, finalErr error,
_ *streamResult, finalErr error,
) {
a := allocator.New()
defer a.Free()
Expand All @@ -111,19 +110,15 @@ func execute(

stream, err := c.ExecuteQuery(executeCtx, request, callOptions...)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
return nil, xerrors.WithStackTrace(err)
}

r, txID, err := newResult(ctx, stream, append(opts, withStatsCallback(settings.StatsCallback()))...)
r, err := newResult(ctx, stream, append(opts, withStatsCallback(settings.StatsCallback()))...)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
}

if txID == "" {
return nil, r, nil
return nil, xerrors.WithStackTrace(err)
}

return tx.ID(txID), r, nil
return r, nil
}

func readAll(ctx context.Context, r *streamResult) error {
Expand Down
Loading

0 comments on commit 428430c

Please sign in to comment.