From 3a853209d066600698d7f7c9217b5894020c8ff0 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 25 Sep 2024 20:54:34 +0300 Subject: [PATCH 1/3] special error for detect about written messages can be delivered to the server when write error happened. --- .../topicwriterinternal/writer_reconnector.go | 33 ++++++++++--------- .../writer_reconnector_test.go | 23 +++++++++++-- topic/topicwriter/topicwriter.go | 6 ++-- 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index d78af8fa6..4551c2a3b 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -31,14 +31,14 @@ import ( ) var ( - errConnTimeout = xerrors.Wrap(errors.New("ydb: connection timeout")) - errStopWriterReconnector = xerrors.Wrap(errors.New("ydb: stop writer reconnector")) - errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) - errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll - errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic")) - errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) - PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full")) - errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll + errConnTimeout = xerrors.Wrap(errors.New("ydb: connection timeout")) + errStopWriterReconnector = xerrors.Wrap(errors.New("ydb: stop writer reconnector")) + errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) + errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll + errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic")) + errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) + PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("ydb: the messages was put to internal buffer before the error happened. It mean about the messages can be delivered to the server")) + errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll // errProducerIDNotEqualMessageGroupID is temporary // WithMessageGroupID is optional parameter because it allowed to be skipped by protocol. @@ -209,7 +209,7 @@ func (w *WriterReconnector) start() { w.background.Start(name+", sendloop", w.connectionLoop) } -func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) error { +func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) (resErr error) { if err := w.background.CloseReason(); err != nil { return xerrors.WithStackTrace(fmt.Errorf("ydb: writer is closed: %w", err)) } @@ -223,18 +223,16 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) semaphoreWeight := int64(len(messages)) if semaphoreWeight > int64(w.cfg.MaxQueueLen) { return xerrors.WithStackTrace(fmt.Errorf( - "ydb: add more messages, then max queue limit. max queue: %v, try to add: %v: %w", + "ydb: add more messages, then max queue limit. max queue: %v, try to add: %v", w.cfg.MaxQueueLen, semaphoreWeight, - PublicErrQueueIsFull, )) } if err := w.semaphore.Acquire(ctx, semaphoreWeight); err != nil { return xerrors.WithStackTrace( - fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v: %w", + fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v", semaphoreWeight, w.cfg.MaxQueueLen, - PublicErrQueueIsFull, )) } defer func() { @@ -254,10 +252,15 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) return err } - waiter, err := w.processMessagesWithLock(messagesSlice, &semaphoreWeight) + waiter, err := w.addMessageToInternalQueueWithLock(messagesSlice, &semaphoreWeight) if err != nil { return err } + defer func() { + if resErr != nil { + resErr = xerrors.Join(resErr, PublicErrMessagesPutToInternalQueueBeforeError) + } + }() if !w.cfg.WaitServerAck { return nil @@ -266,7 +269,7 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) return w.queue.Wait(ctx, waiter) } -func (w *WriterReconnector) processMessagesWithLock( +func (w *WriterReconnector) addMessageToInternalQueueWithLock( messagesSlice []messageWithDataContent, semaphoreWeight *int64, ) (MessageQueueAckWaiter, error) { diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index e70477491..dc89dcafb 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -306,9 +306,8 @@ func TestWriterReconnector_Write_QueueLimit(t *testing.T) { ctxNoQueueSpaceCancel() }() err = w.Write(ctxNoQueueSpace, newTestMessages(3)) - if !errors.Is(err, PublicErrQueueIsFull) { - require.ErrorIs(t, err, PublicErrQueueIsFull) - } + require.Error(t, err) + require.NotErrorIs(t, err, PublicErrMessagesPutToInternalQueueBeforeError) go func() { waitStartQueueWait(1) @@ -325,6 +324,24 @@ func TestWriterReconnector_Write_QueueLimit(t *testing.T) { }) } +func TestMessagesPutToInternalQueueBeforeError(t *testing.T) { + ctx := xtest.Context(t) + w := newWriterReconnectorStopped(NewWriterReconnectorConfig( + WithAutoSetSeqNo(false), + WithMaxQueueLen(2), + WithWaitAckOnWrite(true), + )) + w.firstConnectionHandled.Store(true) + + ctxCancel, cancel := context.WithCancel(ctx) + go func() { + <-w.queue.hasNewMessages + cancel() + }() + err := w.Write(ctxCancel, newTestMessages(1)) + require.ErrorIs(t, err, PublicErrMessagesPutToInternalQueueBeforeError) +} + func TestEnv(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { env := newTestEnv(t, nil) diff --git a/topic/topicwriter/topicwriter.go b/topic/topicwriter/topicwriter.go index 4fa3f6c90..96d39e9c9 100644 --- a/topic/topicwriter/topicwriter.go +++ b/topic/topicwriter/topicwriter.go @@ -10,7 +10,7 @@ type ( Message = topicwriterinternal.PublicMessage ) -var ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull +var ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError // Writer represent write session to topic // It handles connection problems, reconnect to server when need and resend buffered messages @@ -38,7 +38,9 @@ func NewWriter(writer *topicwriterinternal.WriterReconnector) *Writer { // especially when connection has problems. // // It returns ErrQueueLimitExceed (must be checked by errors.Is) -// if ctx cancelled before messages put to internal buffer or try to add more messages, that can be put to queue +// if ctx cancelled before messages put to internal buffer or try to add more messages, that can be put to queue. +// If err != nil you can check errors.Is(err, ErrMessagesPutToInternalQueueBeforeError) for check if the messages +// put to buffer before error. It means that it is messages can be delivered to the server. func (w *Writer) Write(ctx context.Context, messages ...Message) error { return w.inner.Write(ctx, messages) } From 8429cb3167f507caf6179950ffa14bd2b6c4b9a0 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 25 Sep 2024 20:55:34 +0300 Subject: [PATCH 2/3] special error for detect about written messages can be delivered to the server when write error happened. --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 27ef3b408..8846f366d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Added error ErrMessagesPutToInternalQueueBeforeError to topic writer * Added write to topics within transactions ## v3.80.10 From e4df39e641d1514dc4fd8221e8dac685ec6bccc2 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 25 Sep 2024 21:08:34 +0300 Subject: [PATCH 3/3] fix linters --- internal/topic/topicwriterinternal/writer_reconnector.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 4551c2a3b..3ab1dad4c 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -33,11 +33,11 @@ import ( var ( errConnTimeout = xerrors.Wrap(errors.New("ydb: connection timeout")) errStopWriterReconnector = xerrors.Wrap(errors.New("ydb: stop writer reconnector")) - errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) + errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) //nolint:lll errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic")) - errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) - PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("ydb: the messages was put to internal buffer before the error happened. It mean about the messages can be delivered to the server")) + errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll + PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("ydb: the messages was put to internal buffer before the error happened. It mean about the messages can be delivered to the server")) //nolint:lll errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll // errProducerIDNotEqualMessageGroupID is temporary