diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d839e89b..807b91827 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Added initial experimental topic and cdc-helpers, see examples in [tests/integration/topic_helpers_test.go](https://github.com/ydb-platform/ydb-go-sdk/blob/master/tests/integration/topic_helpers_test.go) * Added experimental `sugar.UnmarshalRows` for user unmarshaller structs in own code in go 1.23, change example for use the iterator. * Added `ydb_go_sdk_ydb_query_pool_size_limit` metrics diff --git a/tests/integration/topic_helpers_test.go b/tests/integration/topic_helpers_test.go new file mode 100644 index 000000000..61a2b0569 --- /dev/null +++ b/tests/integration/topic_helpers_test.go @@ -0,0 +1,175 @@ +//go:build integration && go1.23 +// +build integration,go1.23 + +package integration + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "os" + "path" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter" +) + +func TestMessageReaderIterator(t *testing.T) { + scope := newScope(t) + ctx := scope.Ctx + + err := scope.TopicWriter().Write(ctx, + topicwriter.Message{Data: strings.NewReader("asd")}, + topicwriter.Message{Data: strings.NewReader("ddd")}, + topicwriter.Message{Data: strings.NewReader("ggg")}, + ) + require.NoError(t, err) + + var results []string + for mess, err := range topicsugar.TopicMessageIterator(ctx, scope.TopicReader()) { + require.NoError(t, err) + content, err := io.ReadAll(mess) + require.NoError(t, err) + + results = append(results, string(content)) + if len(results) == 3 { + break + } + } + require.Equal(t, []string{"asd", "ddd", "ggg"}, results) +} + +func TestMessageJsonUnmarshalIterator(t *testing.T) { + scope := newScope(t) + ctx := scope.Ctx + + 
marshal := func(d any) io.Reader { + content, err := json.Marshal(d) + require.NoError(t, err) + return bytes.NewReader(content) + } + + type testStruct struct { + A int + B string + } + + err := scope.TopicWriter().Write(ctx, + topicwriter.Message{Data: marshal(testStruct{A: 1, B: "asd"})}, + topicwriter.Message{Data: marshal(testStruct{A: 2, B: "fff"})}, + topicwriter.Message{Data: marshal(testStruct{A: 5, B: "qwe"})}, + ) + require.NoError(t, err) + + var results []testStruct + expectedSeqno := int64(1) + expectedOffset := int64(0) + for mess, err := range topicsugar.TopicUnmarshalJSONIterator[testStruct](ctx, scope.TopicReader()) { + require.NoError(t, err) + require.Equal(t, expectedSeqno, mess.SeqNo) + require.Equal(t, expectedOffset, mess.Offset) + + results = append(results, mess.Data) + if len(results) == 3 { + break + } + expectedSeqno++ + expectedOffset++ + } + + expectedResult := []testStruct{ + {A: 1, B: "asd"}, + {A: 2, B: "fff"}, + {A: 5, B: "qwe"}, + } + require.Equal(t, expectedResult, results) +} + +func TestCDCReaderIterator(t *testing.T) { + if os.Getenv("YDB_VERSION") != "nightly" && version.Lt(os.Getenv("YDB_VERSION"), "24.1") { + t.Skip("require minimum version 24.1 for work with within yql") + } + scope := newScope(t) + ctx := scope.Ctx + + query := fmt.Sprintf(` +PRAGMA TablePathPrefix("%s"); + +ALTER TABLE %s +ADD CHANGEFEED cdc WITH ( + FORMAT='JSON', + MODE='NEW_AND_OLD_IMAGES' +) +`, scope.Folder(), scope.TableName()) + + _, err := scope.Driver().Scripting().Execute(ctx, query, nil) + require.NoError(t, err) + + query = fmt.Sprintf(` +PRAGMA TablePathPrefix("%s"); + +ALTER TOPIC %s +ADD CONSUMER %s; +`, scope.Folder(), "`"+scope.TableName()+"/cdc`", "`"+scope.TopicConsumerName()+"`") + + _, err = scope.Driver().Scripting().Execute(ctx, query, nil) + require.NoError(t, err) + + require.Equal(t, "table", scope.TableName()) + + prefix := fmt.Sprintf(`PRAGMA TablePathPrefix("%s");`, scope.Folder()) + + err = scope.Driver().Query().Exec(ctx, 
prefix+`UPSERT INTO table (id, val) VALUES (4124, "asd")`) + require.NoError(t, err) + + err = scope.Driver().Query().Exec(ctx, prefix+`UPDATE table SET val="qwe"`) + require.NoError(t, err) + + err = scope.Driver().Query().Exec(ctx, prefix+`DELETE FROM table`) + require.NoError(t, err) + + cdcPath := path.Join(scope.TablePath(), "cdc") + reader, err := scope.Driver().Topic().StartReader(scope.TopicConsumerName(), topicoptions.ReadTopic(cdcPath)) + require.NoError(t, err) + + var results []*topicsugar.TypedTopicMessage[topicsugar.YDBCDCMessage[*testCDCItem, int64]] + for event, err := range topicsugar.UnmarshalCDCStream[*testCDCItem, int64](ctx, reader) { + require.NoError(t, err) + results = append(results, event) + if len(results) == 3 { + break + } + } + + require.Equal(t, &testCDCItem{ID: 4124, Val: "asd"}, results[0].Data.NewImage) + require.False(t, results[0].Data.IsErase()) + + require.Equal(t, &testCDCItem{ID: 4124, Val: "asd"}, results[1].Data.OldImage) + require.Equal(t, &testCDCItem{ID: 4124, Val: "qwe"}, results[1].Data.NewImage) + require.False(t, results[1].Data.IsErase()) + + require.Equal(t, &testCDCItem{ID: 4124, Val: "qwe"}, results[2].Data.OldImage) + require.True(t, results[2].Data.IsErase()) +} + +type testCDCItem struct { + ID int64 + Val string +} + +func (t *testCDCItem) ParseCDCKey(messages []json.RawMessage) (int64, error) { + var key int64 + err := json.Unmarshal(messages[0], &key) + return key, err +} + +func (t *testCDCItem) SetPrimaryKey(k int64) { + t.ID = k +} diff --git a/topic/topicsugar/cdc-reader.go b/topic/topicsugar/cdc-reader.go new file mode 100644 index 000000000..44d86fcd2 --- /dev/null +++ b/topic/topicsugar/cdc-reader.go @@ -0,0 +1,91 @@ +//go:build go1.23 + +package topicsugar + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" +) + +// YDBCDCItem is an interface for types that represent a record from a table (and a cdc event) +// The interface will be removed in the future (or may be 
set as optional) +// and replaced by field annotations +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +type YDBCDCItem[K any] interface { + comparable + ParseCDCKey(keyFields []json.RawMessage) (K, error) + SetPrimaryKey(key K) +} + +// YDBCDCMessage is a typed representation of a cdc event +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +type YDBCDCMessage[T YDBCDCItem[Key], Key any] struct { + Update T + NewImage T + OldImage T + Key Key + Erase *struct{} + TS []uint64 +} + +// IsErase reports whether the event describes the erasure of a record (the Erase field is set) +func (c *YDBCDCMessage[T, Key]) IsErase() bool { + return c.Erase != nil +} + +func (c *YDBCDCMessage[T, Key]) UnmarshalJSON(bytes []byte) error { + var rawItem struct { + Update T `json:"update"` + NewImage T `json:"newImage"` + OldImage T `json:"oldImage"` + Key []json.RawMessage `json:"key"` + Erase *struct{} `json:"erase"` + TS []uint64 `json:"ts"` + } + + err := json.Unmarshal(bytes, &rawItem) + if err != nil { + return fmt.Errorf("failed to unmarshal cdcevent for type %T: %w", c, err) + } + + var tZero T + key, err := tZero.ParseCDCKey(rawItem.Key) + if err != nil { + return fmt.Errorf("failed to unmarshal cdcevent key for type %T: %w", c, err) + } + + c.Update = rawItem.Update + c.NewImage = rawItem.NewImage + c.OldImage = rawItem.OldImage + c.Key = key + c.Erase = rawItem.Erase + c.TS = rawItem.TS + + if c.Update != tZero { + c.Update.SetPrimaryKey(key) + } + if c.OldImage != tZero { + c.OldImage.SetPrimaryKey(key) + } + if c.NewImage != tZero { + c.NewImage.SetPrimaryKey(key) + } + + return nil +} + +func UnmarshalCDCStream[T YDBCDCItem[K], K any]( + ctx context.Context, + reader TopicMessageReader, +) xiter.Seq2[*TypedTopicMessage[YDBCDCMessage[T, K]], error] { + var unmarshal TypedUnmarshalFunc[*YDBCDCMessage[T, K]] = func(data []byte, dst *YDBCDCMessage[T, K]) error { + return json.Unmarshal(data, dst) + } + + return 
TopicUnmarshalJSONFunc[YDBCDCMessage[T, K]](ctx, reader, unmarshal) +} diff --git a/topic/topicsugar/iterators.go b/topic/topicsugar/iterators.go new file mode 100644 index 000000000..7f5ee5b2c --- /dev/null +++ b/topic/topicsugar/iterators.go @@ -0,0 +1,94 @@ +package topicsugar + +import ( + "context" + "encoding/json" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" +) + +// TopicMessageReader is the interface of a source that returns topic messages one at a time via ReadMessage +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +type TopicMessageReader interface { + ReadMessage(ctx context.Context) (*topicreader.Message, error) +} + +// TopicMessageIterator returns an iterator over messages read from r; a read error is yielded once and ends the iteration +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func TopicMessageIterator(ctx context.Context, r TopicMessageReader) xiter.Seq2[*topicreader.Message, error] { + return func(yield func(*topicreader.Message, error) bool) { + for { + mess, err := r.ReadMessage(ctx) + if !yield(mess, err) { + return + } + + if err != nil { + return + } + } + } +} + +// TopicUnmarshalJSONIterator returns an iterator over messages read from r whose payload is decoded into T with json.Unmarshal +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func TopicUnmarshalJSONIterator[T any]( + ctx context.Context, + r TopicMessageReader, +) xiter.Seq2[*TypedTopicMessage[T], error] { + var unmarshalFunc TypedUnmarshalFunc[*T] = func(data []byte, dst *T) error { + return json.Unmarshal(data, dst) + } + + return TopicUnmarshalJSONFunc[T](ctx, r, unmarshalFunc) +} + +// TopicUnmarshalJSONFunc returns an iterator over messages read from r whose payload is decoded into T by f; iteration stops after the first error +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func TopicUnmarshalJSONFunc[T any]( + ctx context.Context, + r TopicMessageReader, + f TypedUnmarshalFunc[*T], +) xiter.Seq2[*TypedTopicMessage[T], error] { 
+ return func(yield func(*TypedTopicMessage[T], error) bool) { + for { + mess, err := r.ReadMessage(ctx) + if err != nil { + yield(nil, err) + + return + } + + var res TypedTopicMessage[T] + + var unmarshal UnmarshalFunc = func(data []byte, _ any) error { + return f(data, &res.Data) + } + + err = UnmarshalMessageWith(mess, unmarshal, nil) + if err != nil { + yield(nil, err) + + return + } + + res.Message = mess + + if !yield(&res, err) { + return + } + } + } +} + +type TypedTopicMessage[T any] struct { + *topicreader.Message + Data T +} + +type TypedUnmarshalFunc[T any] func(data []byte, dst T) error