From 46851729bda3eb1662c77f5e144bc092e70cdb55 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 14 Dec 2023 17:24:38 +0300 Subject: [PATCH 1/5] add metadata to messages --- CHANGELOG.md | 1 + examples/go.mod | 2 +- examples/go.sum | 1 + go.mod | 2 +- go.sum | 4 +- .../rawtopiccommon/message_metadata.go | 6 ++ .../rawtopic/rawtopicreader/messages.go | 10 +++ .../rawtopic/rawtopicwriter/messages.go | 9 ++ internal/topic/topicreaderinternal/batch.go | 7 ++ internal/topic/topicreaderinternal/message.go | 10 +++ .../stream_reader_impl_test.go | 88 ++++++++++++++++++- .../topic/topicwriterinternal/encoders.go | 9 +- .../topicwriterinternal/encoders_test.go | 24 ++--- internal/topic/topicwriterinternal/message.go | 37 ++++++-- internal/topic/topicwriterinternal/writer.go | 2 +- .../topicwriterinternal/writer_reconnector.go | 17 +++- .../writer_reconnector_test.go | 57 +++++++++--- .../writer_stream_interface.go | 2 +- .../writer_stream_interface_mock_test.go | 2 +- .../topic/topicwriterinternal/writer_test.go | 4 +- internal/xbytes/clone.go | 14 +++ internal/xbytes/clone_go1.20.go | 13 +++ tests/integration/helpers_test.go | 66 ++++++++++++++ tests/integration/topic_read_writer_test.go | 42 +++++++++ tests/slo/go.mod | 2 +- tests/slo/go.sum | 1 + topic/topicwriter/topicwriter.go | 2 +- 27 files changed, 384 insertions(+), 50 deletions(-) create mode 100644 internal/grpcwrapper/rawtopic/rawtopiccommon/message_metadata.go create mode 100644 internal/xbytes/clone.go create mode 100644 internal/xbytes/clone_go1.20.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 102f73eb7..baeeb4d58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Added per message metadata support for topic api * Context for call options now have same lifetime as driver (previous - same lifetime as context for call Open function). * Extended metrics (fill database.sql callbacks, recognize TLI error) * Refactored config prefix in metrics diff --git a/examples/go.mod b/examples/go.mod index 1984047c3..085257f95 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -49,7 +49,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect github.com/yandex-cloud/go-genproto v0.0.0-20220815090733-4c139c0154e2 // indirect - github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd // indirect + github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a // indirect github.com/ydb-platform/ydb-go-sdk-metrics v0.16.3 // indirect github.com/ydb-platform/ydb-go-yc-metadata v0.5.4 // indirect golang.org/x/crypto v0.13.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index b804b0da3..37a8c5a73 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -1197,6 +1197,7 @@ github.com/ydb-platform/xorm v0.0.3 h1:MXk42lANB6r/MMLg/XdJfyXJycGUDlCeLiMlLGDKV github.com/ydb-platform/xorm v0.0.3/go.mod h1:hFsU7EUF0o3S+l5c0eyP2yPVjJ0d4gsFdqCsyazzwBc= github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE= github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 h1:EYSI1kulnHb0H0zt3yOw4cRj4ABMSMGwNe43D+fX7e4= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2/go.mod h1:Xfjce+VMU9yJVr1lj60yK2fFPWjB4jr/4cp3K7cjzi4= github.com/ydb-platform/ydb-go-sdk-metrics v0.16.3 h1:30D5jErLAiGjchVG2D9JiCLbST5LpAiyS7DoUtHkWsU= diff --git a/go.mod b/go.mod index d0a2647ce..809750187 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.4.1 github.com/google/uuid v1.3.0 github.com/jonboulle/clockwork v0.3.0 - github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd + github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a golang.org/x/sync v0.3.0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/grpc v1.57.1 diff --git a/go.sum b/go.sum index 0c37233e8..7720358a3 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a h1:9wx+kCrCQCdwmDe1AFW5yAHdzlo+RV7lcy6y7Zq661s= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/internal/grpcwrapper/rawtopic/rawtopiccommon/message_metadata.go b/internal/grpcwrapper/rawtopic/rawtopiccommon/message_metadata.go new file mode 100644 index 000000000..33a67d4c1 --- /dev/null +++ b/internal/grpcwrapper/rawtopic/rawtopiccommon/message_metadata.go @@ -0,0 +1,6 @@ +package rawtopiccommon + +type MetadataItem struct { + Key string + Value []byte +} diff --git a/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go b/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go index 02775a89f..a29d48f80 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go +++ b/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go @@ -245,6 +245,15 @@ func (r *ReadResponse) fromProto(p *Ydb_Topic.StreamReadMessage_ReadResponse) er dstMessage.Data = srcMessage.Data dstMessage.UncompressedSize = srcMessage.UncompressedSize dstMessage.MessageGroupID = srcMessage.MessageGroupId + if len(srcMessage.MetadataItems) > 0 { + dstMessage.MetadataItems = make([]rawtopiccommon.MetadataItem, 0, len(srcMessage.MetadataItems)) + for _, protoItem := range srcMessage.MetadataItems { + dstMessage.MetadataItems = append(dstMessage.MetadataItems, rawtopiccommon.MetadataItem{ + Key: protoItem.Key, + Value: protoItem.Value[:len(protoItem.Value):len(protoItem.Value)], + }) + } + } } } } @@ -273,6 +282,7 @@ type MessageData struct { Data []byte UncompressedSize int64 MessageGroupID string + MetadataItems []rawtopiccommon.MetadataItem } // diff --git a/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go b/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go index 48f505a57..7c4b466ce 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go +++ b/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go @@ -171,6 +171,7 @@ type MessageData struct { CreatedAt time.Time UncompressedSize int64 Partitioning Partitioning + MetadataItems []rawtopiccommon.MetadataItem Data []byte } @@ -185,6 +186,14 @@ func (d *MessageData) ToProto() (*Ydb_Topic.StreamWriteMessage_WriteRequest_Mess if err != nil { return nil, err } + + for _, item := range d.MetadataItems { + res.MetadataItems = append(res.MetadataItems, &Ydb_Topic.MetadataItem{ + Key: item.Key, + Value: item.Value, + }) + } + return res, nil } diff --git a/internal/topic/topicreaderinternal/batch.go b/internal/topic/topicreaderinternal/batch.go index 6665cd4ef..f1edfab3a 100644 --- a/internal/topic/topicreaderinternal/batch.go +++ b/internal/topic/topicreaderinternal/batch.go @@ -86,6 +86,13 @@ func newBatchFromStream( dstMess.commitRange.commitOffsetStart = prevOffset + 1 dstMess.commitRange.commitOffsetEnd = sMess.Offset + 1 + if len(sMess.MetadataItems) > 0 { + dstMess.Metadata = make(map[string][]byte, len(sMess.MetadataItems)) + for _, item := range sMess.MetadataItems { + dstMess.Metadata[item.Key] = item.Value + } + } + prevOffset = sMess.Offset } diff --git a/internal/topic/topicreaderinternal/message.go b/internal/topic/topicreaderinternal/message.go index 77bfe007c..57aa89c5a 100644 --- a/internal/topic/topicreaderinternal/message.go +++ b/internal/topic/topicreaderinternal/message.go @@ -9,6 +9,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xbytes" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" ) @@ -28,6 +29,7 @@ type PublicMessage struct { Offset int64 WrittenAt time.Time ProducerID string + Metadata map[string][]byte // Metadata, nil if no metadata commitRange commitRange data oneTimeReader @@ -135,6 +137,14 @@ func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBu return pmb } +func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder { + pmb.mess.Metadata = make(map[string][]byte, len(metadata)) + for key, val := range metadata { + pmb.mess.Metadata[key] = xbytes.Clone(val) + } + return pmb +} + // MessageGroupID set message MessageGroupID func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder { pmb.mess.MessageGroupID = messageGroupID diff --git a/internal/topic/topicreaderinternal/stream_reader_impl_test.go b/internal/topic/topicreaderinternal/stream_reader_impl_test.go index f63526b2e..9f76fae9a 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl_test.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl_test.go @@ -583,7 +583,7 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) { prevOffset := e.partitionSession.lastReceivedMessageOffset() sendDataRequestCompleted := make(empty.Chan) - dataSize := 4 + dataSize := 6 e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize}).Do(func(_ interface{}) { close(sendDataRequestCompleted) }) @@ -639,17 +639,60 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) { }, }, }, + { + Codec: rawtopiccommon.CodecRaw, + WriteSessionMeta: map[string]string{"a": "b", "c": "d"}, + WrittenAt: testTime(7), + MessageData: []rawtopicreader.MessageData{ + { + Offset: prevOffset + 30, + SeqNo: 5, + CreatedAt: testTime(5), + Data: []byte("test"), + UncompressedSize: 4, + MessageGroupID: "1", + MetadataItems: []rawtopiccommon.MetadataItem{ + { + Key: "first", + Value: []byte("first-value"), + }, + { + Key: "second", + Value: []byte("second-value"), + }, + }, + }, + { + Offset: prevOffset + 31, + SeqNo: 6, + CreatedAt: testTime(5), + Data: []byte("4567"), + UncompressedSize: 4, + MessageGroupID: "1", + MetadataItems: []rawtopiccommon.MetadataItem{ + { + Key: "doubled-key", + Value: []byte("bad"), + }, + { + Key: "doubled-key", + Value: []byte("good"), + }, + }, + }, + }, + }, }, }, }, }, ) - expectedData := [][]byte{[]byte("123"), []byte("4567"), []byte("098"), []byte("0987")} + expectedData := [][]byte{[]byte("123"), []byte("4567"), []byte("098"), []byte("0987"), []byte("test"), []byte("4567")} expectedBatch := &PublicBatch{ commitRange: commitRange{ commitOffsetStart: prevOffset + 1, - commitOffsetEnd: prevOffset + 21, + commitOffsetEnd: prevOffset + 32, partitionSession: e.partitionSession, }, Messages: []*PublicMessage{ @@ -713,11 +756,48 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) { partitionSession: e.partitionSession, }, }, + { + SeqNo: 5, + CreatedAt: testTime(5), + MessageGroupID: "1", + Metadata: map[string][]byte{ + "first": []byte("first-value"), + "second": []byte("second-value"), + }, + Offset: prevOffset.ToInt64() + 30, + WrittenAt: testTime(7), + WriteSessionMetadata: map[string]string{"a": "b", "c": "d"}, + UncompressedSize: 4, + rawDataLen: 4, + commitRange: commitRange{ + commitOffsetStart: prevOffset + 21, + commitOffsetEnd: prevOffset + 31, + partitionSession: e.partitionSession, + }, + }, + { + SeqNo: 6, + CreatedAt: testTime(5), + MessageGroupID: "1", + Metadata: map[string][]byte{ + "doubled-key": []byte("good"), + }, + Offset: prevOffset.ToInt64() + 31, + WrittenAt: testTime(7), + WriteSessionMetadata: map[string]string{"a": "b", "c": "d"}, + UncompressedSize: 4, + rawDataLen: 4, + commitRange: commitRange{ + commitOffsetStart: prevOffset + 31, + commitOffsetEnd: prevOffset + 32, + partitionSession: e.partitionSession, + }, + }, }, } opts := newReadMessageBatchOptions() - opts.MinCount = 4 + opts.MinCount = 6 batch, err := e.reader.ReadMessageBatch(e.ctx, opts) require.NoError(t, err) diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index 1043a2c02..60eb3e2dc 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -119,7 +119,7 @@ func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (r len(messages), trace.TopicWriterCompressMessagesReasonCompressData, ) - err = readInParallelWithCodec(messages, codec, s.parallelCompressors) + err = cacheMessages(messages, codec, s.parallelCompressors) onCompressDone(err) } return codec, err @@ -188,7 +188,7 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt len(messages), trace.TopicWriterCompressMessagesReasonCodecsMeasure, ) - err := readInParallelWithCodec(messages, codec, s.parallelCompressors) + err := cacheMessages(messages, codec, s.parallelCompressors) onCompressDone(err) if err != nil { return codecUnknown, err @@ -215,8 +215,7 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt return s.allowedCodecs[minSizeIndex], nil } -func readInParallelWithCodec(messages []messageWithDataContent, codec rawtopiccommon.Codec, parallel int) error { - workerCount := parallel +func cacheMessages(messages []messageWithDataContent, codec rawtopiccommon.Codec, workerCount int) error { if len(messages) < workerCount { workerCount = len(messages) } @@ -253,7 +252,7 @@ func readInParallelWithCodec(messages []messageWithDataContent, codec rawtopicco if localErr != nil { return } - _, localErr = task.GetEncodedBytes(codec) + localErr = task.CacheMessageData(codec) if localErr != nil { resErrMutex.WithLock(func() { resErr = localErr diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index feecc6d74..63b929068 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -51,13 +51,13 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { var messages []messageWithDataContent for i := 0; i < smallCount; i++ { data := make([]byte, smallSize) - message := newMessageDataWithContent(Message{Data: bytes.NewReader(data)}, testCommonEncoders) + message := newMessageDataWithContent(PublicMessage{Data: bytes.NewReader(data)}, testCommonEncoders) messages = append(messages, message) } for i := 0; i < largeCount; i++ { data := make([]byte, largeSize) - message := newMessageDataWithContent(Message{Data: bytes.NewReader(data)}, testCommonEncoders) + message := newMessageDataWithContent(PublicMessage{Data: bytes.NewReader(data)}, testCommonEncoders) messages = append(messages, message) } @@ -152,30 +152,30 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { func TestCompressMessages(t *testing.T) { t.Run("NoMessages", func(t *testing.T) { - require.NoError(t, readInParallelWithCodec(nil, rawtopiccommon.CodecRaw, 1)) + require.NoError(t, cacheMessages(nil, rawtopiccommon.CodecRaw, 1)) }) t.Run("RawOk", func(t *testing.T) { messages := newTestMessagesWithContent(1) - require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecRaw, 1)) + require.NoError(t, cacheMessages(messages, rawtopiccommon.CodecRaw, 1)) }) t.Run("RawError", func(t *testing.T) { - mess := newMessageDataWithContent(Message{}, testCommonEncoders) + mess := newMessageDataWithContent(PublicMessage{}, testCommonEncoders) _, err := mess.GetEncodedBytes(rawtopiccommon.CodecGzip) require.NoError(t, err) messages := []messageWithDataContent{mess} - require.Error(t, readInParallelWithCodec(messages, rawtopiccommon.CodecRaw, 1)) + require.Error(t, cacheMessages(messages, rawtopiccommon.CodecRaw, 1)) }) const messageCount = 10 t.Run("GzipOneThread", func(t *testing.T) { var messages []messageWithDataContent for i := 0; i < messageCount; i++ { - mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders) + mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders) messages = append(messages, mess) } - require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecGzip, 1)) + require.NoError(t, cacheMessages(messages, rawtopiccommon.CodecGzip, 1)) for i := 0; i < messageCount; i++ { require.Equal(t, rawtopiccommon.CodecGzip, messages[i].bufCodec) } @@ -185,11 +185,11 @@ func TestCompressMessages(t *testing.T) { t.Run("GzipOk", func(t *testing.T) { var messages []messageWithDataContent for i := 0; i < messageCount; i++ { - mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders) + mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders) messages = append(messages, mess) } - require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecGzip, parallelCount)) + require.NoError(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount)) for i := 0; i < messageCount; i++ { require.Equal(t, rawtopiccommon.CodecGzip, messages[i].bufCodec) } @@ -198,11 +198,11 @@ func TestCompressMessages(t *testing.T) { t.Run("GzipErr", func(t *testing.T) { var messages []messageWithDataContent for i := 0; i < messageCount; i++ { - mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders) + mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders) messages = append(messages, mess) } messages[0].dataWasRead = true - require.Error(t, readInParallelWithCodec(messages, rawtopiccommon.CodecGzip, parallelCount)) + require.Error(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount)) }) } diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index 2ef3e569e..9b1088b95 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -9,15 +9,17 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xbytes" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" ) var errNoRawContent = xerrors.Wrap(errors.New("ydb: internal state error - no raw message content")) -type Message struct { +type PublicMessage struct { SeqNo int64 CreatedAt time.Time Data io.Reader + Metadata map[string][]byte // partitioning at level message available by protocol, but doesn't available by current server implementation // the field hidden from public access for prevent runtime errors. @@ -53,7 +55,7 @@ func NewPartitioningWithPartitionID(id int64) PublicFuturePartitioning { } type messageWithDataContent struct { - Message + PublicMessage dataWasRead bool encoders *EncoderMap @@ -63,6 +65,7 @@ type messageWithDataContent struct { bufCodec rawtopiccommon.Codec bufEncoded bytes.Buffer BufUncompressedSize int + metadataCached bool } func (m *messageWithDataContent) GetEncodedBytes(codec rawtopiccommon.Codec) ([]byte, error) { @@ -73,6 +76,30 @@ func (m *messageWithDataContent) GetEncodedBytes(codec rawtopiccommon.Codec) ([] return m.getEncodedBytes(codec) } +func (m *messageWithDataContent) cacheMetadata() { + if m.metadataCached { + return + } + + // ensure message metadata can't be changed by external code + if len(m.Metadata) > 0 { + ownCopy := make(map[string][]byte, len(m.Metadata)) + for key, val := range m.Metadata { + ownCopy[key] = xbytes.Clone(val) + } + m.Metadata = ownCopy + } else { + m.Metadata = nil + } + m.metadataCached = true +} + +func (m *messageWithDataContent) CacheMessageData(codec rawtopiccommon.Codec) error { + m.cacheMetadata() + _, err := m.GetEncodedBytes(codec) + return err +} + func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([]byte, error) { if !m.hasRawContent { return nil, xerrors.WithStackTrace(errNoRawContent) @@ -182,11 +209,11 @@ func (m *messageWithDataContent) getEncodedBytes(codec rawtopiccommon.Codec) ([] } func newMessageDataWithContent( - message Message, //nolint:gocritic + message PublicMessage, //nolint:gocritic encoders *EncoderMap, ) messageWithDataContent { return messageWithDataContent{ - Message: message, - encoders: encoders, + PublicMessage: message, + encoders: encoders, } } diff --git a/internal/topic/topicwriterinternal/writer.go b/internal/topic/topicwriterinternal/writer.go index 9b3758666..d3a74ffd0 100644 --- a/internal/topic/topicwriterinternal/writer.go +++ b/internal/topic/topicwriterinternal/writer.go @@ -40,7 +40,7 @@ func NewWriter(cred credentials.Credentials, options []PublicWriterOption) (*Wri }, nil } -func (w *Writer) Write(ctx context.Context, messages ...Message) error { +func (w *Writer) Write(ctx context.Context, messages ...PublicMessage) error { if err := ctx.Err(); err != nil { return err } diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 4e34eab96..c9b7ab48d 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -207,7 +207,7 @@ func (w *WriterReconnector) start() { w.background.Start(name+", sendloop", w.connectionLoop) } -func (w *WriterReconnector) Write(ctx context.Context, messages []Message) error { +func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) error { if err := w.background.CloseReason(); err != nil { return xerrors.WithStackTrace(fmt.Errorf("ydb: writer is closed: %w", err)) } @@ -291,7 +291,7 @@ func (w *WriterReconnector) checkMessages(messages []messageWithDataContent) err return nil } -func (w *WriterReconnector) createMessagesWithContent(messages []Message) ([]messageWithDataContent, error) { +func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage) ([]messageWithDataContent, error) { res := make([]messageWithDataContent, 0, len(messages)) for i := range messages { mess := newMessageDataWithContent(messages[i], w.encodersMap) @@ -316,7 +316,7 @@ func (w *WriterReconnector) createMessagesWithContent(messages []Message) ([]mes if targetCodec == rawtopiccommon.CodecUNSPECIFIED { targetCodec = rawtopiccommon.CodecRaw } - err := readInParallelWithCodec(res, targetCodec, w.cfg.compressorCount) + err := cacheMessages(res, targetCodec, w.cfg.compressorCount) onCompressDone(err) if err != nil { return nil, err @@ -653,6 +653,17 @@ func createRawMessageData( res.UncompressedSize = int64(mess.BufUncompressedSize) res.Data, err = mess.GetEncodedBytes(codec) + + if len(mess.Metadata) > 0 { + res.MetadataItems = make([]rawtopiccommon.MetadataItem, 0, len(mess.Metadata)) + for key, val := range mess.Metadata { + res.MetadataItems = append(res.MetadataItems, rawtopiccommon.MetadataItem{ + Key: key, + Value: val, + }) + } + } + return res, err } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 91d266136..198f46af8 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "sort" "sync" "sync/atomic" "testing" @@ -77,10 +78,10 @@ func TestWriterImpl_CheckMessages(t *testing.T) { maxSize := 5 w.cfg.MaxMessageSize = maxSize - err := w.Write(ctx, []Message{{Data: bytes.NewReader(make([]byte, maxSize))}}) + err := w.Write(ctx, []PublicMessage{{Data: bytes.NewReader(make([]byte, maxSize))}}) require.NoError(t, err) - err = w.Write(ctx, []Message{{Data: bytes.NewReader(make([]byte, maxSize+1))}}) + err = w.Write(ctx, []PublicMessage{{Data: bytes.NewReader(make([]byte, maxSize+1))}}) require.Error(t, err) }) } @@ -141,7 +142,11 @@ func TestWriterImpl_Write(t *testing.T) { writeCompleted := make(empty.Chan) go func() { - err := e.writer.Write(e.ctx, []Message{{SeqNo: seqNo, CreatedAt: messageTime, Data: bytes.NewReader(messageData)}}) + err := e.writer.Write(e.ctx, []PublicMessage{{ + SeqNo: seqNo, + CreatedAt: messageTime, + Data: bytes.NewReader(messageData), + }}) require.NoError(t, err) close(writeCompleted) }() @@ -187,7 +192,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) { }) require.NoError(t, err) - require.NoError(t, e.writer.Write(e.ctx, []Message{{ + require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{ Data: bytes.NewReader(messContent), }})) @@ -215,7 +220,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) { }) require.NoError(t, err) - require.NoError(t, e.writer.Write(e.ctx, []Message{{ + require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{ Data: bytes.NewReader(messContent), }})) @@ -242,7 +247,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) { codecs := make(map[rawtopiccommon.Codec]empty.Struct) for i := 0; i < codecMeasureIntervalBatches; i++ { - require.NoError(t, e.writer.Write(e.ctx, []Message{{ + require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{ Data: bytes.NewReader(messContentShort), }})) // wait send @@ -251,7 +256,7 @@ func TestWriterImpl_WriteCodecs(t *testing.T) { } for i := 0; i < codecMeasureIntervalBatches; i++ { - require.NoError(t, e.writer.Write(e.ctx, []Message{{ + require.NoError(t, e.writer.Write(e.ctx, []PublicMessage{{ Data: bytes.NewReader(messContentLong), }})) // wait send @@ -547,6 +552,38 @@ func TestCreateRawMessageData(t *testing.T) { req, ) }) + t.Run("WithMessageMetadata", func(t *testing.T) { + messages := newTestMessagesWithContent(1) + messages[0].Metadata = map[string][]byte{ + "a": {1, 2, 3}, + "b": {4, 5}, + } + req, err := createWriteRequest(messages, rawtopiccommon.CodecRaw) + + sort.Slice(req.Messages[0].MetadataItems, func(i, j int) bool { + return req.Messages[0].MetadataItems[i].Key < req.Messages[0].MetadataItems[j].Key + }) + + require.NoError(t, err) + require.Equal(t, rawtopicwriter.WriteRequest{ + Messages: []rawtopicwriter.MessageData{ + { + SeqNo: 1, + MetadataItems: []rawtopiccommon.MetadataItem{ + { + Key: "a", + Value: []byte{1, 2, 3}, + }, + { + Key: "b", + Value: []byte{4, 5}, + }, + }, + }, + }, + Codec: rawtopiccommon.CodecRaw, + }, req) + }) t.Run("WithSeqno", func(t *testing.T) { req, err := createWriteRequest(newTestMessagesWithContent(1, 2, 3), rawtopiccommon.CodecRaw) require.NoError(t, err) @@ -711,12 +748,12 @@ func TestCalculateAllowedCodecs(t *testing.T) { } func newTestMessageWithDataContent(num int) messageWithDataContent { - res := newMessageDataWithContent(Message{SeqNo: int64(num)}, testCommonEncoders) + res := newMessageDataWithContent(PublicMessage{SeqNo: int64(num)}, testCommonEncoders) return res } -func newTestMessages(numbers ...int) []Message { - messages := make([]Message, len(numbers)) +func newTestMessages(numbers ...int) []PublicMessage { + messages := make([]PublicMessage, len(numbers)) for i, num := range numbers { messages[i].SeqNo = int64(num) } diff --git a/internal/topic/topicwriterinternal/writer_stream_interface.go b/internal/topic/topicwriterinternal/writer_stream_interface.go index 5455641b4..7973f227f 100644 --- a/internal/topic/topicwriterinternal/writer_stream_interface.go +++ b/internal/topic/topicwriterinternal/writer_stream_interface.go @@ -6,7 +6,7 @@ import ( //go:generate mockgen -source writer_stream_interface.go -destination writer_stream_interface_mock_test.go -package topicwriterinternal -write_package_comment=false type StreamWriter interface { - Write(ctx context.Context, messages []Message) error + Write(ctx context.Context, messages []PublicMessage) error WaitInit(ctx context.Context) (info InitialInfo, err error) Close(ctx context.Context) error } diff --git a/internal/topic/topicwriterinternal/writer_stream_interface_mock_test.go b/internal/topic/topicwriterinternal/writer_stream_interface_mock_test.go index fb53d60d6..a663d3409 100644 --- a/internal/topic/topicwriterinternal/writer_stream_interface_mock_test.go +++ b/internal/topic/topicwriterinternal/writer_stream_interface_mock_test.go @@ -63,7 +63,7 @@ func (mr *MockStreamWriterMockRecorder) WaitInit(ctx interface{}) *gomock.Call { } // Write mocks base method. -func (m *MockStreamWriter) Write(ctx context.Context, messages []Message) error { +func (m *MockStreamWriter) Write(ctx context.Context, messages []PublicMessage) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Write", ctx, messages) ret0, _ := ret[0].(error) diff --git a/internal/topic/topicwriterinternal/writer_test.go b/internal/topic/topicwriterinternal/writer_test.go index 96d91cd46..2be44ed68 100644 --- a/internal/topic/topicwriterinternal/writer_test.go +++ b/internal/topic/topicwriterinternal/writer_test.go @@ -41,7 +41,7 @@ func TestWriterWrite(t *testing.T) { w := Writer{ streamWriter: strm, } - require.NoError(t, w.Write(ctx, Message{SeqNo: 1})) + require.NoError(t, w.Write(ctx, PublicMessage{SeqNo: 1})) }) }) } @@ -56,6 +56,6 @@ func TestWriterWriteMessage(t *testing.T) { w := Writer{ streamWriter: strm, } - require.NoError(t, w.Write(ctx, Message{SeqNo: 1}, Message{SeqNo: 3})) + require.NoError(t, w.Write(ctx, PublicMessage{SeqNo: 1}, PublicMessage{SeqNo: 3})) }) } diff --git a/internal/xbytes/clone.go b/internal/xbytes/clone.go new file mode 100644 index 000000000..2b923e742 --- /dev/null +++ b/internal/xbytes/clone.go @@ -0,0 +1,14 @@ +//go:build !go1.20 +// +build !go1.20 + +package xbytes + +// Clone returns a copy of b[:len(b)]. +// The result may have additional unused capacity. +// Clone(nil) returns nil. +func Clone(b []byte) []byte { + if b == nil { + return nil + } + return append([]byte{}, b...) +} diff --git a/internal/xbytes/clone_go1.20.go b/internal/xbytes/clone_go1.20.go new file mode 100644 index 000000000..fd6df4ae8 --- /dev/null +++ b/internal/xbytes/clone_go1.20.go @@ -0,0 +1,13 @@ +//go:build go1.20 +// +build go1.20 + +package xbytes + +import "bytes" + +// Clone returns a copy of b[:len(b)]. +// The result may have additional unused capacity. +// Clone(nil) returns nil. +func Clone(b []byte) []byte { + return bytes.Clone(b) +} diff --git a/tests/integration/helpers_test.go b/tests/integration/helpers_test.go index ffb6098dc..6a1a6afa4 100644 --- a/tests/integration/helpers_test.go +++ b/tests/integration/helpers_test.go @@ -24,6 +24,10 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/sugar" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -160,6 +164,68 @@ func (scope *scopeT) LoggerMinLevel(level log.Level) *testLogger { }).(*testLogger) } +func (scope *scopeT) TopicConsumerName() string { + return "test-consumer" +} + +func (scope *scopeT) TopicPath() string { + return scope.CacheWithCleanup(nil, nil, func() (res interface{}, cleanup fixenv.FixtureCleanupFunc, err error) { + topicName := strings.Replace(scope.T().Name(), "/", "__", -1) + topicPath := path.Join(scope.Folder(), topicName) + client := scope.Driver().Topic() + + cleanup = func() { + if !scope.Failed() { + _ = client.Drop(scope.Ctx, topicPath) + } + } + cleanup() + + err = client.Create(scope.Ctx, topicPath, topicoptions.CreateWithConsumer( + topictypes.Consumer{ + Name: scope.TopicConsumerName(), + }, + )) + + return topicPath, cleanup, err + }).(string) +} + +func (scope *scopeT) TopicReader() *topicreader.Reader { + return scope.CacheWithCleanup(nil, nil, func() (res interface{}, cleanup fixenv.FixtureCleanupFunc, err error) { + reader, err := scope.Driver().Topic().StartReader( + scope.TopicConsumerName(), + topicoptions.ReadTopic(scope.TopicPath()), + ) + cleanup = func() { + if reader != nil { + _ = reader.Close(scope.Ctx) + } + } + return reader, cleanup, err + }).(*topicreader.Reader) +} + +func (scope *scopeT) TopicWriter() *topicwriter.Writer { + return scope.CacheWithCleanup(nil, nil, func() (res interface{}, cleanup fixenv.FixtureCleanupFunc, err error) { + writer, err := scope.Driver().Topic().StartWriter( + scope.TopicPath(), + topicoptions.WithWriterProducerID(scope.TopicWriterProducerID()), + topicoptions.WithWriterWaitServerAck(true), + ) + cleanup = func() { + if writer != nil { + _ = writer.Close(scope.Ctx) + } + } + return writer, cleanup, err + }).(*topicwriter.Writer) +} + +func (scope *scopeT) TopicWriterProducerID() string { + return "test-producer-id" +} + type tableNameParams struct { tableName string createTableQueryTemplate string diff --git a/tests/integration/topic_read_writer_test.go b/tests/integration/topic_read_writer_test.go index c13fe4472..bfc679f85 100644 --- a/tests/integration/topic_read_writer_test.go +++ b/tests/integration/topic_read_writer_test.go @@ -112,6 +112,48 @@ func TestSendSyncMessages(t *testing.T) { }) } +func TestMessageMetadata(t *testing.T) { + t.Run("NoMetadata", func(t *testing.T) { + e := newScope(t) + err := e.TopicWriter().Write(e.Ctx, topicwriter.Message{}) + e.Require.NoError(err) + + mess, err := e.TopicReader().ReadMessage(e.Ctx) + e.Require.NoError(err) + e.Require.Nil(mess.Metadata) + }) + t.Run("Meta1", func(t *testing.T) { + e := newScope(t) + meta := map[string][]byte{ + "key": []byte("val"), + } + err := e.TopicWriter().Write(e.Ctx, topicwriter.Message{ + Metadata: meta, + }) + e.Require.NoError(err) + + mess, err := e.TopicReader().ReadMessage(e.Ctx) + e.Require.NoError(err) + e.Require.Equal(meta, mess.Metadata) + }) + t.Run("Meta2", func(t *testing.T) { + e := newScope(t) + meta := map[string][]byte{ + "key1": []byte("val1"), + "key2": []byte("val2"), + "key3": []byte("val3"), + } + err := e.TopicWriter().Write(e.Ctx, topicwriter.Message{ + Metadata: meta, + }) + e.Require.NoError(err) + + mess, err := e.TopicReader().ReadMessage(e.Ctx) + e.Require.NoError(err) + e.Require.Equal(meta, mess.Metadata) + }) +} + func TestManyConcurentReadersWriters(t *testing.T) { xtest.AllowByFlag(t, "ISSUE-389") diff --git a/tests/slo/go.mod b/tests/slo/go.mod index 92395169e..35a9ce667 100644 --- a/tests/slo/go.mod +++ b/tests/slo/go.mod @@ -33,7 +33,7 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect github.com/yandex-cloud/go-genproto v0.0.0-20230403093326-123923969dc6 // indirect - github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd // indirect + github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a // indirect github.com/ydb-platform/ydb-go-sdk-auth-environ v0.2.0 // indirect github.com/ydb-platform/ydb-go-yc v0.10.2 // indirect github.com/ydb-platform/ydb-go-yc-metadata v0.5.3 // indirect diff --git a/tests/slo/go.sum b/tests/slo/go.sum index 9860f6931..9f3d1c7ed 100644 --- a/tests/slo/go.sum +++ b/tests/slo/go.sum @@ -1041,6 +1041,7 @@ github.com/ydb-platform/xorm v0.0.6 h1:mlclMIXR7Obwho3cYIIgBoMlMZ+APJZ9gnJQICyVA github.com/ydb-platform/xorm v0.0.6/go.mod h1:vLAI6Xqpa+48y9I9HJnjD6IDKp/GnATYbtDgWzQb88c= github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE= github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.2.0 h1:IG5bPd+Lqyc+zsw2kmxqfGLkaDHuAEnWX63/8RBBiA4= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.2.0/go.mod h1:l6lZ+osdQOjDRBgRA4PQ06BuvmXN2neYjnRw8rCfd7s= github.com/ydb-platform/ydb-go-yc v0.10.2 h1:RAHy6g7ncxk1y0N4oS2MwYXLATqRqKBI6DYXuxpV2wo= diff --git a/topic/topicwriter/topicwriter.go b/topic/topicwriter/topicwriter.go index d06efe588..341e16da4 100644 --- a/topic/topicwriter/topicwriter.go +++ b/topic/topicwriter/topicwriter.go @@ -7,7 +7,7 @@ import ( ) type ( - Message = topicwriterinternal.Message + Message = topicwriterinternal.PublicMessage ) var ErrQueueLimitExceed = topicwriterinternal.PublicErrQueueIsFull From 734edc155852a9624783cfbd5e139d4be7e78ad3 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 15 Dec 2023 15:11:06 +0300 Subject: [PATCH 2/5] fix integration tests for skip message metadata for old versions --- .../topic/topicwriterinternal/writer_reconnector_test.go | 2 ++ tests/integration/topic_read_writer_test.go | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 198f46af8..a1c5572f3 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -106,7 +106,9 @@ func TestWriterImpl_Write(t *testing.T) { mess := expectedMap[k] _, err = mess.GetEncodedBytes(rawtopiccommon.CodecRaw) require.NoError(t, err) + mess.metadataCached = true expectedMap[k] = mess + } require.Equal(t, expectedMap, w.queue.messagesByOrder) diff --git a/tests/integration/topic_read_writer_test.go b/tests/integration/topic_read_writer_test.go index bfc679f85..c1e10f9f7 100644 --- a/tests/integration/topic_read_writer_test.go +++ b/tests/integration/topic_read_writer_test.go @@ -9,6 +9,7 @@ import ( "encoding/binary" "errors" "io" + "os" "runtime/pprof" "strconv" "strings" @@ -22,6 +23,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xatomic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" @@ -123,6 +125,9 @@ func TestMessageMetadata(t *testing.T) { e.Require.Nil(mess.Metadata) }) t.Run("Meta1", func(t *testing.T) { + if version.Lt(os.Getenv("YDB_VERSION"), "24.0") { + t.Skip() + } e := newScope(t) meta := map[string][]byte{ "key": []byte("val"), @@ -137,6 +142,9 @@ func TestMessageMetadata(t *testing.T) { e.Require.Equal(meta, mess.Metadata) }) t.Run("Meta2", func(t *testing.T) { + if version.Lt(os.Getenv("YDB_VERSION"), "24.0") { + t.Skip() + } e := newScope(t) meta := map[string][]byte{ "key1": []byte("val1"), From 30e9586587de392736b11e5d8fe0719c258e320d Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 15 Dec 2023 15:35:46 +0300 Subject: [PATCH 3/5] fix coloring test --- log/logger_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/log/logger_test.go b/log/logger_test.go index aac28cb19..946743c28 100644 --- a/log/logger_test.go +++ b/log/logger_test.go @@ -2,12 +2,16 @@ package log import ( "testing" + "time" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" ) func TestColoring(t *testing.T) { + zeroClock := clockwork.NewFakeClock() + fullDuration := zeroClock.Now().Sub(time.Date(1984, 4, 4, 0, 0, 0, 0, time.Local)) + zeroClock.Advance(-fullDuration) // set zero time for _, tt := range []struct { l *defaultLogger msg string @@ -16,7 +20,7 @@ func TestColoring(t *testing.T) { { l: &defaultLogger{ coloring: true, - clock: clockwork.NewFakeClock(), + clock: zeroClock, }, msg: "test", exp: "\u001B[31m1984-04-04 00:00:00.000 \u001B[0m\u001B[101mERROR\u001B[0m\u001B[31m 'test.scope' => message\u001B[0m", //nolint:lll @@ -24,7 +28,7 @@ func TestColoring(t *testing.T) { { l: &defaultLogger{ coloring: false, - clock: clockwork.NewFakeClock(), + clock: zeroClock, }, msg: "test", exp: "1984-04-04 00:00:00.000 ERROR 'test.scope' => message", From a620003c7c444c7806c25182c5f127321160857e Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 15 Dec 2023 16:33:10 +0300 Subject: [PATCH 4/5] fix go.sum --- examples/go.sum | 3 +-- tests/slo/go.sum | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/go.sum b/examples/go.sum index 37a8c5a73..5e7567048 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -1195,8 +1195,7 @@ github.com/ydb-platform/gorm-driver v0.0.5 h1:q6Cg/iSFw4TAmSyMh25YM0GRmr6LVM2gnF github.com/ydb-platform/gorm-driver v0.0.5/go.mod h1:fkCvWZlA3PzL5MiMc7yFOzxUOzLpY1uT8yZo+e4SV4Y= github.com/ydb-platform/xorm v0.0.3 h1:MXk42lANB6r/MMLg/XdJfyXJycGUDlCeLiMlLGDKVPw= github.com/ydb-platform/xorm v0.0.3/go.mod h1:hFsU7EUF0o3S+l5c0eyP2yPVjJ0d4gsFdqCsyazzwBc= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a h1:9wx+kCrCQCdwmDe1AFW5yAHdzlo+RV7lcy6y7Zq661s= github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 h1:EYSI1kulnHb0H0zt3yOw4cRj4ABMSMGwNe43D+fX7e4= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2/go.mod h1:Xfjce+VMU9yJVr1lj60yK2fFPWjB4jr/4cp3K7cjzi4= diff --git a/tests/slo/go.sum b/tests/slo/go.sum index 9f3d1c7ed..e12fb847f 100644 --- a/tests/slo/go.sum +++ b/tests/slo/go.sum @@ -1039,8 +1039,7 @@ github.com/ydb-platform/gorm-driver v0.1.1 h1:PkN+sGSJehOZn9jQIFEmAOfhE73FNDMq+u github.com/ydb-platform/gorm-driver v0.1.1/go.mod h1:Zv368SD5tHqkblmaOG6r2KTvtSIzPuB5p8rBaE6wVmw= github.com/ydb-platform/xorm v0.0.6 h1:mlclMIXR7Obwho3cYIIgBoMlMZ+APJZ9gnJQICyVAYY= github.com/ydb-platform/xorm v0.0.6/go.mod h1:vLAI6Xqpa+48y9I9HJnjD6IDKp/GnATYbtDgWzQb88c= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a h1:9wx+kCrCQCdwmDe1AFW5yAHdzlo+RV7lcy6y7Zq661s= github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.2.0 h1:IG5bPd+Lqyc+zsw2kmxqfGLkaDHuAEnWX63/8RBBiA4= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.2.0/go.mod h1:l6lZ+osdQOjDRBgRA4PQ06BuvmXN2neYjnRw8rCfd7s= From 7fd9756981404178e3df0e8173bad7741a3a16f1 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 15 Dec 2023 16:40:51 +0300 Subject: [PATCH 5/5] fix linter --- internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go | 6 +++--- internal/topic/topicreaderinternal/batch.go | 4 ++-- .../topic/topicwriterinternal/writer_reconnector_test.go | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go b/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go index 7c4b466ce..c943506fa 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go +++ b/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go @@ -187,10 +187,10 @@ func (d *MessageData) ToProto() (*Ydb_Topic.StreamWriteMessage_WriteRequest_Mess return nil, err } - for _, item := range d.MetadataItems { + for i := range d.MetadataItems { res.MetadataItems = append(res.MetadataItems, &Ydb_Topic.MetadataItem{ - Key: item.Key, - Value: item.Value, + Key: d.MetadataItems[i].Key, + Value: d.MetadataItems[i].Value, }) } diff --git a/internal/topic/topicreaderinternal/batch.go b/internal/topic/topicreaderinternal/batch.go index f1edfab3a..14283d0fe 100644 --- a/internal/topic/topicreaderinternal/batch.go +++ b/internal/topic/topicreaderinternal/batch.go @@ -88,8 +88,8 @@ func newBatchFromStream( if len(sMess.MetadataItems) > 0 { dstMess.Metadata = make(map[string][]byte, len(sMess.MetadataItems)) - for _, item := range sMess.MetadataItems { - dstMess.Metadata[item.Key] = item.Value + for metadataIndex := range sMess.MetadataItems { + dstMess.Metadata[sMess.MetadataItems[metadataIndex].Key] = sMess.MetadataItems[metadataIndex].Value } } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index a1c5572f3..a029154c7 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -108,7 +108,6 @@ func TestWriterImpl_Write(t *testing.T) { require.NoError(t, err) mess.metadataCached = true expectedMap[k] = mess - } require.Equal(t, expectedMap, w.queue.messagesByOrder)