diff --git a/CHANGELOG.md b/CHANGELOG.md index 89126073e..ea3eba4e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Renamed method at experimental API reader.PopBatchTx to reader.PopMessagesBatchTx + ## v3.80.5 * Fixed connections pool leak on failed `ydb.Open` call diff --git a/examples/topic/topicreader/topic_reader_transaction.go b/examples/topic/topicreader/topic_reader_transaction.go index 7bfa54944..013614fe0 100644 --- a/examples/topic/topicreader/topic_reader_transaction.go +++ b/examples/topic/topicreader/topic_reader_transaction.go @@ -15,7 +15,7 @@ func CommitMessagesToTransaction(ctx context.Context, db *ydb.Driver, reader *to } err := db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error { - batch, err := reader.PopBatchTx(ctx, tx) // the batch will be committed with commit the tx + batch, err := reader.PopMessagesBatchTx(ctx, tx) // the batch will be committed with commit the tx if err != nil { return err } @@ -51,7 +51,7 @@ func PopWithTransaction(ctx context.Context, db *ydb.Driver, reader *topicreader } err := db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error { - batch, err := reader.PopBatchTx(ctx, tx) + batch, err := reader.PopMessagesBatchTx(ctx, tx) if err != nil { return err } @@ -102,7 +102,7 @@ func PopWithTransactionRecreateReader( return err } - batch, err := reader.PopBatchTx(ctx, tx) + batch, err := reader.PopMessagesBatchTx(ctx, tx) if err != nil { return err } diff --git a/internal/topic/topicreaderinternal/batched_stream_reader_interface.go b/internal/topic/topicreaderinternal/batched_stream_reader_interface.go index 47aa4522f..b65a4e8ae 100644 --- a/internal/topic/topicreaderinternal/batched_stream_reader_interface.go +++ b/internal/topic/topicreaderinternal/batched_stream_reader_interface.go @@ -14,5 +14,5 @@ type batchedStreamReader interface { ReadMessageBatch(ctx context.Context, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error CloseWithError(ctx context.Context, err error) error - PopBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll + PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll } diff --git a/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go b/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go index ad0ccb166..85194d433 100644 --- a/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go +++ b/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go @@ -114,41 +114,41 @@ func (c *MockbatchedStreamReaderCommitCall) DoAndReturn(f func(context.Context, return c } -// PopBatchTx mocks base method. -func (m *MockbatchedStreamReader) PopBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) { +// PopMessagesBatchTx mocks base method. +func (m *MockbatchedStreamReader) PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PopBatchTx", ctx, tx, opts) + ret := m.ctrl.Call(m, "PopMessagesBatchTx", ctx, tx, opts) ret0, _ := ret[0].(*topicreadercommon.PublicBatch) ret1, _ := ret[1].(error) return ret0, ret1 } -// PopBatchTx indicates an expected call of PopBatchTx. -func (mr *MockbatchedStreamReaderMockRecorder) PopBatchTx(ctx, tx, opts any) *MockbatchedStreamReaderPopBatchTxCall { +// PopMessagesBatchTx indicates an expected call of PopMessagesBatchTx. +func (mr *MockbatchedStreamReaderMockRecorder) PopMessagesBatchTx(ctx, tx, opts any) *MockbatchedStreamReaderPopMessagesBatchTxCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PopBatchTx", reflect.TypeOf((*MockbatchedStreamReader)(nil).PopBatchTx), ctx, tx, opts) - return &MockbatchedStreamReaderPopBatchTxCall{Call: call} + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PopMessagesBatchTx", reflect.TypeOf((*MockbatchedStreamReader)(nil).PopMessagesBatchTx), ctx, tx, opts) + return &MockbatchedStreamReaderPopMessagesBatchTxCall{Call: call} } -// MockbatchedStreamReaderPopBatchTxCall wrap *gomock.Call -type MockbatchedStreamReaderPopBatchTxCall struct { +// MockbatchedStreamReaderPopMessagesBatchTxCall wrap *gomock.Call +type MockbatchedStreamReaderPopMessagesBatchTxCall struct { *gomock.Call } // Return rewrite *gomock.Call.Return -func (c *MockbatchedStreamReaderPopBatchTxCall) Return(arg0 *topicreadercommon.PublicBatch, arg1 error) *MockbatchedStreamReaderPopBatchTxCall { +func (c *MockbatchedStreamReaderPopMessagesBatchTxCall) Return(arg0 *topicreadercommon.PublicBatch, arg1 error) *MockbatchedStreamReaderPopMessagesBatchTxCall { c.Call = c.Call.Return(arg0, arg1) return c } // Do rewrite *gomock.Call.Do -func (c *MockbatchedStreamReaderPopBatchTxCall) Do(f func(context.Context, tx.Transaction, ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error)) *MockbatchedStreamReaderPopBatchTxCall { +func (c *MockbatchedStreamReaderPopMessagesBatchTxCall) Do(f func(context.Context, tx.Transaction, ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error)) *MockbatchedStreamReaderPopMessagesBatchTxCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockbatchedStreamReaderPopBatchTxCall) DoAndReturn(f func(context.Context, tx.Transaction, ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error)) *MockbatchedStreamReaderPopBatchTxCall { +func (c *MockbatchedStreamReaderPopMessagesBatchTxCall) DoAndReturn(f func(context.Context, tx.Transaction, ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error)) *MockbatchedStreamReaderPopMessagesBatchTxCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/internal/topic/topicreaderinternal/reader.go b/internal/topic/topicreaderinternal/reader.go index 76c5884f1..1b7e995c9 100644 --- a/internal/topic/topicreaderinternal/reader.go +++ b/internal/topic/topicreaderinternal/reader.go @@ -128,7 +128,7 @@ func (r *Reader) PopBatchTx( ) (*topicreadercommon.PublicBatch, error) { batchOptions := r.getBatchOptions(opts) - return r.reader.PopBatchTx(ctx, tx, batchOptions) + return r.reader.PopMessagesBatchTx(ctx, tx, batchOptions) } // ReadMessage read exactly one message diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index a5bf552da..60eed2606 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -185,7 +185,7 @@ func (r *topicStreamReaderImpl) WaitInit(_ context.Context) error { return nil } -func (r *topicStreamReaderImpl) PopBatchTx( +func (r *topicStreamReaderImpl) PopMessagesBatchTx( ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions, diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index dfa04c78b..1f41cb39b 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -78,7 +78,7 @@ func newReaderReconnector( return res } -func (r *readerReconnector) PopBatchTx( +func (r *readerReconnector) PopMessagesBatchTx( ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions, @@ -95,7 +95,7 @@ func (r *readerReconnector) PopBatchTx( *topicreadercommon.PublicBatch, error, ) { - return stream.PopBatchTx(ctx, tx, opts) + return stream.PopMessagesBatchTx(ctx, tx, opts) }, ) } diff --git a/tests/integration/topic_transactions_test.go b/tests/integration/topic_transactions_test.go index facdc360a..3f3654b53 100644 --- a/tests/integration/topic_transactions_test.go +++ b/tests/integration/topic_transactions_test.go @@ -30,7 +30,7 @@ func TestTopicReadInTransaction(t *testing.T) { require.NoError(t, scope.Driver().Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error { reader := scope.TopicReaderNamed("first") scope.Logf("trying to pop a batch") - batch, err := reader.PopBatchTx(ctx, tx) + batch, err := reader.PopMessagesBatchTx(ctx, tx) scope.Logf("pop a batch result: %v", err) if err != nil { return err @@ -55,7 +55,7 @@ func TestTopicReadInTransaction(t *testing.T) { } scope.Logf("trying second pop batch") - batch, err := reader.PopBatchTx(ctx, tx) + batch, err := reader.PopMessagesBatchTx(ctx, tx) scope.Logf("second pop batch result: %v", err) if err != nil { return err diff --git a/topic/topicreader/reader.go b/topic/topicreader/reader.go index f96f70505..2e8c0fce3 100644 --- a/topic/topicreader/reader.go +++ b/topic/topicreader/reader.go @@ -74,7 +74,7 @@ func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error { return r.reader.Commit(ctx, obj) } -// PopBatchTx read messages batch and commit them within tx. +// PopMessagesBatchTx read messages batch and commit them within tx. // If tx failed - the batch will be received again. // // Now it means reconnect to the server and re-read messages from the server to the readers buffer. @@ -83,7 +83,7 @@ func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error { // The reconnect is implementation detail and may be changed in the future. // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental -func (r *Reader) PopBatchTx( +func (r *Reader) PopMessagesBatchTx( ctx context.Context, transaction tx.Identifier, opts ...ReadBatchOption,