Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cdc helpers #1472

Merged
merged 7 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Added initial experimental topic and cdc-helpers, see examples in [tests/integration/topic_helpers_test.go](https://github.com/ydb-platform/ydb-go-sdk/blob/master/tests/integration/topic_helpers_test.go)
* Added experimental `sugar.UnmarshalRows` for user unmarshaller structs in own code in go 1.23, change example for use the iterator.
* Added `ydb_go_sdk_ydb_query_pool_size_limit` metrics

Expand Down
175 changes: 175 additions & 0 deletions tests/integration/topic_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
//go:build integration && go1.23
// +build integration,go1.23

package integration

import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"path"
"strings"
"testing"

"github.com/stretchr/testify/require"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/version"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
)

func TestMessageReaderIterator(t *testing.T) {
scope := newScope(t)
ctx := scope.Ctx

err := scope.TopicWriter().Write(ctx,
topicwriter.Message{Data: strings.NewReader("asd")},
topicwriter.Message{Data: strings.NewReader("ddd")},
topicwriter.Message{Data: strings.NewReader("ggg")},
)
require.NoError(t, err)

var results []string
for mess, err := range topicsugar.TopicMessageIterator(ctx, scope.TopicReader()) {
require.NoError(t, err)
content, err := io.ReadAll(mess)
require.NoError(t, err)

results = append(results, string(content))
if len(results) == 3 {
break
}
}
require.Equal(t, []string{"asd", "ddd", "ggg"}, results)
}

func TestMessageJsonUnmarshalIterator(t *testing.T) {
scope := newScope(t)
ctx := scope.Ctx

marshal := func(d any) io.Reader {
content, err := json.Marshal(d)
require.NoError(t, err)
return bytes.NewReader(content)
}

type testStruct struct {
A int
B string
}

err := scope.TopicWriter().Write(ctx,
topicwriter.Message{Data: marshal(testStruct{A: 1, B: "asd"})},
topicwriter.Message{Data: marshal(testStruct{A: 2, B: "fff"})},
topicwriter.Message{Data: marshal(testStruct{A: 5, B: "qwe"})},
)
require.NoError(t, err)

var results []testStruct
expectedSeqno := int64(1)
expectedOffset := int64(0)
for mess, err := range topicsugar.TopicUnmarshalJSONIterator[testStruct](ctx, scope.TopicReader()) {
require.NoError(t, err)
require.Equal(t, expectedSeqno, mess.SeqNo)
require.Equal(t, expectedOffset, mess.Offset)

results = append(results, mess.Data)
if len(results) == 3 {
break
}
expectedSeqno++
expectedOffset++
}

expectedResult := []testStruct{
{A: 1, B: "asd"},
{A: 2, B: "fff"},
{A: 5, B: "qwe"},
}
require.Equal(t, expectedResult, results)
}

func TestCDCReaderIterator(t *testing.T) {
if os.Getenv("YDB_VERSION") != "nightly" && version.Lt(os.Getenv("YDB_VERSION"), "24.1") {
t.Skip("require minimum version 24.1 for work with within yql")
}
scope := newScope(t)
ctx := scope.Ctx

query := fmt.Sprintf(`
PRAGMA TablePathPrefix("%s");

ALTER TABLE %s
ADD CHANGEFEED cdc WITH (
FORMAT='JSON',
MODE='NEW_AND_OLD_IMAGES'
)
`, scope.Folder(), scope.TableName())

_, err := scope.Driver().Scripting().Execute(ctx, query, nil)
require.NoError(t, err)

query = fmt.Sprintf(`
PRAGMA TablePathPrefix("%s");

ALTER TOPIC %s
ADD CONSUMER %s;
`, scope.Folder(), "`"+scope.TableName()+"/cdc`", "`"+scope.TopicConsumerName()+"`")

_, err = scope.Driver().Scripting().Execute(ctx, query, nil)
require.NoError(t, err)

require.Equal(t, "table", scope.TableName())

prefix := fmt.Sprintf(`PRAGMA TablePathPrefix("%s");`, scope.Folder())

err = scope.Driver().Query().Exec(ctx, prefix+`UPSERT INTO table (id, val) VALUES (4124, "asd")`)
require.NoError(t, err)

err = scope.Driver().Query().Exec(ctx, prefix+`UPDATE table SET val="qwe"`)
require.NoError(t, err)

err = scope.Driver().Query().Exec(ctx, prefix+`DELETE FROM table`)
require.NoError(t, err)

cdcPath := path.Join(scope.TablePath(), "cdc")
reader, err := scope.Driver().Topic().StartReader(scope.TopicConsumerName(), topicoptions.ReadTopic(cdcPath))
require.NoError(t, err)

var results []*topicsugar.TypedTopicMessage[topicsugar.YDBCDCMessage[*testCDCItem, int64]]
for event, err := range topicsugar.UnmarshalCDCStream[*testCDCItem, int64](ctx, reader) {
require.NoError(t, err)
results = append(results, event)
if len(results) == 3 {
break
}
}

require.Equal(t, &testCDCItem{ID: 4124, Val: "asd"}, results[0].Data.NewImage)
require.False(t, results[0].Data.IsErase())

require.Equal(t, &testCDCItem{ID: 4124, Val: "asd"}, results[1].Data.OldImage)
require.Equal(t, &testCDCItem{ID: 4124, Val: "qwe"}, results[1].Data.NewImage)
require.False(t, results[0].Data.IsErase())

require.Equal(t, &testCDCItem{ID: 4124, Val: "qwe"}, results[2].Data.OldImage)
require.True(t, results[2].Data.IsErase())
}

type testCDCItem struct {
ID int64
Val string
}

func (t *testCDCItem) ParseCDCKey(messages []json.RawMessage) (int64, error) {
var key int64
err := json.Unmarshal(messages[0], &key)
return key, err
}

func (t *testCDCItem) SetPrimaryKey(k int64) {
t.ID = k
}
91 changes: 91 additions & 0 deletions topic/topicsugar/cdc-reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//go:build go1.23

package topicsugar

import (
"context"
"encoding/json"
"fmt"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
)

// YDBCDCItem interface for represent record from table (and cdc event)
// The interface will be removed in the future (or may be set as optional)
// and replaced by field annotations
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
type YDBCDCItem[K any] interface {
comparable
ParseCDCKey(keyFields []json.RawMessage) (K, error)
SetPrimaryKey(key K)
}

// YDBCDCMessage is typed representation of cdc event
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
type YDBCDCMessage[T YDBCDCItem[Key], Key any] struct {
Update T
NewImage T
OldImage T
Key Key
Erase *struct{}
TS []uint64
}

// IsErase returns true if the event about erase record
func (c *YDBCDCMessage[T, Key]) IsErase() bool {
return c.Erase != nil
}

func (c *YDBCDCMessage[T, Key]) UnmarshalJSON(bytes []byte) error {
var rawItem struct {
Update T `json:"update"`
NewImage T `json:"newImage"`
OldImage T `json:"oldImage"`
Key []json.RawMessage `json:"key"`
Erase *struct{} `json:"erase"`
TS []uint64 `json:"ts"`
}

err := json.Unmarshal(bytes, &rawItem)
if err != nil {
return fmt.Errorf("failed to unmarshal cdcevent for type %T: %w", c, err)
}

var tZero T
key, err := tZero.ParseCDCKey(rawItem.Key)
if err != nil {
return fmt.Errorf("failed to unmarshal cdcevent key for type %T: %w", c, err)
}

c.Update = rawItem.Update
c.NewImage = rawItem.NewImage
c.OldImage = rawItem.OldImage
c.Key = key
c.Erase = rawItem.Erase
c.TS = rawItem.TS

if c.Update != tZero {
c.Update.SetPrimaryKey(key)
}
if c.OldImage != tZero {
c.OldImage.SetPrimaryKey(key)
}
if c.NewImage != tZero {
c.NewImage.SetPrimaryKey(key)
}

return nil
}

func UnmarshalCDCStream[T YDBCDCItem[K], K any](
ctx context.Context,
reader TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[YDBCDCMessage[T, K]], error] {
var unmarshal TypedUnmarshalFunc[*YDBCDCMessage[T, K]] = func(data []byte, dst *YDBCDCMessage[T, K]) error {
return json.Unmarshal(data, dst)
}

return TopicUnmarshalJSONFunc[YDBCDCMessage[T, K]](ctx, reader, unmarshal)
}
94 changes: 94 additions & 0 deletions topic/topicsugar/iterators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package topicsugar

import (
"context"
"encoding/json"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

// MessageReader is interface for topicreader.Message
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
type TopicMessageReader interface {
ReadMessage(ctx context.Context) (*topicreader.Message, error)
}

// TopicMessagesIterator is typed representation of cdc event
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func TopicMessageIterator(ctx context.Context, r TopicMessageReader) xiter.Seq2[*topicreader.Message, error] {
return func(yield func(*topicreader.Message, error) bool) {
for {
mess, err := r.ReadMessage(ctx)
if !yield(mess, err) {
return
}

if err != nil {
return
}
}
}
}

// TopicUnmarshalJSONIterator is typed representation of cdc event
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func TopicUnmarshalJSONIterator[T any](
ctx context.Context,
r TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[T], error] {
var unmarshalFunc TypedUnmarshalFunc[*T] = func(data []byte, dst *T) error {
return json.Unmarshal(data, dst)
}

return TopicUnmarshalJSONFunc[T](ctx, r, unmarshalFunc)
}

// TopicUnmarshalJSONIterator is typed representation of cdc event
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func TopicUnmarshalJSONFunc[T any](
ctx context.Context,
r TopicMessageReader,
f TypedUnmarshalFunc[*T],
) xiter.Seq2[*TypedTopicMessage[T], error] {
return func(yield func(*TypedTopicMessage[T], error) bool) {
for {
mess, err := r.ReadMessage(ctx)
if err != nil {
yield(nil, err)

return
}

var res TypedTopicMessage[T]

var unmarshal UnmarshalFunc = func(data []byte, _ any) error {
return f(data, &res.Data)
}

err = UnmarshalMessageWith(mess, unmarshal, nil)
if err != nil {
yield(nil, err)

return
}

res.Message = mess

if !yield(&res, err) {
return
}
}
}
}

type TypedTopicMessage[T any] struct {
*topicreader.Message
Data T
}

type TypedUnmarshalFunc[T any] func(data []byte, dst T) error
Loading