From 7f2c79325701715ac2bc32cb7797219361e903e1 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Mon, 20 May 2024 13:28:53 +0300 Subject: [PATCH 1/7] initial --- topic/topicoptions/topicoptions_reader.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/topic/topicoptions/topicoptions_reader.go b/topic/topicoptions/topicoptions_reader.go index 862f4e7de..b4eae8caa 100644 --- a/topic/topicoptions/topicoptions_reader.go +++ b/topic/topicoptions/topicoptions_reader.go @@ -257,3 +257,11 @@ func WithReaderUpdateTokenInterval(interval time.Duration) ReaderOption { cfg.CredUpdateInterval = interval } } + +// WithReaderWithoutConsumer allow read topic without consumer. +// Read without consumer is special read mode on a server. In the mode every reader without consumer receive all +// messages from a topic and can't commit them. +// The mode work good if every reader process need all messages (for example for cache invalidation) and no need +// scale process messages by readers count. +func WithReaderWithoutConsumer() ReaderOption { +} From c7c3642275b5183258d47214f6726defce1e4b0c Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Mon, 20 May 2024 19:07:07 +0300 Subject: [PATCH 2/7] Fixed sent partition number to server from topic reader --- CHANGELOG.md | 2 ++ internal/grpcwrapper/rawtopic/rawtopicreader/messages.go | 3 +++ 2 files changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 873c3687b..d0d9cd5b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Fixed bug: topic didn't send specified partition number to a server + ## v3.67.2 * Fixed incorrect formatting of decimal. Implementation of decimal has been reverted to latest working version diff --git a/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go b/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go index ad60427fa..3c2fc6ca9 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go +++ b/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go @@ -123,6 +123,9 @@ func (r *InitRequest) toProto() *Ydb_Topic.StreamReadMessage_InitRequest { dstTopicSettings.Path = srcTopicSettings.Path dstTopicSettings.MaxLag = srcTopicSettings.MaxLag.ToProto() dstTopicSettings.ReadFrom = srcTopicSettings.ReadFrom.ToProto() + + dstTopicSettings.PartitionIds = make([]int64, len(srcTopicSettings.PartitionsID)) + copy(dstTopicSettings.PartitionIds, srcTopicSettings.PartitionsID) } return p From d2095f09f4843c4c97cbcf11b7599842ff4669cc Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Mon, 20 May 2024 19:42:27 +0300 Subject: [PATCH 3/7] Added option for read topic without consumer --- CHANGELOG.md | 1 + internal/topic/topicclientinternal/client.go | 7 +- internal/topic/topicreaderinternal/reader.go | 25 ++++++- .../topicreaderinternal/stream_reader_impl.go | 1 + tests/integration/topic_client_test.go | 72 +++++++++++++++++++ topic/topicoptions/topicoptions_reader.go | 16 ++++- trace/topic.go | 1 + trace/topic_gtrace.go | 28 +------- 8 files changed, 120 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0d9cd5b5..373d02cab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Simple implement option WithReaderWithoutConsumer * Fixed bug: topic didn't send specified partition number to a server ## v3.67.2 diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 4609df17a..87bf774e4 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -223,8 +223,11 @@ func (c *Client) StartReader( } opts = append(defaultOpts, opts...) - internalReader := topicreaderinternal.NewReader(connector, consumer, readSelectors, opts...) - trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer) + internalReader, err := topicreaderinternal.NewReader(connector, consumer, readSelectors, opts...) + if err != nil { + return nil, err + } + trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err) return topicreader.NewReader(internalReader), nil } diff --git a/internal/topic/topicreaderinternal/reader.go b/internal/topic/topicreaderinternal/reader.go index bc2dc865c..e9bff2baa 100644 --- a/internal/topic/topicreaderinternal/reader.go +++ b/internal/topic/topicreaderinternal/reader.go @@ -22,6 +22,9 @@ var ( errors.New("ydb: first connection attempt not finished"), )) errReaderClosed = xerrors.Wrap(errors.New("ydb: reader closed")) + errUnexpectedEmptyConsumername = xerrors.Wrap(errors.New("ydb: create ydb reader with empty consumer name. Set one of: consumer name or option WithReaderWithoutConsumer")) //nolint:lll + errSetConsumerAndNoConsumer = xerrors.Wrap(errors.New("ydb: reader has non empty consumer name and set option WithReaderWithoutConsumer. Only one of them must be set")) //nolint:lll + errCantCommitWithoutConsumer = xerrors.Wrap(errors.New("ydb: reader can't commit messages without consumer")) errCommitSessionFromOtherReader = xerrors.Wrap(errors.New("ydb: commit with session from other reader")) ) @@ -82,8 +85,13 @@ func NewReader( consumer string, readSelectors []PublicReadSelector, opts ...PublicReaderOption, -) Reader { +) (Reader, error) { cfg := convertNewParamsToStreamConfig(consumer, readSelectors, opts...) + + if err := cfg.Validate(); err != nil { + return Reader{}, err + } + readerID := nextReaderID() readerConnector := func(ctx context.Context) (batchedStreamReader, error) { @@ -108,7 +116,7 @@ func NewReader( readerID: readerID, } - return res + return res, nil } func (r *Reader) WaitInit(ctx context.Context) error { @@ -226,6 +234,19 @@ type ReaderConfig struct { topicStreamReaderConfig } +func (c ReaderConfig) Validate() error { + if c.Consumer != "" && c.ReadWithoutConsumer { + return xerrors.WithStackTrace(errSetConsumerAndNoConsumer) + } + if c.Consumer == "" && !c.ReadWithoutConsumer { + return xerrors.WithStackTrace(errUnexpectedEmptyConsumername) + } + if c.ReadWithoutConsumer && c.CommitMode != CommitModeNone { + return xerrors.WithStackTrace(errCantCommitWithoutConsumer) + } + return nil +} + type PublicReaderOption func(cfg *ReaderConfig) func WithCredentials(cred credentials.Credentials) PublicReaderOption { diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index 8e80575b9..a974376ed 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -65,6 +65,7 @@ type topicStreamReaderConfig struct { Cred credentials.Credentials CredUpdateInterval time.Duration Consumer string + ReadWithoutConsumer bool ReadSelectors []*PublicReadSelector Trace *trace.Topic GetPartitionStartOffsetCallback PublicGetPartitionStartOffsetFunc diff --git a/tests/integration/topic_client_test.go b/tests/integration/topic_client_test.go index a47bbbc2a..a957432ec 100644 --- a/tests/integration/topic_client_test.go +++ b/tests/integration/topic_client_test.go @@ -5,8 +5,10 @@ package integration import ( "context" + "io" "os" "path" + "strings" "testing" "time" @@ -18,6 +20,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter" ) const defaultConnectionString = "grpc://localhost:2136/local" @@ -155,6 +158,75 @@ func TestSchemeList(t *testing.T) { require.True(t, hasTopic) } +func TestReaderWithoutConsumer(t *testing.T) { + t.Run("OK", func(t *testing.T) { + scope := newScope(t) + ctx := scope.Ctx + + reader1, err := scope.Driver().Topic().StartReader( + "", + topicoptions.ReadSelectors{ + { + Path: scope.TopicPath(), + Partitions: []int64{0}, + }, + }, + topicoptions.WithReaderWithoutConsumer(false), + ) + require.NoError(t, err) + + reader2, err := scope.Driver().Topic().StartReader( + "", + topicoptions.ReadSelectors{ + { + Path: scope.TopicPath(), + Partitions: []int64{0}, + }, + }, + topicoptions.WithReaderWithoutConsumer(false), + ) + require.NoError(t, err) + + err = scope.TopicWriter().Write(ctx, topicwriter.Message{Data: strings.NewReader("123")}) + require.NoError(t, err) + + msg1, err := reader1.ReadMessage(ctx) + require.NoError(t, err) + require.Equal(t, int64(1), msg1.SeqNo) + + msg1data, err := io.ReadAll(msg1) + require.NoError(t, err) + require.Equal(t, "123", string(msg1data)) + + msg2, err := reader2.ReadMessage(ctx) + require.NoError(t, err) + require.Equal(t, int64(1), msg2.SeqNo) + + msg2data, err := io.ReadAll(msg2) + require.NoError(t, err) + require.Equal(t, "123", string(msg2data)) + + _ = reader1.Close(ctx) + _ = reader2.Close(ctx) + }) + t.Run("NoNameNoOptionErr", func(t *testing.T) { + scope := newScope(t) + topicReader, err := scope.Driver().Topic().StartReader("", topicoptions.ReadTopic(scope.TopicPath())) + require.Error(t, err) + require.Nil(t, topicReader) + }) + t.Run("NameAndOption", func(t *testing.T) { + scope := newScope(t) + topicReader, err := scope.Driver().Topic().StartReader( + scope.TopicConsumerName(), + topicoptions.ReadTopic(scope.TopicPath()), + topicoptions.WithReaderWithoutConsumer(false), + ) + require.Error(t, err) + require.Nil(t, topicReader) + }) +} + func connect(t testing.TB, opts ...ydb.Option) *ydb.Driver { return connectWithLogOption(t, false, opts...) } diff --git a/topic/topicoptions/topicoptions_reader.go b/topic/topicoptions/topicoptions_reader.go index b4eae8caa..9b00541b1 100644 --- a/topic/topicoptions/topicoptions_reader.go +++ b/topic/topicoptions/topicoptions_reader.go @@ -263,5 +263,19 @@ func WithReaderUpdateTokenInterval(interval time.Duration) ReaderOption { // messages from a topic and can't commit them. // The mode work good if every reader process need all messages (for example for cache invalidation) and no need // scale process messages by readers count. -func WithReaderWithoutConsumer() ReaderOption { +// +// saveStateOnReconnection +// - if true: simulate one unbroken stream without duplicate messages (unimplemented) +// - if false: need store progress on client side for prevent re-read messages on internal reconnections to the server. +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +// https://github.com/ydb-platform/ydb-go-sdk/issues/905 +func WithReaderWithoutConsumer(saveStateOnReconnection bool) ReaderOption { + if saveStateOnReconnection { + panic("ydb: saveStateOnReconnection mode doesn't implemented yet") + } + return func(cfg *topicreaderinternal.ReaderConfig) { + cfg.ReadWithoutConsumer = true + cfg.CommitMode = CommitModeNone + } } diff --git a/trace/topic.go b/trace/topic.go index ad20bc980..a10ffd4b0 100644 --- a/trace/topic.go +++ b/trace/topic.go @@ -106,6 +106,7 @@ type ( TopicReaderStartInfo struct { ReaderID int64 Consumer string + Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index 2f3d4a6b4..dec6f299c 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -12,11 +12,9 @@ type topicComposeOptions struct { } // TopicOption specified Topic compose option -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals type TopicComposeOption func(o *topicComposeOptions) // WithTopicPanicCallback specified behavior on panic -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func WithTopicPanicCallback(cb func(e interface{})) TopicComposeOption { return func(o *topicComposeOptions) { o.panicCallback = cb @@ -24,7 +22,6 @@ func WithTopicPanicCallback(cb func(e interface{})) TopicComposeOption { } // Compose returns a new Topic which has functional fields composed both from t and x. -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func (t *Topic) Compose(x *Topic, opts ...TopicComposeOption) *Topic { var ret Topic options := topicComposeOptions{} @@ -995,14 +992,13 @@ func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMe } fn(t1) } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderStart(t *Topic, readerID int64, consumer string) { +func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { var p TopicReaderStartInfo p.ReaderID = readerID p.Consumer = consumer + p.Error = e t.onReaderStart(p) } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnect(t *Topic, reason error) func(error) { var p TopicReaderReconnectStartInfo p.Reason = reason @@ -1013,14 +1009,12 @@ func TopicOnReaderReconnect(t *Topic, reason error) func(error) { res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { var p TopicReaderReconnectRequestInfo p.Reason = reason p.WasSent = wasSent t.onReaderReconnectRequest(p) } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { var p TopicReaderPartitionReadStartResponseStartInfo p.ReaderConnectionID = readerConnectionID @@ -1037,7 +1031,6 @@ func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { var p TopicReaderPartitionReadStopResponseStartInfo p.ReaderConnectionID = readerConnectionID @@ -1054,7 +1047,6 @@ func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo p.RequestContext = requestContext @@ -1070,7 +1062,6 @@ func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { var p TopicReaderSendCommitMessageStartInfo p.CommitsInfo = commitsInfo @@ -1081,7 +1072,6 @@ func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendC res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { var p TopicReaderCommittedNotifyInfo p.ReaderConnectionID = readerConnectionID @@ -1091,7 +1081,6 @@ func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic str p.CommittedOffset = committedOffset t.onReaderCommittedNotify(p) } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) func(closeError error) { var p TopicReaderCloseStartInfo p.ReaderConnectionID = readerConnectionID @@ -1103,7 +1092,6 @@ func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { var p TopicReaderInitStartInfo p.PreInitReaderConnectionID = preInitReaderConnectionID @@ -1116,14 +1104,12 @@ func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestIn res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { var p TopicReaderErrorInfo p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderError(p) } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen int, _ error) func(error) { var p OnReadUpdateTokenStartInfo p.ReaderConnectionID = readerConnectionID @@ -1140,7 +1126,6 @@ func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen } } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { var p TopicReaderSentDataRequestInfo p.ReaderConnectionID = readerConnectionID @@ -1148,7 +1133,6 @@ func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBy p.LocalBufferSizeAfterSent = localBufferSizeAfterSent t.onReaderSentDataRequest(p) } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { var p TopicReaderReceiveDataResponseStartInfo p.ReaderConnectionID = readerConnectionID @@ -1161,7 +1145,6 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo p.RequestContext = requestContext @@ -1182,14 +1165,12 @@ func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCou res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e error) { var p OnReadUnknownGrpcMessageInfo p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderUnknownGrpcMessage(p) } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, producerID string, attempt int) func(error) { var p TopicWriterReconnectStartInfo p.WriterInstanceID = writerInstanceID @@ -1203,7 +1184,6 @@ func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, pro res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { var p TopicWriterInitStreamStartInfo p.WriterInstanceID = writerInstanceID @@ -1217,7 +1197,6 @@ func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, pr res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(error) { var p TopicWriterCloseStartInfo p.WriterInstanceID = writerInstanceID @@ -1229,7 +1208,6 @@ func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(er res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { var p TopicWriterCompressMessagesStartInfo p.WriterInstanceID = writerInstanceID @@ -1245,7 +1223,6 @@ func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { var p TopicWriterSendMessagesStartInfo p.WriterInstanceID = writerInstanceID @@ -1260,7 +1237,6 @@ func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID stri res(p) } } -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo p.WriterInstanceID = writerInstanceID From 92ec5578de9eae50e0ab5eeea4a9aec4d6da6304 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Mon, 20 May 2024 20:22:17 +0300 Subject: [PATCH 4/7] Fix linter --- internal/grpcwrapper/rawtopic/rawtopicreader/messages.go | 6 ++++-- internal/topic/topicreaderinternal/reader.go | 3 ++- topic/topicoptions/topicoptions_reader.go | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go b/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go index 3c2fc6ca9..cea9072d1 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go +++ b/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go @@ -124,8 +124,10 @@ func (r *InitRequest) toProto() *Ydb_Topic.StreamReadMessage_InitRequest { dstTopicSettings.MaxLag = srcTopicSettings.MaxLag.ToProto() dstTopicSettings.ReadFrom = srcTopicSettings.ReadFrom.ToProto() - dstTopicSettings.PartitionIds = make([]int64, len(srcTopicSettings.PartitionsID)) - copy(dstTopicSettings.PartitionIds, srcTopicSettings.PartitionsID) + partitionsIDs := make([]int64, len(srcTopicSettings.PartitionsID)) + copy(partitionsIDs, srcTopicSettings.PartitionsID) + + dstTopicSettings.PartitionIds = partitionsIDs } return p diff --git a/internal/topic/topicreaderinternal/reader.go b/internal/topic/topicreaderinternal/reader.go index e9bff2baa..85bd5ef5c 100644 --- a/internal/topic/topicreaderinternal/reader.go +++ b/internal/topic/topicreaderinternal/reader.go @@ -234,7 +234,7 @@ type ReaderConfig struct { topicStreamReaderConfig } -func (c ReaderConfig) Validate() error { +func (c *ReaderConfig) Validate() error { if c.Consumer != "" && c.ReadWithoutConsumer { return xerrors.WithStackTrace(errSetConsumerAndNoConsumer) } @@ -244,6 +244,7 @@ func (c ReaderConfig) Validate() error { if c.ReadWithoutConsumer && c.CommitMode != CommitModeNone { return xerrors.WithStackTrace(errCantCommitWithoutConsumer) } + return nil } diff --git a/topic/topicoptions/topicoptions_reader.go b/topic/topicoptions/topicoptions_reader.go index 9b00541b1..dfc7661a5 100644 --- a/topic/topicoptions/topicoptions_reader.go +++ b/topic/topicoptions/topicoptions_reader.go @@ -274,6 +274,7 @@ func WithReaderWithoutConsumer(saveStateOnReconnection bool) ReaderOption { if saveStateOnReconnection { panic("ydb: saveStateOnReconnection mode doesn't implemented yet") } + return func(cfg *topicreaderinternal.ReaderConfig) { cfg.ReadWithoutConsumer = true cfg.CommitMode = CommitModeNone From 7e42846199d45a7d76b04e2679b7c068474302ca Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Mon, 20 May 2024 21:21:28 +0300 Subject: [PATCH 5/7] Skip older YDB version --- tests/integration/topic_client_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integration/topic_client_test.go b/tests/integration/topic_client_test.go index a957432ec..81b94eee9 100644 --- a/tests/integration/topic_client_test.go +++ b/tests/integration/topic_client_test.go @@ -17,6 +17,7 @@ import ( ydb "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes" @@ -160,6 +161,9 @@ func TestSchemeList(t *testing.T) { func TestReaderWithoutConsumer(t *testing.T) { t.Run("OK", func(t *testing.T) { + if version.Lt(os.Getenv("YDB_VERSION"), "24.1") { + t.Skip("Read topic without consumer implemented since YDB 24.1, test ran for '" + os.Getenv("YDB_VERSION") + "'") + } scope := newScope(t) ctx := scope.Ctx From 19e07e8a248ab2430229f394ac9910f3e926ef23 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 21 May 2024 11:53:40 +0300 Subject: [PATCH 6/7] regenerate topic traces with new version of utility --- trace/topic_gtrace.go | 47 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index dec6f299c..c9d7a3a0c 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -12,9 +12,11 @@ type topicComposeOptions struct { } // TopicOption specified Topic compose option +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals type TopicComposeOption func(o *topicComposeOptions) // WithTopicPanicCallback specified behavior on panic +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func WithTopicPanicCallback(cb func(e interface{})) TopicComposeOption { return func(o *topicComposeOptions) { o.panicCallback = cb @@ -22,6 +24,7 @@ func WithTopicPanicCallback(cb func(e interface{})) TopicComposeOption { } // Compose returns a new Topic which has functional fields composed both from t and x. +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func (t *Topic) Compose(x *Topic, opts ...TopicComposeOption) *Topic { var ret Topic options := topicComposeOptions{} @@ -992,6 +995,8 @@ func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMe } fn(t1) } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { var p TopicReaderStartInfo p.ReaderID = readerID @@ -999,6 +1004,8 @@ func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { p.Error = e t.onReaderStart(p) } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnect(t *Topic, reason error) func(error) { var p TopicReaderReconnectStartInfo p.Reason = reason @@ -1009,12 +1016,16 @@ func TopicOnReaderReconnect(t *Topic, reason error) func(error) { res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { var p TopicReaderReconnectRequestInfo p.Reason = reason p.WasSent = wasSent t.onReaderReconnectRequest(p) } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { var p TopicReaderPartitionReadStartResponseStartInfo p.ReaderConnectionID = readerConnectionID @@ -1031,6 +1042,8 @@ func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { var p TopicReaderPartitionReadStopResponseStartInfo p.ReaderConnectionID = readerConnectionID @@ -1047,6 +1060,8 @@ func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo p.RequestContext = requestContext @@ -1062,6 +1077,8 @@ func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { var p TopicReaderSendCommitMessageStartInfo p.CommitsInfo = commitsInfo @@ -1072,6 +1089,8 @@ func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendC res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { var p TopicReaderCommittedNotifyInfo p.ReaderConnectionID = readerConnectionID @@ -1081,6 +1100,8 @@ func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic str p.CommittedOffset = committedOffset t.onReaderCommittedNotify(p) } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) func(closeError error) { var p TopicReaderCloseStartInfo p.ReaderConnectionID = readerConnectionID @@ -1092,6 +1113,8 @@ func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { var p TopicReaderInitStartInfo p.PreInitReaderConnectionID = preInitReaderConnectionID @@ -1104,12 +1127,16 @@ func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestIn res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { var p TopicReaderErrorInfo p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderError(p) } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen int, _ error) func(error) { var p OnReadUpdateTokenStartInfo p.ReaderConnectionID = readerConnectionID @@ -1126,6 +1153,8 @@ func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen } } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { var p TopicReaderSentDataRequestInfo p.ReaderConnectionID = readerConnectionID @@ -1133,6 +1162,8 @@ func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBy p.LocalBufferSizeAfterSent = localBufferSizeAfterSent t.onReaderSentDataRequest(p) } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { var p TopicReaderReceiveDataResponseStartInfo p.ReaderConnectionID = readerConnectionID @@ -1145,6 +1176,8 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo p.RequestContext = requestContext @@ -1165,12 +1198,16 @@ func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCou res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e error) { var p OnReadUnknownGrpcMessageInfo p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderUnknownGrpcMessage(p) } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, producerID string, attempt int) func(error) { var p TopicWriterReconnectStartInfo p.WriterInstanceID = writerInstanceID @@ -1184,6 +1221,8 @@ func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, pro res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { var p TopicWriterInitStreamStartInfo p.WriterInstanceID = writerInstanceID @@ -1197,6 +1236,8 @@ func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, pr res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(error) { var p TopicWriterCloseStartInfo p.WriterInstanceID = writerInstanceID @@ -1208,6 +1249,8 @@ func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(er res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { var p TopicWriterCompressMessagesStartInfo p.WriterInstanceID = writerInstanceID @@ -1223,6 +1266,8 @@ func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { var p TopicWriterSendMessagesStartInfo p.WriterInstanceID = writerInstanceID @@ -1237,6 +1282,8 @@ func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID stri res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo p.WriterInstanceID = writerInstanceID From 58ac44d1ff56d41873c2ac8017ff9c249a634134 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 21 May 2024 14:00:46 +0300 Subject: [PATCH 7/7] generated file --- trace/topic_gtrace.go | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index c9d7a3a0c..f13294b07 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -995,7 +995,6 @@ func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMe } fn(t1) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { var p TopicReaderStartInfo @@ -1004,7 +1003,6 @@ func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { p.Error = e t.onReaderStart(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnect(t *Topic, reason error) func(error) { var p TopicReaderReconnectStartInfo @@ -1016,7 +1014,6 @@ func TopicOnReaderReconnect(t *Topic, reason error) func(error) { res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { var p TopicReaderReconnectRequestInfo @@ -1024,7 +1021,6 @@ func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { p.WasSent = wasSent t.onReaderReconnectRequest(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { var p TopicReaderPartitionReadStartResponseStartInfo @@ -1042,7 +1038,6 @@ func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { var p TopicReaderPartitionReadStopResponseStartInfo @@ -1060,7 +1055,6 @@ func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo @@ -1077,7 +1071,6 @@ func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { var p TopicReaderSendCommitMessageStartInfo @@ -1089,7 +1082,6 @@ func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendC res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { var p TopicReaderCommittedNotifyInfo @@ -1100,7 +1092,6 @@ func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic str p.CommittedOffset = committedOffset t.onReaderCommittedNotify(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) func(closeError error) { var p TopicReaderCloseStartInfo @@ -1113,7 +1104,6 @@ func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { var p TopicReaderInitStartInfo @@ -1127,7 +1117,6 @@ func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestIn res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { var p TopicReaderErrorInfo @@ -1135,7 +1124,6 @@ func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { p.Error = e t.onReaderError(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen int, _ error) func(error) { var p OnReadUpdateTokenStartInfo @@ -1153,7 +1141,6 @@ func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen } } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { var p TopicReaderSentDataRequestInfo @@ -1162,7 +1149,6 @@ func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBy p.LocalBufferSizeAfterSent = localBufferSizeAfterSent t.onReaderSentDataRequest(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { var p TopicReaderReceiveDataResponseStartInfo @@ -1176,7 +1162,6 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo @@ -1198,7 +1183,6 @@ func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCou res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e error) { var p OnReadUnknownGrpcMessageInfo @@ -1206,7 +1190,6 @@ func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e erro p.Error = e t.onReaderUnknownGrpcMessage(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, producerID string, attempt int) func(error) { var p TopicWriterReconnectStartInfo @@ -1221,7 +1204,6 @@ func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, pro res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { var p TopicWriterInitStreamStartInfo @@ -1236,7 +1218,6 @@ func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, pr res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(error) { var p TopicWriterCloseStartInfo @@ -1249,7 +1230,6 @@ func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(er res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { var p TopicWriterCompressMessagesStartInfo @@ -1266,7 +1246,6 @@ func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { var p TopicWriterSendMessagesStartInfo @@ -1282,7 +1261,6 @@ func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID stri res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo