Skip to content

Commit

Permalink
Showing 9 changed files with 26 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Renamed method at experimental API reader.PopBatchTx to reader.PopMessagesBatchTx

## v3.80.5
* Fixed connections pool leak on failed `ydb.Open` call

6 changes: 3 additions & 3 deletions examples/topic/topicreader/topic_reader_transaction.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ func CommitMessagesToTransaction(ctx context.Context, db *ydb.Driver, reader *to
}

err := db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
batch, err := reader.PopBatchTx(ctx, tx) // the batch will be committed with commit the tx
batch, err := reader.PopMessagesBatchTx(ctx, tx) // the batch will be committed with commit the tx
if err != nil {
return err
}
@@ -51,7 +51,7 @@ func PopWithTransaction(ctx context.Context, db *ydb.Driver, reader *topicreader
}

err := db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
batch, err := reader.PopBatchTx(ctx, tx)
batch, err := reader.PopMessagesBatchTx(ctx, tx)
if err != nil {
return err
}
@@ -102,7 +102,7 @@ func PopWithTransactionRecreateReader(
return err
}

batch, err := reader.PopBatchTx(ctx, tx)
batch, err := reader.PopMessagesBatchTx(ctx, tx)
if err != nil {
return err
}
Original file line number Diff line number Diff line change
@@ -14,5 +14,5 @@ type batchedStreamReader interface {
ReadMessageBatch(ctx context.Context, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error)
Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
CloseWithError(ctx context.Context, err error) error
PopBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/topic/topicreaderinternal/reader.go
Original file line number Diff line number Diff line change
@@ -128,7 +128,7 @@ func (r *Reader) PopBatchTx(
) (*topicreadercommon.PublicBatch, error) {
batchOptions := r.getBatchOptions(opts)

return r.reader.PopBatchTx(ctx, tx, batchOptions)
return r.reader.PopMessagesBatchTx(ctx, tx, batchOptions)
}

// ReadMessage read exactly one message
2 changes: 1 addition & 1 deletion internal/topic/topicreaderinternal/stream_reader_impl.go
Original file line number Diff line number Diff line change
@@ -185,7 +185,7 @@ func (r *topicStreamReaderImpl) WaitInit(_ context.Context) error {
return nil
}

func (r *topicStreamReaderImpl) PopBatchTx(
func (r *topicStreamReaderImpl) PopMessagesBatchTx(
ctx context.Context,
tx tx.Transaction,
opts ReadMessageBatchOptions,
4 changes: 2 additions & 2 deletions internal/topic/topicreaderinternal/stream_reconnector.go
Original file line number Diff line number Diff line change
@@ -78,7 +78,7 @@ func newReaderReconnector(
return res
}

func (r *readerReconnector) PopBatchTx(
func (r *readerReconnector) PopMessagesBatchTx(
ctx context.Context,
tx tx.Transaction,
opts ReadMessageBatchOptions,
@@ -95,7 +95,7 @@ func (r *readerReconnector) PopBatchTx(
*topicreadercommon.PublicBatch,
error,
) {
return stream.PopBatchTx(ctx, tx, opts)
return stream.PopMessagesBatchTx(ctx, tx, opts)
},
)
}
4 changes: 2 additions & 2 deletions tests/integration/topic_transactions_test.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ func TestTopicReadInTransaction(t *testing.T) {
require.NoError(t, scope.Driver().Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
reader := scope.TopicReaderNamed("first")
scope.Logf("trying to pop a batch")
batch, err := reader.PopBatchTx(ctx, tx)
batch, err := reader.PopMessagesBatchTx(ctx, tx)
scope.Logf("pop a batch result: %v", err)
if err != nil {
return err
@@ -55,7 +55,7 @@ func TestTopicReadInTransaction(t *testing.T) {
}

scope.Logf("trying second pop batch")
batch, err := reader.PopBatchTx(ctx, tx)
batch, err := reader.PopMessagesBatchTx(ctx, tx)
scope.Logf("second pop batch result: %v", err)
if err != nil {
return err
4 changes: 2 additions & 2 deletions topic/topicreader/reader.go
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error {
return r.reader.Commit(ctx, obj)
}

// PopBatchTx read messages batch and commit them within tx.
// PopMessagesBatchTx read messages batch and commit them within tx.
// If tx failed - the batch will be received again.
//
// Now it means reconnect to the server and re-read messages from the server to the readers buffer.
@@ -83,7 +83,7 @@ func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error {
// The reconnect is implementation detail and may be changed in the future.
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func (r *Reader) PopBatchTx(
func (r *Reader) PopMessagesBatchTx(
ctx context.Context,
transaction tx.Identifier,
opts ...ReadBatchOption,

0 comments on commit e582517

Please sign in to comment.