diff --git a/CHANGELOG.md b/CHANGELOG.md index 102f73eb7..baeeb4d58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Added per message metadata support for topic api * Context for call options now have same lifetime as driver (previous - same lifetime as context for call Open function). * Extended metrics (fill database.sql callbacks, recognize TLI error) * Refactored config prefix in metrics diff --git a/examples/go.mod b/examples/go.mod index 1984047c3..085257f95 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -49,7 +49,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect github.com/yandex-cloud/go-genproto v0.0.0-20220815090733-4c139c0154e2 // indirect - github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd // indirect + github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a // indirect github.com/ydb-platform/ydb-go-sdk-metrics v0.16.3 // indirect github.com/ydb-platform/ydb-go-yc-metadata v0.5.4 // indirect golang.org/x/crypto v0.13.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index b804b0da3..5e7567048 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -1195,8 +1195,8 @@ github.com/ydb-platform/gorm-driver v0.0.5 h1:q6Cg/iSFw4TAmSyMh25YM0GRmr6LVM2gnF github.com/ydb-platform/gorm-driver v0.0.5/go.mod h1:fkCvWZlA3PzL5MiMc7yFOzxUOzLpY1uT8yZo+e4SV4Y= github.com/ydb-platform/xorm v0.0.3 h1:MXk42lANB6r/MMLg/XdJfyXJycGUDlCeLiMlLGDKVPw= github.com/ydb-platform/xorm v0.0.3/go.mod h1:hFsU7EUF0o3S+l5c0eyP2yPVjJ0d4gsFdqCsyazzwBc= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a h1:9wx+kCrCQCdwmDe1AFW5yAHdzlo+RV7lcy6y7Zq661s= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 h1:EYSI1kulnHb0H0zt3yOw4cRj4ABMSMGwNe43D+fX7e4= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2/go.mod h1:Xfjce+VMU9yJVr1lj60yK2fFPWjB4jr/4cp3K7cjzi4= github.com/ydb-platform/ydb-go-sdk-metrics v0.16.3 h1:30D5jErLAiGjchVG2D9JiCLbST5LpAiyS7DoUtHkWsU= diff --git a/go.mod b/go.mod index d0a2647ce..809750187 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.4.1 github.com/google/uuid v1.3.0 github.com/jonboulle/clockwork v0.3.0 - github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd + github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a golang.org/x/sync v0.3.0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/grpc v1.57.1 diff --git a/go.sum b/go.sum index 0c37233e8..7720358a3 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a h1:9wx+kCrCQCdwmDe1AFW5yAHdzlo+RV7lcy6y7Zq661s= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/internal/grpcwrapper/rawtopic/rawtopiccommon/message_metadata.go b/internal/grpcwrapper/rawtopic/rawtopiccommon/message_metadata.go new file mode 100644 index 000000000..33a67d4c1 --- /dev/null +++ b/internal/grpcwrapper/rawtopic/rawtopiccommon/message_metadata.go @@ -0,0 +1,6 @@ +package rawtopiccommon + +type MetadataItem struct { + Key string + Value []byte +} diff --git a/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go b/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go index 02775a89f..a29d48f80 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go +++ b/internal/grpcwrapper/rawtopic/rawtopicreader/messages.go @@ -245,6 +245,15 @@ func (r *ReadResponse) fromProto(p *Ydb_Topic.StreamReadMessage_ReadResponse) er dstMessage.Data = srcMessage.Data dstMessage.UncompressedSize = srcMessage.UncompressedSize dstMessage.MessageGroupID = srcMessage.MessageGroupId + if len(srcMessage.MetadataItems) > 0 { + dstMessage.MetadataItems = make([]rawtopiccommon.MetadataItem, 0, len(srcMessage.MetadataItems)) + for _, protoItem := range srcMessage.MetadataItems { + dstMessage.MetadataItems = append(dstMessage.MetadataItems, rawtopiccommon.MetadataItem{ + Key: protoItem.Key, + Value: protoItem.Value[:len(protoItem.Value):len(protoItem.Value)], + }) + } + } } } } @@ -273,6 +282,7 @@ type MessageData struct { Data []byte UncompressedSize int64 MessageGroupID string + MetadataItems []rawtopiccommon.MetadataItem } // diff --git a/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go b/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go index 48f505a57..c943506fa 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go +++ b/internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go @@ -171,6 +171,7 @@ type MessageData struct { CreatedAt time.Time UncompressedSize int64 Partitioning Partitioning + MetadataItems []rawtopiccommon.MetadataItem Data []byte } @@ -185,6 +186,14 @@ func (d *MessageData) ToProto() (*Ydb_Topic.StreamWriteMessage_WriteRequest_Mess if err != nil { return nil, err } + + for i := range d.MetadataItems { + res.MetadataItems = append(res.MetadataItems, &Ydb_Topic.MetadataItem{ + Key: d.MetadataItems[i].Key, + Value: d.MetadataItems[i].Value, + }) + } + return res, nil } diff --git a/internal/topic/topicreaderinternal/batch.go b/internal/topic/topicreaderinternal/batch.go index 6665cd4ef..14283d0fe 100644 --- a/internal/topic/topicreaderinternal/batch.go +++ b/internal/topic/topicreaderinternal/batch.go @@ -86,6 +86,13 @@ func newBatchFromStream( dstMess.commitRange.commitOffsetStart = prevOffset + 1 dstMess.commitRange.commitOffsetEnd = sMess.Offset + 1 + if len(sMess.MetadataItems) > 0 { + dstMess.Metadata = make(map[string][]byte, len(sMess.MetadataItems)) + for metadataIndex := range sMess.MetadataItems { + dstMess.Metadata[sMess.MetadataItems[metadataIndex].Key] = sMess.MetadataItems[metadataIndex].Value + } + } + prevOffset = sMess.Offset } diff --git a/internal/topic/topicreaderinternal/message.go b/internal/topic/topicreaderinternal/message.go index 77bfe007c..57aa89c5a 100644 --- a/internal/topic/topicreaderinternal/message.go +++ b/internal/topic/topicreaderinternal/message.go @@ -9,6 +9,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xbytes" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" ) @@ -28,6 +29,7 @@ type PublicMessage struct { Offset int64 WrittenAt time.Time ProducerID string + Metadata map[string][]byte // Metadata, nil if no metadata commitRange commitRange data oneTimeReader @@ -135,6 +137,14 @@ func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBu return pmb } +func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder { + pmb.mess.Metadata = make(map[string][]byte, len(metadata)) + for key, val := range metadata { + pmb.mess.Metadata[key] = xbytes.Clone(val) + } + return pmb +} + // MessageGroupID set message MessageGroupID func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder { pmb.mess.MessageGroupID = messageGroupID diff --git a/internal/topic/topicreaderinternal/stream_reader_impl_test.go b/internal/topic/topicreaderinternal/stream_reader_impl_test.go index f63526b2e..9f76fae9a 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl_test.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl_test.go @@ -583,7 +583,7 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) { prevOffset := e.partitionSession.lastReceivedMessageOffset() sendDataRequestCompleted := make(empty.Chan) - dataSize := 4 + dataSize := 6 e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize}).Do(func(_ interface{}) { close(sendDataRequestCompleted) }) @@ -639,17 +639,60 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) { }, }, }, + { + Codec: rawtopiccommon.CodecRaw, + WriteSessionMeta: map[string]string{"a": "b", "c": "d"}, + WrittenAt: testTime(7), + MessageData: []rawtopicreader.MessageData{ + { + Offset: prevOffset + 30, + SeqNo: 5, + CreatedAt: testTime(5), + Data: []byte("test"), + UncompressedSize: 4, + MessageGroupID: "1", + MetadataItems: []rawtopiccommon.MetadataItem{ + { + Key: "first", + Value: []byte("first-value"), + }, + { + Key: "second", + Value: []byte("second-value"), + }, + }, + }, + { + Offset: prevOffset + 31, + SeqNo: 6, + CreatedAt: testTime(5), + Data: []byte("4567"), + UncompressedSize: 4, + MessageGroupID: "1", + MetadataItems: []rawtopiccommon.MetadataItem{ + { + Key: "doubled-key", + Value: []byte("bad"), + }, + { + Key: "doubled-key", + Value: []byte("good"), + }, + }, + }, + }, + }, }, }, }, }, ) - expectedData := [][]byte{[]byte("123"), []byte("4567"), []byte("098"), []byte("0987")} + expectedData := [][]byte{[]byte("123"), []byte("4567"), []byte("098"), []byte("0987"), []byte("test"), []byte("4567")} expectedBatch := &PublicBatch{ commitRange: commitRange{ commitOffsetStart: prevOffset + 1, - commitOffsetEnd: prevOffset + 21, + commitOffsetEnd: prevOffset + 32, partitionSession: e.partitionSession, }, Messages: []*PublicMessage{ @@ -713,11 +756,48 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) { partitionSession: e.partitionSession, }, }, + { + SeqNo: 5, + CreatedAt: testTime(5), + MessageGroupID: "1", + Metadata: map[string][]byte{ + "first": []byte("first-value"), + "second": []byte("second-value"), + }, + Offset: prevOffset.ToInt64() + 30, + WrittenAt: testTime(7), + WriteSessionMetadata: map[string]string{"a": "b", "c": "d"}, + UncompressedSize: 4, + rawDataLen: 4, + commitRange: commitRange{ + commitOffsetStart: prevOffset + 21, + commitOffsetEnd: prevOffset + 31, + partitionSession: e.partitionSession, + }, + }, + { + SeqNo: 6, + CreatedAt: testTime(5), + MessageGroupID: "1", + Metadata: map[string][]byte{ + "doubled-key": []byte("good"), + }, + Offset: prevOffset.ToInt64() + 31, + WrittenAt: testTime(7), + WriteSessionMetadata: map[string]string{"a": "b", "c": "d"}, + UncompressedSize: 4, + rawDataLen: 4, + commitRange: commitRange{ + commitOffsetStart: prevOffset + 31, + commitOffsetEnd: prevOffset + 32, + partitionSession: e.partitionSession, + }, + }, }, } opts := newReadMessageBatchOptions() - opts.MinCount = 4 + opts.MinCount = 6 batch, err := e.reader.ReadMessageBatch(e.ctx, opts) require.NoError(t, err) diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index 1043a2c02..60eb3e2dc 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -119,7 +119,7 @@ func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (r len(messages), trace.TopicWriterCompressMessagesReasonCompressData, ) - err = readInParallelWithCodec(messages, codec, s.parallelCompressors) + err = cacheMessages(messages, codec, s.parallelCompressors) onCompressDone(err) } return codec, err @@ -188,7 +188,7 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt len(messages), trace.TopicWriterCompressMessagesReasonCodecsMeasure, ) - err := readInParallelWithCodec(messages, codec, s.parallelCompressors) + err := cacheMessages(messages, codec, s.parallelCompressors) onCompressDone(err) if err != nil { return codecUnknown, err @@ -215,8 +215,7 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt return s.allowedCodecs[minSizeIndex], nil } -func readInParallelWithCodec(messages []messageWithDataContent, codec rawtopiccommon.Codec, parallel int) error { - workerCount := parallel +func cacheMessages(messages []messageWithDataContent, codec rawtopiccommon.Codec, workerCount int) error { if len(messages) < workerCount { workerCount = len(messages) } @@ -253,7 +252,7 @@ func readInParallelWithCodec(messages []messageWithDataContent, codec rawtopicco if localErr != nil { return } - _, localErr = task.GetEncodedBytes(codec) + localErr = task.CacheMessageData(codec) if localErr != nil { resErrMutex.WithLock(func() { resErr = localErr diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index 02609f600..63b929068 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -152,19 +152,19 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { func TestCompressMessages(t *testing.T) { t.Run("NoMessages", func(t *testing.T) { - require.NoError(t, readInParallelWithCodec(nil, rawtopiccommon.CodecRaw, 1)) + require.NoError(t, cacheMessages(nil, rawtopiccommon.CodecRaw, 1)) }) t.Run("RawOk", func(t *testing.T) { messages := newTestMessagesWithContent(1) - require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecRaw, 1)) + require.NoError(t, cacheMessages(messages, rawtopiccommon.CodecRaw, 1)) }) t.Run("RawError", func(t *testing.T) { mess := newMessageDataWithContent(PublicMessage{}, testCommonEncoders) _, err := mess.GetEncodedBytes(rawtopiccommon.CodecGzip) require.NoError(t, err) messages := []messageWithDataContent{mess} - require.Error(t, readInParallelWithCodec(messages, rawtopiccommon.CodecRaw, 1)) + require.Error(t, cacheMessages(messages, rawtopiccommon.CodecRaw, 1)) }) const messageCount = 10 @@ -175,7 +175,7 @@ func TestCompressMessages(t *testing.T) { messages = append(messages, mess) } - require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecGzip, 1)) + require.NoError(t, cacheMessages(messages, rawtopiccommon.CodecGzip, 1)) for i := 0; i < messageCount; i++ { require.Equal(t, rawtopiccommon.CodecGzip, messages[i].bufCodec) } @@ -189,7 +189,7 @@ func TestCompressMessages(t *testing.T) { messages = append(messages, mess) } - require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecGzip, parallelCount)) + require.NoError(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount)) for i := 0; i < messageCount; i++ { require.Equal(t, rawtopiccommon.CodecGzip, messages[i].bufCodec) } @@ -203,6 +203,6 @@ func TestCompressMessages(t *testing.T) { } messages[0].dataWasRead = true - require.Error(t, readInParallelWithCodec(messages, rawtopiccommon.CodecGzip, parallelCount)) + require.Error(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount)) }) } diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index 1749e461c..9b1088b95 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -9,6 +9,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xbytes" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" ) @@ -18,6 +19,7 @@ type PublicMessage struct { SeqNo int64 CreatedAt time.Time Data io.Reader + Metadata map[string][]byte // partitioning at level message available by protocol, but doesn't available by current server implementation // the field hidden from public access for prevent runtime errors. @@ -63,6 +65,7 @@ type messageWithDataContent struct { bufCodec rawtopiccommon.Codec bufEncoded bytes.Buffer BufUncompressedSize int + metadataCached bool } func (m *messageWithDataContent) GetEncodedBytes(codec rawtopiccommon.Codec) ([]byte, error) { @@ -73,6 +76,30 @@ func (m *messageWithDataContent) GetEncodedBytes(codec rawtopiccommon.Codec) ([] return m.getEncodedBytes(codec) } +func (m *messageWithDataContent) cacheMetadata() { + if m.metadataCached { + return + } + + // ensure message metadata can't be changed by external code + if len(m.Metadata) > 0 { + ownCopy := make(map[string][]byte, len(m.Metadata)) + for key, val := range m.Metadata { + ownCopy[key] = xbytes.Clone(val) + } + m.Metadata = ownCopy + } else { + m.Metadata = nil + } + m.metadataCached = true +} + +func (m *messageWithDataContent) CacheMessageData(codec rawtopiccommon.Codec) error { + m.cacheMetadata() + _, err := m.GetEncodedBytes(codec) + return err +} + func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([]byte, error) { if !m.hasRawContent { return nil, xerrors.WithStackTrace(errNoRawContent) diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index a0518df11..c9b7ab48d 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -316,7 +316,7 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage) if targetCodec == rawtopiccommon.CodecUNSPECIFIED { targetCodec = rawtopiccommon.CodecRaw } - err := readInParallelWithCodec(res, targetCodec, w.cfg.compressorCount) + err := cacheMessages(res, targetCodec, w.cfg.compressorCount) onCompressDone(err) if err != nil { return nil, err @@ -653,6 +653,17 @@ func createRawMessageData( res.UncompressedSize = int64(mess.BufUncompressedSize) res.Data, err = mess.GetEncodedBytes(codec) + + if len(mess.Metadata) > 0 { + res.MetadataItems = make([]rawtopiccommon.MetadataItem, 0, len(mess.Metadata)) + for key, val := range mess.Metadata { + res.MetadataItems = append(res.MetadataItems, rawtopiccommon.MetadataItem{ + Key: key, + Value: val, + }) + } + } + return res, err } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index 0141cc67a..a029154c7 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "sort" "sync" "sync/atomic" "testing" @@ -105,6 +106,7 @@ func TestWriterImpl_Write(t *testing.T) { mess := expectedMap[k] _, err = mess.GetEncodedBytes(rawtopiccommon.CodecRaw) require.NoError(t, err) + mess.metadataCached = true expectedMap[k] = mess } @@ -551,6 +553,38 @@ func TestCreateRawMessageData(t *testing.T) { req, ) }) + t.Run("WithMessageMetadata", func(t *testing.T) { + messages := newTestMessagesWithContent(1) + messages[0].Metadata = map[string][]byte{ + "a": {1, 2, 3}, + "b": {4, 5}, + } + req, err := createWriteRequest(messages, rawtopiccommon.CodecRaw) + + sort.Slice(req.Messages[0].MetadataItems, func(i, j int) bool { + return req.Messages[0].MetadataItems[i].Key < req.Messages[0].MetadataItems[j].Key + }) + + require.NoError(t, err) + require.Equal(t, rawtopicwriter.WriteRequest{ + Messages: []rawtopicwriter.MessageData{ + { + SeqNo: 1, + MetadataItems: []rawtopiccommon.MetadataItem{ + { + Key: "a", + Value: []byte{1, 2, 3}, + }, + { + Key: "b", + Value: []byte{4, 5}, + }, + }, + }, + }, + Codec: rawtopiccommon.CodecRaw, + }, req) + }) t.Run("WithSeqno", func(t *testing.T) { req, err := createWriteRequest(newTestMessagesWithContent(1, 2, 3), rawtopiccommon.CodecRaw) require.NoError(t, err) diff --git a/internal/xbytes/clone.go b/internal/xbytes/clone.go new file mode 100644 index 000000000..2b923e742 --- /dev/null +++ b/internal/xbytes/clone.go @@ -0,0 +1,14 @@ +//go:build !go1.20 +// +build !go1.20 + +package xbytes + +// Clone returns a copy of b[:len(b)]. +// The result may have additional unused capacity. +// Clone(nil) returns nil. +func Clone(b []byte) []byte { + if b == nil { + return nil + } + return append([]byte{}, b...) +} diff --git a/internal/xbytes/clone_go1.20.go b/internal/xbytes/clone_go1.20.go new file mode 100644 index 000000000..fd6df4ae8 --- /dev/null +++ b/internal/xbytes/clone_go1.20.go @@ -0,0 +1,13 @@ +//go:build go1.20 +// +build go1.20 + +package xbytes + +import "bytes" + +// Clone returns a copy of b[:len(b)]. +// The result may have additional unused capacity. +// Clone(nil) returns nil. +func Clone(b []byte) []byte { + return bytes.Clone(b) +} diff --git a/log/logger_test.go b/log/logger_test.go index aac28cb19..946743c28 100644 --- a/log/logger_test.go +++ b/log/logger_test.go @@ -2,12 +2,16 @@ package log import ( "testing" + "time" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" ) func TestColoring(t *testing.T) { + zeroClock := clockwork.NewFakeClock() + fullDuration := zeroClock.Now().Sub(time.Date(1984, 4, 4, 0, 0, 0, 0, time.Local)) + zeroClock.Advance(-fullDuration) // set zero time for _, tt := range []struct { l *defaultLogger msg string @@ -16,7 +20,7 @@ func TestColoring(t *testing.T) { { l: &defaultLogger{ coloring: true, - clock: clockwork.NewFakeClock(), + clock: zeroClock, }, msg: "test", exp: "\u001B[31m1984-04-04 00:00:00.000 \u001B[0m\u001B[101mERROR\u001B[0m\u001B[31m 'test.scope' => message\u001B[0m", //nolint:lll @@ -24,7 +28,7 @@ func TestColoring(t *testing.T) { { l: &defaultLogger{ coloring: false, - clock: clockwork.NewFakeClock(), + clock: zeroClock, }, msg: "test", exp: "1984-04-04 00:00:00.000 ERROR 'test.scope' => message", diff --git a/tests/integration/helpers_test.go b/tests/integration/helpers_test.go index ffb6098dc..6a1a6afa4 100644 --- a/tests/integration/helpers_test.go +++ b/tests/integration/helpers_test.go @@ -24,6 +24,10 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/sugar" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -160,6 +164,68 @@ func (scope *scopeT) LoggerMinLevel(level log.Level) *testLogger { }).(*testLogger) } +func (scope *scopeT) TopicConsumerName() string { + return "test-consumer" +} + +func (scope *scopeT) TopicPath() string { + return scope.CacheWithCleanup(nil, nil, func() (res interface{}, cleanup fixenv.FixtureCleanupFunc, err error) { + topicName := strings.Replace(scope.T().Name(), "/", "__", -1) + topicPath := path.Join(scope.Folder(), topicName) + client := scope.Driver().Topic() + + cleanup = func() { + if !scope.Failed() { + _ = client.Drop(scope.Ctx, topicPath) + } + } + cleanup() + + err = client.Create(scope.Ctx, topicPath, topicoptions.CreateWithConsumer( + topictypes.Consumer{ + Name: scope.TopicConsumerName(), + }, + )) + + return topicPath, cleanup, err + }).(string) +} + +func (scope *scopeT) TopicReader() *topicreader.Reader { + return scope.CacheWithCleanup(nil, nil, func() (res interface{}, cleanup fixenv.FixtureCleanupFunc, err error) { + reader, err := scope.Driver().Topic().StartReader( + scope.TopicConsumerName(), + topicoptions.ReadTopic(scope.TopicPath()), + ) + cleanup = func() { + if reader != nil { + _ = reader.Close(scope.Ctx) + } + } + return reader, cleanup, err + }).(*topicreader.Reader) +} + +func (scope *scopeT) TopicWriter() *topicwriter.Writer { + return scope.CacheWithCleanup(nil, nil, func() (res interface{}, cleanup fixenv.FixtureCleanupFunc, err error) { + writer, err := scope.Driver().Topic().StartWriter( + scope.TopicPath(), + topicoptions.WithWriterProducerID(scope.TopicWriterProducerID()), + topicoptions.WithWriterWaitServerAck(true), + ) + cleanup = func() { + if writer != nil { + _ = writer.Close(scope.Ctx) + } + } + return writer, cleanup, err + }).(*topicwriter.Writer) +} + +func (scope *scopeT) TopicWriterProducerID() string { + return "test-producer-id" +} + type tableNameParams struct { tableName string createTableQueryTemplate string diff --git a/tests/integration/topic_read_writer_test.go b/tests/integration/topic_read_writer_test.go index c13fe4472..c1e10f9f7 100644 --- a/tests/integration/topic_read_writer_test.go +++ b/tests/integration/topic_read_writer_test.go @@ -9,6 +9,7 @@ import ( "encoding/binary" "errors" "io" + "os" "runtime/pprof" "strconv" "strings" @@ -22,6 +23,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/empty" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xatomic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" @@ -112,6 +114,54 @@ func TestSendSyncMessages(t *testing.T) { }) } +func TestMessageMetadata(t *testing.T) { + t.Run("NoMetadata", func(t *testing.T) { + e := newScope(t) + err := e.TopicWriter().Write(e.Ctx, topicwriter.Message{}) + e.Require.NoError(err) + + mess, err := e.TopicReader().ReadMessage(e.Ctx) + e.Require.NoError(err) + e.Require.Nil(mess.Metadata) + }) + t.Run("Meta1", func(t *testing.T) { + if version.Lt(os.Getenv("YDB_VERSION"), "24.0") { + t.Skip() + } + e := newScope(t) + meta := map[string][]byte{ + "key": []byte("val"), + } + err := e.TopicWriter().Write(e.Ctx, topicwriter.Message{ + Metadata: meta, + }) + e.Require.NoError(err) + + mess, err := e.TopicReader().ReadMessage(e.Ctx) + e.Require.NoError(err) + e.Require.Equal(meta, mess.Metadata) + }) + t.Run("Meta2", func(t *testing.T) { + if version.Lt(os.Getenv("YDB_VERSION"), "24.0") { + t.Skip() + } + e := newScope(t) + meta := map[string][]byte{ + "key1": []byte("val1"), + "key2": []byte("val2"), + "key3": []byte("val3"), + } + err := e.TopicWriter().Write(e.Ctx, topicwriter.Message{ + Metadata: meta, + }) + e.Require.NoError(err) + + mess, err := e.TopicReader().ReadMessage(e.Ctx) + e.Require.NoError(err) + e.Require.Equal(meta, mess.Metadata) + }) +} + func TestManyConcurentReadersWriters(t *testing.T) { xtest.AllowByFlag(t, "ISSUE-389") diff --git a/tests/slo/go.mod b/tests/slo/go.mod index 92395169e..35a9ce667 100644 --- a/tests/slo/go.mod +++ b/tests/slo/go.mod @@ -33,7 +33,7 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect github.com/yandex-cloud/go-genproto v0.0.0-20230403093326-123923969dc6 // indirect - github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd // indirect + github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a // indirect github.com/ydb-platform/ydb-go-sdk-auth-environ v0.2.0 // indirect github.com/ydb-platform/ydb-go-yc v0.10.2 // indirect github.com/ydb-platform/ydb-go-yc-metadata v0.5.3 // indirect diff --git a/tests/slo/go.sum b/tests/slo/go.sum index 9860f6931..e12fb847f 100644 --- a/tests/slo/go.sum +++ b/tests/slo/go.sum @@ -1039,8 +1039,8 @@ github.com/ydb-platform/gorm-driver v0.1.1 h1:PkN+sGSJehOZn9jQIFEmAOfhE73FNDMq+u github.com/ydb-platform/gorm-driver v0.1.1/go.mod h1:Zv368SD5tHqkblmaOG6r2KTvtSIzPuB5p8rBaE6wVmw= github.com/ydb-platform/xorm v0.0.6 h1:mlclMIXR7Obwho3cYIIgBoMlMZ+APJZ9gnJQICyVAYY= github.com/ydb-platform/xorm v0.0.6/go.mod h1:vLAI6Xqpa+48y9I9HJnjD6IDKp/GnATYbtDgWzQb88c= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a h1:9wx+kCrCQCdwmDe1AFW5yAHdzlo+RV7lcy6y7Zq661s= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.2.0 h1:IG5bPd+Lqyc+zsw2kmxqfGLkaDHuAEnWX63/8RBBiA4= github.com/ydb-platform/ydb-go-sdk-auth-environ v0.2.0/go.mod h1:l6lZ+osdQOjDRBgRA4PQ06BuvmXN2neYjnRw8rCfd7s= github.com/ydb-platform/ydb-go-yc v0.10.2 h1:RAHy6g7ncxk1y0N4oS2MwYXLATqRqKBI6DYXuxpV2wo=