From bcd97f9397c6dbe28b473b3a4ad81f8baa2a9210 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 18 Sep 2024 16:52:38 +0300 Subject: [PATCH 1/5] sync --- tests/integration/topic_helpers.go | 1 + topic/topicsugar/cdc-reader.go | 81 ++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 tests/integration/topic_helpers.go create mode 100644 topic/topicsugar/cdc-reader.go diff --git a/tests/integration/topic_helpers.go b/tests/integration/topic_helpers.go new file mode 100644 index 000000000..76ab1b728 --- /dev/null +++ b/tests/integration/topic_helpers.go @@ -0,0 +1 @@ +package integration diff --git a/topic/topicsugar/cdc-reader.go b/topic/topicsugar/cdc-reader.go new file mode 100644 index 000000000..6eaa5214a --- /dev/null +++ b/topic/topicsugar/cdc-reader.go @@ -0,0 +1,81 @@ +package topicsugar + +import ( + "encoding/json" + "fmt" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" +) + +// YDBCDCItem interface for represent record from table (and cdc event) +// The interface will be removed in the future (or may be set as optional) +// and replaced by field annotations +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +type YDBCDCItem[K any] interface { + comparable + ParseCDCKey([]json.RawMessage) (K, error) + SetPrimaryKey(K) +} + +// YDBCDCMessage is typed representation of cdc event +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +type YDBCDCMessage[T interface { + YDBCDCItem[Key] +}, Key any] struct { + Update T + NewImage T + OldImage T + Key Key + Erase *struct{} + TS []uint64 +} + +// IsErase returns true if the event about erase record +func (c *YDBCDCMessage[T, Key]) IsErase() bool { + return c.Erase != nil +} + +func (c *YDBCDCMessage[T, Key]) UnmarshalJSON(bytes []byte) error { + var rawItem struct { + Update T + NewImage T + OldImage T + Key []json.RawMessage + Erase *struct{} + TS []uint64 + } + + err := json.Unmarshal(bytes, &rawItem) + if err != nil { + return fmt.Errorf("failed to unmarshal cdcevent for type %T: %w", c, err) + } + + var tZero T + key, err := tZero.ParseCDCKey(rawItem.Key) + if err != nil { + return fmt.Errorf("failed to unmarshal cdcevent key for type %T: %w", c, err) + } + + c.Update = rawItem.Update + c.NewImage = rawItem.NewImage + c.OldImage = rawItem.OldImage + c.Key = key + c.Erase = rawItem.Erase + c.TS = rawItem.TS + + if c.Update != tZero { + c.Update.SetPrimaryKey(key) + } + if c.OldImage != tZero { + c.OldImage.SetPrimaryKey(key) + } + if c.NewImage != tZero { + c.NewImage.SetPrimaryKey(key) + } + + return nil +} + +func UnmarsharCDCStream[K any]() xiter.Seq2[YDB] From 17be2cd1b73869d241dfc203ede26ea38fcf3209 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 18 Sep 2024 19:25:42 +0300 Subject: [PATCH 2/5] added initial topic reader helpers --- CHANGELOG.md | 2 + tests/integration/topic_helpers.go | 1 - tests/integration/topic_helpers_test.go | 170 ++++++++++++++++++++++++ topic/topicsugar/cdc-reader.go | 34 +++-- topic/topicsugar/iterators.go | 94 +++++++++++++ 5 files changed, 288 insertions(+), 13 deletions(-) delete mode 100644 tests/integration/topic_helpers.go create mode 100644 tests/integration/topic_helpers_test.go create mode 100644 topic/topicsugar/iterators.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 5adf6a08b..ab83176f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added initial experimental topic and cdc-helpers, see examples in https://github.com/ydb-platform/ydb-go-sdk/blob/master/tests/integration/topic_helpers_test.go + ## v3.80.7 * Doesn't rollback a the transaction on the operation error in table service diff --git a/tests/integration/topic_helpers.go b/tests/integration/topic_helpers.go deleted file mode 100644 index 76ab1b728..000000000 --- a/tests/integration/topic_helpers.go +++ /dev/null @@ -1 +0,0 @@ -package integration diff --git a/tests/integration/topic_helpers_test.go b/tests/integration/topic_helpers_test.go new file mode 100644 index 000000000..3217a04cd --- /dev/null +++ b/tests/integration/topic_helpers_test.go @@ -0,0 +1,170 @@ +//go:build integration && go1.23 +// +build integration,go1.23 + +package integration + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "path" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter" +) + +func TestMessageReaderIterator(t *testing.T) { + scope := newScope(t) + ctx := scope.Ctx + + err := scope.TopicWriter().Write(ctx, + topicwriter.Message{Data: strings.NewReader("asd")}, + topicwriter.Message{Data: strings.NewReader("ddd")}, + topicwriter.Message{Data: strings.NewReader("ggg")}, + ) + require.NoError(t, err) + + var results []string + for mess, err := range topicsugar.TopicMessageIterator(ctx, scope.TopicReader()) { + require.NoError(t, err) + content, err := io.ReadAll(mess) + require.NoError(t, err) + + results = append(results, string(content)) + if len(results) == 3 { + break + } + } + require.Equal(t, []string{"asd", "ddd", "ggg"}, results) +} + +func TestMessageJsonUnmarshalIterator(t *testing.T) { + scope := newScope(t) + ctx := scope.Ctx + + marshal := func(d any) io.Reader { + content, err := json.Marshal(d) + require.NoError(t, err) + return bytes.NewReader(content) + } + + type testStruct struct { + A int + B string + } + + err := scope.TopicWriter().Write(ctx, + topicwriter.Message{Data: marshal(testStruct{A: 1, B: "asd"})}, + topicwriter.Message{Data: marshal(testStruct{A: 2, B: "fff"})}, + topicwriter.Message{Data: marshal(testStruct{A: 5, B: "qwe"})}, + ) + require.NoError(t, err) + + var results []testStruct + expectedSeqno := int64(1) + expectedOffset := int64(0) + for mess, err := range topicsugar.TopicUnmarshalJSONIterator[testStruct](ctx, scope.TopicReader()) { + require.NoError(t, err) + require.Equal(t, expectedSeqno, mess.SeqNo) + require.Equal(t, expectedOffset, mess.Offset) + + results = append(results, mess.Data) + if len(results) == 3 { + break + } + expectedSeqno++ + expectedOffset++ + } + + expectedResult := []testStruct{ + {A: 1, B: "asd"}, + {A: 2, B: "fff"}, + {A: 5, B: "qwe"}, + } + require.Equal(t, expectedResult, results) +} + +func TestCDCReaderIterator(t *testing.T) { + scope := newScope(t) + ctx := scope.Ctx + + query := fmt.Sprintf(` +PRAGMA TablePathPrefix("%s"); + +ALTER TABLE %s +ADD CHANGEFEED cdc WITH ( + FORMAT='JSON', + MODE='NEW_AND_OLD_IMAGES' +) +`, scope.Folder(), scope.TableName()) + + err := scope.Driver().Query().Exec(ctx, query) + require.NoError(t, err) + + query = fmt.Sprintf(` +PRAGMA TablePathPrefix("%s"); + +ALTER TOPIC %s +ADD CONSUMER %s; +`, scope.Folder(), "`"+scope.TableName()+"/cdc`", "`"+scope.TopicConsumerName()+"`") + + _, err = scope.Driver().Scripting().Execute(ctx, query, nil) + require.NoError(t, err) + + require.Equal(t, "table", scope.TableName()) + + prefix := fmt.Sprintf(`PRAGMA TablePathPrefix("%s");`, scope.Folder()) + + err = scope.Driver().Query().Exec(ctx, prefix+`UPSERT INTO table (id, val) VALUES (4124, "asd")`) + require.NoError(t, err) + + err = scope.Driver().Query().Exec(ctx, prefix+`UPDATE table SET val="qwe"`) + require.NoError(t, err) + + err = scope.Driver().Query().Exec(ctx, prefix+`DELETE FROM table`) + require.NoError(t, err) + + cdcPath := path.Join(scope.TablePath(), "cdc") + reader, err := scope.Driver().Topic().StartReader(scope.TopicConsumerName(), topicoptions.ReadTopic(cdcPath)) + require.NoError(t, err) + + var results []*topicsugar.TypedTopicMessage[topicsugar.YDBCDCMessage[*testCDCItem, int64]] + for event, err := range topicsugar.UnmarshalCDCStream[*testCDCItem, int64](ctx, reader) { + require.NoError(t, err) + results = append(results, event) + if len(results) == 3 { + break + } + } + + require.Equal(t, &testCDCItem{ID: 4124, Val: "asd"}, results[0].Data.NewImage) + require.False(t, results[0].Data.IsErase()) + + require.Equal(t, &testCDCItem{ID: 4124, Val: "asd"}, results[1].Data.OldImage) + require.Equal(t, &testCDCItem{ID: 4124, Val: "qwe"}, results[1].Data.NewImage) + require.False(t, results[0].Data.IsErase()) + + require.Equal(t, &testCDCItem{ID: 4124, Val: "qwe"}, results[2].Data.OldImage) + require.True(t, results[2].Data.IsErase()) +} + +type testCDCItem struct { + ID int64 + Val string +} + +func (t *testCDCItem) ParseCDCKey(messages []json.RawMessage) (int64, error) { + var key int64 + err := json.Unmarshal(messages[0], &key) + return key, err +} + +func (t *testCDCItem) SetPrimaryKey(k int64) { + t.ID = k +} diff --git a/topic/topicsugar/cdc-reader.go b/topic/topicsugar/cdc-reader.go index 6eaa5214a..44d86fcd2 100644 --- a/topic/topicsugar/cdc-reader.go +++ b/topic/topicsugar/cdc-reader.go @@ -1,6 +1,9 @@ +//go:build go1.23 + package topicsugar import ( + "context" "encoding/json" "fmt" @@ -14,16 +17,14 @@ import ( // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental type YDBCDCItem[K any] interface { comparable - ParseCDCKey([]json.RawMessage) (K, error) - SetPrimaryKey(K) + ParseCDCKey(keyFields []json.RawMessage) (K, error) + SetPrimaryKey(key K) } // YDBCDCMessage is typed representation of cdc event // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental -type YDBCDCMessage[T interface { - YDBCDCItem[Key] -}, Key any] struct { +type YDBCDCMessage[T YDBCDCItem[Key], Key any] struct { Update T NewImage T OldImage T @@ -39,12 +40,12 @@ func (c *YDBCDCMessage[T, Key]) IsErase() bool { func (c *YDBCDCMessage[T, Key]) UnmarshalJSON(bytes []byte) error { var rawItem struct { - Update T - NewImage T - OldImage T - Key []json.RawMessage - Erase *struct{} - TS []uint64 + Update T `json:"update"` + NewImage T `json:"newImage"` + OldImage T `json:"oldImage"` + Key []json.RawMessage `json:"key"` + Erase *struct{} `json:"erase"` + TS []uint64 `json:"ts"` } err := json.Unmarshal(bytes, &rawItem) @@ -78,4 +79,13 @@ func (c *YDBCDCMessage[T, Key]) UnmarshalJSON(bytes []byte) error { return nil } -func UnmarsharCDCStream[K any]() xiter.Seq2[YDB] +func UnmarshalCDCStream[T YDBCDCItem[K], K any]( + ctx context.Context, + reader TopicMessageReader, +) xiter.Seq2[*TypedTopicMessage[YDBCDCMessage[T, K]], error] { + var unmarshal TypedUnmarshalFunc[*YDBCDCMessage[T, K]] = func(data []byte, dst *YDBCDCMessage[T, K]) error { + return json.Unmarshal(data, dst) + } + + return TopicUnmarshalJSONFunc[YDBCDCMessage[T, K]](ctx, reader, unmarshal) +} diff --git a/topic/topicsugar/iterators.go b/topic/topicsugar/iterators.go new file mode 100644 index 000000000..7f5ee5b2c --- /dev/null +++ b/topic/topicsugar/iterators.go @@ -0,0 +1,94 @@ +package topicsugar + +import ( + "context" + "encoding/json" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" +) + +// MessageReader is interface for topicreader.Message +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +type TopicMessageReader interface { + ReadMessage(ctx context.Context) (*topicreader.Message, error) +} + +// TopicMessagesIterator is typed representation of cdc event +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func TopicMessageIterator(ctx context.Context, r TopicMessageReader) xiter.Seq2[*topicreader.Message, error] { + return func(yield func(*topicreader.Message, error) bool) { + for { + mess, err := r.ReadMessage(ctx) + if !yield(mess, err) { + return + } + + if err != nil { + return + } + } + } +} + +// TopicUnmarshalJSONIterator is typed representation of cdc event +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func TopicUnmarshalJSONIterator[T any]( + ctx context.Context, + r TopicMessageReader, +) xiter.Seq2[*TypedTopicMessage[T], error] { + var unmarshalFunc TypedUnmarshalFunc[*T] = func(data []byte, dst *T) error { + return json.Unmarshal(data, dst) + } + + return TopicUnmarshalJSONFunc[T](ctx, r, unmarshalFunc) +} + +// TopicUnmarshalJSONIterator is typed representation of cdc event +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func TopicUnmarshalJSONFunc[T any]( + ctx context.Context, + r TopicMessageReader, + f TypedUnmarshalFunc[*T], +) xiter.Seq2[*TypedTopicMessage[T], error] { + return func(yield func(*TypedTopicMessage[T], error) bool) { + for { + mess, err := r.ReadMessage(ctx) + if err != nil { + yield(nil, err) + + return + } + + var res TypedTopicMessage[T] + + var unmarshal UnmarshalFunc = func(data []byte, _ any) error { + return f(data, &res.Data) + } + + err = UnmarshalMessageWith(mess, unmarshal, nil) + if err != nil { + yield(nil, err) + + return + } + + res.Message = mess + + if !yield(&res, err) { + return + } + } + } +} + +type TypedTopicMessage[T any] struct { + *topicreader.Message + Data T +} + +type TypedUnmarshalFunc[T any] func(data []byte, dst T) error From ff50f7f6cb54201ee3dcaf361fe83f194b945a5d Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 18 Sep 2024 19:32:28 +0300 Subject: [PATCH 3/5] fix link in changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab83176f4..d12d032d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -* Added initial experimental topic and cdc-helpers, see examples in https://github.com/ydb-platform/ydb-go-sdk/blob/master/tests/integration/topic_helpers_test.go +* Added initial experimental topic and cdc-helpers, see examples in [tests/integration/topic_helpers_test.go](https://github.com/ydb-platform/ydb-go-sdk/blob/master/tests/integration/topic_helpers_test.go) ## v3.80.7 * Doesn't rollback a the transaction on the operation error in table service From ec9ee63bca6d8592adc385d603fe4eabec0d6072 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 18 Sep 2024 19:42:55 +0300 Subject: [PATCH 4/5] fix query service to scripting --- tests/integration/topic_helpers_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/topic_helpers_test.go b/tests/integration/topic_helpers_test.go index 3217a04cd..b8f5ccc8c 100644 --- a/tests/integration/topic_helpers_test.go +++ b/tests/integration/topic_helpers_test.go @@ -104,7 +104,7 @@ ADD CHANGEFEED cdc WITH ( ) `, scope.Folder(), scope.TableName()) - err := scope.Driver().Query().Exec(ctx, query) + _, err := scope.Driver().Scripting().Execute(ctx, query, nil) require.NoError(t, err) query = fmt.Sprintf(` From 2cf008a06541430981f8048be4b1ed8d6a853ca0 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 18 Sep 2024 20:23:11 +0300 Subject: [PATCH 5/5] skip test for old ydb --- tests/integration/topic_helpers_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/integration/topic_helpers_test.go b/tests/integration/topic_helpers_test.go index b8f5ccc8c..61a2b0569 100644 --- a/tests/integration/topic_helpers_test.go +++ b/tests/integration/topic_helpers_test.go @@ -8,12 +8,14 @@ import ( "encoding/json" "fmt" "io" + "os" "path" "strings" "testing" "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter" @@ -91,6 +93,9 @@ func TestMessageJsonUnmarshalIterator(t *testing.T) { } func TestCDCReaderIterator(t *testing.T) { + if os.Getenv("YDB_VERSION") != "nightly" && version.Lt(os.Getenv("YDB_VERSION"), "24.1") { + t.Skip("require minimum version 24.1 for work with within yql") + } scope := newScope(t) ctx := scope.Ctx