Skip to content

Commit

Permalink
Merge pull request #1484 from ydb-platform/fix-err-buffer
Browse files Browse the repository at this point in the history
special error for detect about written messages can be delivered to the server when write error happened.
  • Loading branch information
rekby authored Sep 25, 2024
2 parents 98f75f0 + e4df39e commit 2fa470e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Added error ErrMessagesPutToInternalQueueBeforeError to topic writer
* Added write to topics within transactions

## v3.80.10
Expand Down
33 changes: 18 additions & 15 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ import (
)

var (
errConnTimeout = xerrors.Wrap(errors.New("ydb: connection timeout"))
errStopWriterReconnector = xerrors.Wrap(errors.New("ydb: stop writer reconnector"))
errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode"))
errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll
errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic"))
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit"))
PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full"))
errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll
errConnTimeout = xerrors.Wrap(errors.New("ydb: connection timeout"))
errStopWriterReconnector = xerrors.Wrap(errors.New("ydb: stop writer reconnector"))
errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) //nolint:lll
errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll
errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic"))
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll
PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("ydb: the messages was put to internal buffer before the error happened. It mean about the messages can be delivered to the server")) //nolint:lll
errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll

// errProducerIDNotEqualMessageGroupID is temporary
// WithMessageGroupID is optional parameter because it allowed to be skipped by protocol.
Expand Down Expand Up @@ -209,7 +209,7 @@ func (w *WriterReconnector) start() {
w.background.Start(name+", sendloop", w.connectionLoop)
}

func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) error {
func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) (resErr error) {
if err := w.background.CloseReason(); err != nil {
return xerrors.WithStackTrace(fmt.Errorf("ydb: writer is closed: %w", err))
}
Expand All @@ -223,18 +223,16 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage)
semaphoreWeight := int64(len(messages))
if semaphoreWeight > int64(w.cfg.MaxQueueLen) {
return xerrors.WithStackTrace(fmt.Errorf(
"ydb: add more messages, then max queue limit. max queue: %v, try to add: %v: %w",
"ydb: add more messages, then max queue limit. max queue: %v, try to add: %v",
w.cfg.MaxQueueLen,
semaphoreWeight,
PublicErrQueueIsFull,
))
}
if err := w.semaphore.Acquire(ctx, semaphoreWeight); err != nil {
return xerrors.WithStackTrace(
fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v: %w",
fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v",
semaphoreWeight,
w.cfg.MaxQueueLen,
PublicErrQueueIsFull,
))
}
defer func() {
Expand All @@ -254,10 +252,15 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage)
return err
}

waiter, err := w.processMessagesWithLock(messagesSlice, &semaphoreWeight)
waiter, err := w.addMessageToInternalQueueWithLock(messagesSlice, &semaphoreWeight)
if err != nil {
return err
}
defer func() {
if resErr != nil {
resErr = xerrors.Join(resErr, PublicErrMessagesPutToInternalQueueBeforeError)
}
}()

if !w.cfg.WaitServerAck {
return nil
Expand All @@ -266,7 +269,7 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage)
return w.queue.Wait(ctx, waiter)
}

func (w *WriterReconnector) processMessagesWithLock(
func (w *WriterReconnector) addMessageToInternalQueueWithLock(
messagesSlice []messageWithDataContent,
semaphoreWeight *int64,
) (MessageQueueAckWaiter, error) {
Expand Down
23 changes: 20 additions & 3 deletions internal/topic/topicwriterinternal/writer_reconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,8 @@ func TestWriterReconnector_Write_QueueLimit(t *testing.T) {
ctxNoQueueSpaceCancel()
}()
err = w.Write(ctxNoQueueSpace, newTestMessages(3))
if !errors.Is(err, PublicErrQueueIsFull) {
require.ErrorIs(t, err, PublicErrQueueIsFull)
}
require.Error(t, err)
require.NotErrorIs(t, err, PublicErrMessagesPutToInternalQueueBeforeError)

go func() {
waitStartQueueWait(1)
Expand All @@ -325,6 +324,24 @@ func TestWriterReconnector_Write_QueueLimit(t *testing.T) {
})
}

func TestMessagesPutToInternalQueueBeforeError(t *testing.T) {
ctx := xtest.Context(t)
w := newWriterReconnectorStopped(NewWriterReconnectorConfig(
WithAutoSetSeqNo(false),
WithMaxQueueLen(2),
WithWaitAckOnWrite(true),
))
w.firstConnectionHandled.Store(true)

ctxCancel, cancel := context.WithCancel(ctx)
go func() {
<-w.queue.hasNewMessages
cancel()
}()
err := w.Write(ctxCancel, newTestMessages(1))
require.ErrorIs(t, err, PublicErrMessagesPutToInternalQueueBeforeError)
}

func TestEnv(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
env := newTestEnv(t, nil)
Expand Down
6 changes: 4 additions & 2 deletions topic/topicwriter/topicwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type (
Message = topicwriterinternal.PublicMessage
)

var ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull
var ErrMessagesPutToInternalQueueBeforeError = topicwriterinternal.PublicErrMessagesPutToInternalQueueBeforeError

// Writer represent write session to topic
// It handles connection problems, reconnect to server when need and resend buffered messages
Expand Down Expand Up @@ -38,7 +38,9 @@ func NewWriter(writer *topicwriterinternal.WriterReconnector) *Writer {
// especially when connection has problems.
//
// It returns ErrQueueLimitExceed (must be checked by errors.Is)
// if ctx cancelled before messages put to internal buffer or try to add more messages, that can be put to queue
// if ctx cancelled before messages put to internal buffer or try to add more messages, that can be put to queue.
// If err != nil you can check errors.Is(err, ErrMessagesPutToInternalQueueBeforeError) for check if the messages
// put to buffer before error. It means that it is messages can be delivered to the server.
func (w *Writer) Write(ctx context.Context, messages ...Message) error {
return w.inner.Write(ctx, messages)
}
Expand Down

0 comments on commit 2fa470e

Please sign in to comment.