diff --git a/examples/topic/cdc-cache-bus-freeseats/balancer.go b/examples/topic/cdc-cache-bus-freeseats/balancer.go index 6fba494e0..989d13819 100644 --- a/examples/topic/cdc-cache-bus-freeseats/balancer.go +++ b/examples/topic/cdc-cache-bus-freeseats/balancer.go @@ -13,6 +13,7 @@ type balancer struct { func newBalancer(handlers ...http.Handler) *balancer { return &balancer{ handlers: handlers, + counter: 0, } } diff --git a/examples/topic/cdc-cache-bus-freeseats/cache.go b/examples/topic/cdc-cache-bus-freeseats/cache.go index 4e67084f6..077e42ec7 100644 --- a/examples/topic/cdc-cache-bus-freeseats/cache.go +++ b/examples/topic/cdc-cache-bus-freeseats/cache.go @@ -15,8 +15,10 @@ type Cache struct { func NewCache(timeout time.Duration) *Cache { return &Cache{ - timeout: timeout, - values: make(map[string]CacheItem), + timeout: timeout, + values: make(map[string]CacheItem), + m: sync.Mutex{}, + setCounter: 0, } } diff --git a/examples/topic/cdc-cache-bus-freeseats/database.go b/examples/topic/cdc-cache-bus-freeseats/database.go index 43ad188ce..2b44a5b55 100644 --- a/examples/topic/cdc-cache-bus-freeseats/database.go +++ b/examples/topic/cdc-cache-bus-freeseats/database.go @@ -67,7 +67,11 @@ UPSERT INTO bus (id, freeSeats) VALUES ("bus1", 40), ("bus2", 60); func createCosumers(ctx context.Context, db *ydb.Driver, consumersCount int) error { for i := 0; i < consumersCount; i++ { err := db.Topic().Alter(ctx, "bus/updates", topicoptions.AlterWithAddConsumers(topictypes.Consumer{ - Name: consumerName(i), + Name: consumerName(i), + Important: false, + SupportedCodecs: nil, + ReadFrom: time.Time{}, + Attributes: nil, })) if err != nil { return err diff --git a/examples/topic/cdc-cache-bus-freeseats/webserver.go b/examples/topic/cdc-cache-bus-freeseats/webserver.go index 3825fabce..b10ffd42d 100644 --- a/examples/topic/cdc-cache-bus-freeseats/webserver.go +++ b/examples/topic/cdc-cache-bus-freeseats/webserver.go @@ -27,9 +27,11 @@ type server struct { func newServer(id int, db *ydb.Driver, cacheTimeout time.Duration, useCDC bool) *server { res := &server{ - cache: NewCache(cacheTimeout), - db: db, - id: id, + cache: NewCache(cacheTimeout), + db: db, + id: id, + mux: http.ServeMux{}, + dbCounter: 0, } res.mux.HandleFunc("/", res.IndexPageHandler) diff --git a/examples/topic/cdc-cache-bus-freeseats/webserver_cdc.go b/examples/topic/cdc-cache-bus-freeseats/webserver_cdc.go index 70f4a1f22..96bcebe06 100644 --- a/examples/topic/cdc-cache-bus-freeseats/webserver_cdc.go +++ b/examples/topic/cdc-cache-bus-freeseats/webserver_cdc.go @@ -14,8 +14,10 @@ func (s *server) cdcLoop() { consumer := consumerName(s.id) reader, err := s.db.Topic().StartReader(consumer, topicoptions.ReadSelectors{ { - Path: "bus/updates", - ReadFrom: time.Now(), + Path: "bus/updates", + ReadFrom: time.Now(), + Partitions: nil, + MaxTimeLag: time.Duration(0), }, }, ) diff --git a/examples/topic/cdc-fill-and-read/cdc-reader.go b/examples/topic/cdc-fill-and-read/cdc-reader.go index b3771009d..50bf3b4cb 100644 --- a/examples/topic/cdc-fill-and-read/cdc-reader.go +++ b/examples/topic/cdc-fill-and-read/cdc-reader.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "time" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" @@ -14,7 +15,10 @@ func cdcRead(ctx context.Context, db *ydb.Driver, consumerName, topicPath string // Connect to changefeed log.Println("Start cdc read") - reader, err := db.Topic().StartReader(consumerName, []topicoptions.ReadSelector{{Path: topicPath}}) + reader, err := db.Topic().StartReader( + consumerName, + []topicoptions.ReadSelector{{Path: topicPath, Partitions: nil, ReadFrom: time.Time{}, MaxTimeLag: time.Duration(0)}}, + ) if err != nil { log.Fatal("failed to start read feed", err) } diff --git a/examples/topic/cdc-fill-and-read/main.go b/examples/topic/cdc-fill-and-read/main.go index d0cfdbe47..175395655 100644 --- a/examples/topic/cdc-fill-and-read/main.go +++ b/examples/topic/cdc-fill-and-read/main.go @@ -103,7 +103,11 @@ func prepareTableWithCDC(ctx context.Context, db *ydb.Driver, prefix, tableName, log.Println("Create consumer") err = db.Topic().Alter(ctx, topicPath, topicoptions.AlterWithAddConsumers(topictypes.Consumer{ - Name: consumerName, + Name: consumerName, + Important: false, + SupportedCodecs: nil, + ReadFrom: time.Time{}, + Attributes: nil, })) if err != nil { panic(fmt.Errorf("failed to create feed consumer: %w", err)) diff --git a/examples/topic/topicreader/topicreader_simple.go b/examples/topic/topicreader/topicreader_simple.go index 6fb568037..c58a5e1eb 100644 --- a/examples/topic/topicreader/topicreader_simple.go +++ b/examples/topic/topicreader/topicreader_simple.go @@ -44,7 +44,7 @@ func UnmarshalMessageContentToJSONStruct(msg *topicreader.Message) { // UnmarshalMessageContentToProtobufStruct is example for effective way for unmarshal protobuf message content to value func UnmarshalMessageContentToProtobufStruct(msg *topicreader.Message) { - v := &firestore.BundledDocumentMetadata{} // protobuf type + v := new(firestore.BundledDocumentMetadata) // protobuf type _ = topicsugar.ProtoUnmarshal(msg, v) } diff --git a/examples/topic/topicreader/topicreader_trace.go b/examples/topic/topicreader/topicreader_trace.go index 28ab11427..461f8dc9e 100644 --- a/examples/topic/topicreader/topicreader_trace.go +++ b/examples/topic/topicreader/topicreader_trace.go @@ -17,6 +17,27 @@ func CommitNotify(ctx context.Context, db *ydb.Driver) { // called when receive commit notify from server fmt.Println(info.Topic, info.PartitionID, info.CommittedOffset) }, + OnReaderStart: nil, + OnReaderReconnect: nil, + OnReaderReconnectRequest: nil, + OnReaderPartitionReadStartResponse: nil, + OnReaderPartitionReadStopResponse: nil, + OnReaderCommit: nil, + OnReaderSendCommitMessage: nil, + OnReaderClose: nil, + OnReaderInit: nil, + OnReaderError: nil, + OnReaderUpdateToken: nil, + OnReaderSentDataRequest: nil, + OnReaderReceiveDataResponse: nil, + OnReaderReadMessages: nil, + OnReaderUnknownGrpcMessage: nil, + OnWriterReconnect: nil, + OnWriterInitStream: nil, + OnWriterClose: nil, + OnWriterCompressMessages: nil, + OnWriterSendMessages: nil, + OnWriterReadUnknownGrpcMessage: nil, }, ), ) @@ -61,6 +82,26 @@ func ExplicitPartitionStartStopHandler(ctx context.Context, db *ydb.Driver) { return nil }, + OnReaderStart: nil, + OnReaderReconnect: nil, + OnReaderReconnectRequest: nil, + OnReaderCommit: nil, + OnReaderSendCommitMessage: nil, + OnReaderCommittedNotify: nil, + OnReaderClose: nil, + OnReaderInit: nil, + OnReaderError: nil, + OnReaderUpdateToken: nil, + OnReaderSentDataRequest: nil, + OnReaderReceiveDataResponse: nil, + OnReaderReadMessages: nil, + OnReaderUnknownGrpcMessage: nil, + OnWriterReconnect: nil, + OnWriterInitStream: nil, + OnWriterClose: nil, + OnWriterCompressMessages: nil, + OnWriterSendMessages: nil, + OnWriterReadUnknownGrpcMessage: nil, }, ), ) @@ -135,6 +176,26 @@ func PartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db trace.Topic{ OnReaderPartitionReadStartResponse: onPartitionStart, OnReaderPartitionReadStopResponse: onPartitionStop, + OnReaderStart: nil, + OnReaderReconnect: nil, + OnReaderReconnectRequest: nil, + OnReaderCommit: nil, + OnReaderSendCommitMessage: nil, + OnReaderCommittedNotify: nil, + OnReaderClose: nil, + OnReaderInit: nil, + OnReaderError: nil, + OnReaderUpdateToken: nil, + OnReaderSentDataRequest: nil, + OnReaderReceiveDataResponse: nil, + OnReaderReadMessages: nil, + OnReaderUnknownGrpcMessage: nil, + OnWriterReconnect: nil, + OnWriterInitStream: nil, + OnWriterClose: nil, + OnWriterCompressMessages: nil, + OnWriterSendMessages: nil, + OnWriterReadUnknownGrpcMessage: nil, }, ), ) diff --git a/examples/topic/topicwriter/topicwriter.go b/examples/topic/topicwriter/topicwriter.go index a378f8a90..ca864c573 100644 --- a/examples/topic/topicwriter/topicwriter.go +++ b/examples/topic/topicwriter/topicwriter.go @@ -3,6 +3,7 @@ package topicwriter import ( "bytes" "context" + "time" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" @@ -30,15 +31,15 @@ func ConnectSelectCodec(ctx context.Context, db *ydb.Driver) *topicwriter.Writer func SendMessagesOneByOne(ctx context.Context, w *topicwriter.Writer) { data := []byte{1, 2, 3} - mess := topicwriter.Message{Data: bytes.NewReader(data)} + mess := topicwriter.Message{Data: bytes.NewReader(data), SeqNo: 0, CreatedAt: time.Time{}, Metadata: nil} _ = w.Write(ctx, mess) } func SendGroupOfMessages(ctx context.Context, w *topicwriter.Writer) { data1 := []byte{1, 2, 3} data2 := []byte{4, 5, 6} - mess1 := topicwriter.Message{Data: bytes.NewReader(data1)} - mess2 := topicwriter.Message{Data: bytes.NewReader(data2)} + mess1 := topicwriter.Message{Data: bytes.NewReader(data1), SeqNo: 0, CreatedAt: time.Time{}, Metadata: nil} + mess2 := topicwriter.Message{Data: bytes.NewReader(data2), SeqNo: 0, CreatedAt: time.Time{}, Metadata: nil} _ = w.Write(ctx, mess1, mess2) }