From 859a140e61cd994ec8b5af50b311d5308198caf8 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 26 Apr 2024 15:35:17 +0300 Subject: [PATCH] Fix tests hungup --- .../topicwriterinternal/writer_reconnector.go | 16 +++++++++++----- .../writer_reconnector_test.go | 3 ++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index a6259bfb1..66d646291 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -330,7 +330,17 @@ func (w *WriterReconnector) Flush(ctx context.Context) error { } func (w *WriterReconnector) Close(ctx context.Context) error { - return w.close(ctx, xerrors.WithStackTrace(errStopWriterReconnector)) + reason := xerrors.WithStackTrace(errStopWriterReconnector) + w.queue.StopAddNewMessages(reason) + + flushErr := w.Flush(ctx) + closeErr := w.close(ctx, reason) + + if flushErr != nil { + return flushErr + } + + return closeErr } func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr error) { @@ -339,10 +349,6 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err onDone(resErr) }() - w.queue.StopAddNewMessages(reason) - - resErr = w.Flush(ctx) - closeErr := w.queue.Close(reason) if resErr == nil && closeErr != nil { resErr = closeErr diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 195f80af8..70630d74d 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -196,7 +196,8 @@ func TestWriterImpl_WriteCodecs(t *testing.T) { Data: bytes.NewReader(messContent), }})) - require.Equal(t, rawtopiccommon.CodecRaw, <-messReceived) + mess := <-messReceived + require.Equal(t, rawtopiccommon.CodecRaw, mess) }) t.Run("ForceGzip", func(t *testing.T) { var err error