Skip to content

Commit

Permalink
Merge pull request #1214 Writer flush messages before close from ydb-…
Browse files Browse the repository at this point in the history
…platform/flush-writer-on-close
  • Loading branch information
rekby authored Apr 26, 2024
2 parents 43d8e33 + 859a140 commit b59f008
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Added flush messages from buffer before close topic writer
* Added Flush method for topic writer

## v3.66.0
Expand Down
34 changes: 26 additions & 8 deletions internal/topic/topicwriterinternal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ type messageQueue struct {
closedErr error
acksReceivedEvent xsync.EventBroadcast

m xsync.RWMutex
closed bool
closedChan empty.Chan
lastWrittenIndex int
lastSentIndex int
lastSeqNo int64
m xsync.RWMutex
stopReceiveMessagesReason error
closed bool
closedChan empty.Chan
lastWrittenIndex int
lastSentIndex int
lastSeqNo int64

messagesByOrder map[int]messageWithDataContent
seqNoToOrderID map[int64]int
Expand Down Expand Up @@ -77,8 +78,10 @@ func (q *messageQueue) addMessages(messages []messageWithDataContent, needWaiter
q.m.Lock()
defer q.m.Unlock()

if q.closed {
return waiter, xerrors.WithStackTrace(fmt.Errorf("ydb: add message to closed message queue: %w", q.closedErr))
if q.stopReceiveMessagesReason != nil {
return waiter, xerrors.WithStackTrace(
fmt.Errorf("ydb: add message to closed message queue: %w", q.stopReceiveMessagesReason),
)
}

if err := q.checkNewMessagesBeforeAddNeedLock(messages); err != nil {
Expand Down Expand Up @@ -181,6 +184,19 @@ func (q *messageQueue) ackReceivedNeedLock(seqNo int64) error {
return nil
}

func (q *messageQueue) StopAddNewMessages(reason error) {
q.m.Lock()
defer q.m.Unlock()

q.stopAddNewMessagesNeedLock(reason)
}

func (q *messageQueue) stopAddNewMessagesNeedLock(reason error) {
if q.stopReceiveMessagesReason == nil {
q.stopReceiveMessagesReason = reason
}
}

func (q *messageQueue) Close(err error) error {
isFirstTimeClosed := false
q.m.Lock()
Expand All @@ -193,6 +209,8 @@ func (q *messageQueue) Close(err error) error {
}
}()

q.stopAddNewMessagesNeedLock(err)

if q.closed {
return xerrors.WithStackTrace(errCloseClosedMessageQueue)
}
Expand Down
20 changes: 17 additions & 3 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,17 @@ func (w *WriterReconnector) Flush(ctx context.Context) error {
}

func (w *WriterReconnector) Close(ctx context.Context) error {
return w.close(ctx, xerrors.WithStackTrace(errStopWriterReconnector))
reason := xerrors.WithStackTrace(errStopWriterReconnector)
w.queue.StopAddNewMessages(reason)

flushErr := w.Flush(ctx)
closeErr := w.close(ctx, reason)

if flushErr != nil {
return flushErr
}

return closeErr
}

func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr error) {
Expand All @@ -339,9 +349,13 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err
onDone(resErr)
}()

resErr = w.queue.Close(reason)
closeErr := w.queue.Close(reason)
if resErr == nil && closeErr != nil {
resErr = closeErr
}

bgErr := w.background.Close(ctx, reason)
if resErr == nil {
if resErr == nil && bgErr != nil {
resErr = bgErr
}

Expand Down
139 changes: 90 additions & 49 deletions internal/topic/topicwriterinternal/writer_reconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func TestWriterImpl_WriteCodecs(t *testing.T) {
Data: bytes.NewReader(messContent),
}}))

require.Equal(t, rawtopiccommon.CodecRaw, <-messReceived)
mess := <-messReceived
require.Equal(t, rawtopiccommon.CodecRaw, mess)
})
t.Run("ForceGzip", func(t *testing.T) {
var err error
Expand Down Expand Up @@ -531,67 +532,107 @@ func TestWriterImpl_Reconnect(t *testing.T) {
}

func TestWriterImpl_CloseWithFlush(t *testing.T) {
e := newTestEnv(t, nil)
type flushMethod func(ctx context.Context, writer *WriterReconnector) error

messageTime := time.Date(2023, 9, 7, 11, 34, 0, 0, time.UTC)
messageData := []byte("123")
f := func(t testing.TB, flush flushMethod) {
e := newTestEnv(t, nil)

const seqNo = 36
messageTime := time.Date(2023, 9, 7, 11, 34, 0, 0, time.UTC)
messageData := []byte("123")

writeCompleted := make(empty.Chan)
e.stream.EXPECT().Send(&rawtopicwriter.WriteRequest{
Messages: []rawtopicwriter.MessageData{
{
SeqNo: seqNo,
CreatedAt: messageTime,
UncompressedSize: int64(len(messageData)),
Partitioning: rawtopicwriter.Partitioning{},
Data: messageData,
const seqNo = 36

writeCompleted := make(empty.Chan)
e.stream.EXPECT().Send(&rawtopicwriter.WriteRequest{
Messages: []rawtopicwriter.MessageData{
{
SeqNo: seqNo,
CreatedAt: messageTime,
UncompressedSize: int64(len(messageData)),
Partitioning: rawtopicwriter.Partitioning{},
Data: messageData,
},
},
},
Codec: rawtopiccommon.CodecRaw,
}).Return(nil)
Codec: rawtopiccommon.CodecRaw,
}).Return(nil)

closeCompleted := make(empty.Chan)
go func() {
err := e.writer.Write(e.ctx, []PublicMessage{{
SeqNo: seqNo,
CreatedAt: messageTime,
Data: bytes.NewReader(messageData),
}})
close(writeCompleted)
require.NoError(t, err)
}()
flushCompleted := make(empty.Chan)
go func() {
err := e.writer.Write(e.ctx, []PublicMessage{{
SeqNo: seqNo,
CreatedAt: messageTime,
Data: bytes.NewReader(messageData),
}})
close(writeCompleted)
require.NoError(t, err)
}()

<-writeCompleted
<-writeCompleted

go func() {
require.NoError(t, e.writer.Flush(e.ctx))
require.NoError(t, e.writer.Close(e.ctx))
close(closeCompleted)
}()
go func() {
require.NoError(t, flush(e.ctx, e.writer))
close(flushCompleted)
}()

select {
case <-closeCompleted:
t.Fatal("flush and close must complete only after message is acked")
case <-time.After(100 * time.Millisecond):
// pass
}
select {
case <-flushCompleted:
t.Fatal("flush and close must complete only after message is acked")
case <-time.After(10 * time.Millisecond):
// pass
}

e.sendFromServer(&rawtopicwriter.WriteResult{
Acks: []rawtopicwriter.WriteAck{
{
SeqNo: seqNo,
MessageWriteStatus: rawtopicwriter.MessageWriteStatus{
Type: rawtopicwriter.WriteStatusTypeWritten,
WrittenOffset: 4,
e.sendFromServer(&rawtopicwriter.WriteResult{
Acks: []rawtopicwriter.WriteAck{
{
SeqNo: seqNo,
MessageWriteStatus: rawtopicwriter.MessageWriteStatus{
Type: rawtopicwriter.WriteStatusTypeWritten,
WrittenOffset: 4,
},
},
},
PartitionID: e.partitionID,
})

xtest.WaitChannelClosed(t, flushCompleted)
}

tests := []struct {
name string
flush flushMethod
}{
{
name: "close",
flush: func(ctx context.Context, writer *WriterReconnector) error {
return writer.Close(ctx)
},
},
PartitionID: e.partitionID,
})
{
name: "flush",
flush: func(ctx context.Context, writer *WriterReconnector) error {
return writer.Close(ctx)
},
},
{
name: "flush and close",
flush: func(ctx context.Context, writer *WriterReconnector) error {
err := writer.Flush(ctx)
if err != nil {
return err
}

return writer.Close(ctx)
},
},
}

xtest.WaitChannelClosed(t, closeCompleted)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
f(t, test.flush)
})
})
}
}

func TestAllMessagesHasSameBufCodec(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (scope *scopeT) Driver(opts ...ydb.Option) *ydb.Driver {
)...,
)
clean := func() {
scope.Require.NoError(driver.Close(scope.Ctx))
if driver != nil {
scope.Require.NoError(driver.Close(scope.Ctx))
}
}

return fixenv.NewGenericResultWithCleanup(driver, clean), err
Expand Down
26 changes: 26 additions & 0 deletions tests/integration/topic_read_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,32 @@ func TestTopicWriterWithManualPartitionSelect(t *testing.T) {
require.NoError(t, err)
}

func TestWriterFlushMessagesBeforeClose(t *testing.T) {
s := newScope(t)
ctx := s.Ctx
writer, err := s.Driver().Topic().StartWriter(s.TopicPath(), topicoptions.WithWriterWaitServerAck(false))
require.NoError(t, err)

count := 1000
for i := 0; i < count; i++ {
require.NoError(t, writer.Write(ctx, topicwriter.Message{Data: strings.NewReader(strconv.Itoa(i))}))
}
require.NoError(t, writer.Close(ctx))

for i := 0; i < count; i++ {
readCtx, cancel := context.WithTimeout(ctx, time.Second)
mess, err := s.TopicReader().ReadMessage(readCtx)
cancel()
require.NoError(t, err)

messBody, err := io.ReadAll(mess)
require.NoError(t, err)
messBodyString := string(messBody)
require.Equal(t, strconv.Itoa(i), messBodyString)
cancel()
}
}

var topicCounter int

func createTopic(ctx context.Context, t testing.TB, db *ydb.Driver) (topicPath string) {
Expand Down
2 changes: 2 additions & 0 deletions topic/topicwriter/topicwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (w *Writer) WaitInitInfo(ctx context.Context) (info PublicInitialInfo, err
return publicInfo, nil
}

// Close will flush rested messages from buffer and close the writer.
// You can't write new messages after call Close
func (w *Writer) Close(ctx context.Context) error {
return w.inner.Close(ctx)
}
Expand Down

0 comments on commit b59f008

Please sign in to comment.