Skip to content

Commit

Permalink
add explicit initialization of fields of structures in examples/topic
Browse files Browse the repository at this point in the history
  • Loading branch information
kozyrev-m committed Apr 24, 2024
1 parent 90ca7de commit 01ea3c0
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 14 deletions.
1 change: 1 addition & 0 deletions examples/topic/cdc-cache-bus-freeseats/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type balancer struct {
func newBalancer(handlers ...http.Handler) *balancer {
return &balancer{
handlers: handlers,
counter: 0,
}
}

Expand Down
6 changes: 4 additions & 2 deletions examples/topic/cdc-cache-bus-freeseats/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ type Cache struct {

func NewCache(timeout time.Duration) *Cache {
return &Cache{
timeout: timeout,
values: make(map[string]CacheItem),
timeout: timeout,
values: make(map[string]CacheItem),
m: sync.Mutex{},
setCounter: 0,
}
}

Expand Down
6 changes: 5 additions & 1 deletion examples/topic/cdc-cache-bus-freeseats/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ UPSERT INTO bus (id, freeSeats) VALUES ("bus1", 40), ("bus2", 60);
func createCosumers(ctx context.Context, db *ydb.Driver, consumersCount int) error {
for i := 0; i < consumersCount; i++ {
err := db.Topic().Alter(ctx, "bus/updates", topicoptions.AlterWithAddConsumers(topictypes.Consumer{
Name: consumerName(i),
Name: consumerName(i),
Important: false,
SupportedCodecs: nil,
ReadFrom: time.Time{},
Attributes: nil,
}))
if err != nil {
return err
Expand Down
8 changes: 5 additions & 3 deletions examples/topic/cdc-cache-bus-freeseats/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ type server struct {

func newServer(id int, db *ydb.Driver, cacheTimeout time.Duration, useCDC bool) *server {
res := &server{
cache: NewCache(cacheTimeout),
db: db,
id: id,
cache: NewCache(cacheTimeout),
db: db,
id: id,
mux: http.ServeMux{},
dbCounter: 0,
}

res.mux.HandleFunc("/", res.IndexPageHandler)
Expand Down
6 changes: 4 additions & 2 deletions examples/topic/cdc-cache-bus-freeseats/webserver_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ func (s *server) cdcLoop() {
consumer := consumerName(s.id)
reader, err := s.db.Topic().StartReader(consumer, topicoptions.ReadSelectors{
{
Path: "bus/updates",
ReadFrom: time.Now(),
Path: "bus/updates",
ReadFrom: time.Now(),
Partitions: nil,
MaxTimeLag: time.Duration(0),
},
},
)
Expand Down
6 changes: 5 additions & 1 deletion examples/topic/cdc-fill-and-read/cdc-reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"time"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
Expand All @@ -14,7 +15,10 @@ func cdcRead(ctx context.Context, db *ydb.Driver, consumerName, topicPath string
// Connect to changefeed

log.Println("Start cdc read")
reader, err := db.Topic().StartReader(consumerName, []topicoptions.ReadSelector{{Path: topicPath}})
reader, err := db.Topic().StartReader(
consumerName,
[]topicoptions.ReadSelector{{Path: topicPath, Partitions: nil, ReadFrom: time.Time{}, MaxTimeLag: time.Duration(0)}},
)
if err != nil {
log.Fatal("failed to start read feed", err)
}
Expand Down
6 changes: 5 additions & 1 deletion examples/topic/cdc-fill-and-read/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ func prepareTableWithCDC(ctx context.Context, db *ydb.Driver, prefix, tableName,

log.Println("Create consumer")
err = db.Topic().Alter(ctx, topicPath, topicoptions.AlterWithAddConsumers(topictypes.Consumer{
Name: consumerName,
Name: consumerName,
Important: false,
SupportedCodecs: nil,
ReadFrom: time.Time{},
Attributes: nil,
}))
if err != nil {
panic(fmt.Errorf("failed to create feed consumer: %w", err))
Expand Down
2 changes: 1 addition & 1 deletion examples/topic/topicreader/topicreader_simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func UnmarshalMessageContentToJSONStruct(msg *topicreader.Message) {

// UnmarshalMessageContentToProtobufStruct is example for effective way for unmarshal protobuf message content to value
func UnmarshalMessageContentToProtobufStruct(msg *topicreader.Message) {
v := &firestore.BundledDocumentMetadata{} // protobuf type
v := new(firestore.BundledDocumentMetadata) // protobuf type

_ = topicsugar.ProtoUnmarshal(msg, v)
}
61 changes: 61 additions & 0 deletions examples/topic/topicreader/topicreader_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,27 @@ func CommitNotify(ctx context.Context, db *ydb.Driver) {
// called when receive commit notify from server
fmt.Println(info.Topic, info.PartitionID, info.CommittedOffset)
},
OnReaderStart: nil,
OnReaderReconnect: nil,
OnReaderReconnectRequest: nil,
OnReaderPartitionReadStartResponse: nil,
OnReaderPartitionReadStopResponse: nil,
OnReaderCommit: nil,
OnReaderSendCommitMessage: nil,
OnReaderClose: nil,
OnReaderInit: nil,
OnReaderError: nil,
OnReaderUpdateToken: nil,
OnReaderSentDataRequest: nil,
OnReaderReceiveDataResponse: nil,
OnReaderReadMessages: nil,
OnReaderUnknownGrpcMessage: nil,
OnWriterReconnect: nil,
OnWriterInitStream: nil,
OnWriterClose: nil,
OnWriterCompressMessages: nil,
OnWriterSendMessages: nil,
OnWriterReadUnknownGrpcMessage: nil,
},
),
)
Expand Down Expand Up @@ -61,6 +82,26 @@ func ExplicitPartitionStartStopHandler(ctx context.Context, db *ydb.Driver) {

return nil
},
OnReaderStart: nil,
OnReaderReconnect: nil,
OnReaderReconnectRequest: nil,
OnReaderCommit: nil,
OnReaderSendCommitMessage: nil,
OnReaderCommittedNotify: nil,
OnReaderClose: nil,
OnReaderInit: nil,
OnReaderError: nil,
OnReaderUpdateToken: nil,
OnReaderSentDataRequest: nil,
OnReaderReceiveDataResponse: nil,
OnReaderReadMessages: nil,
OnReaderUnknownGrpcMessage: nil,
OnWriterReconnect: nil,
OnWriterInitStream: nil,
OnWriterClose: nil,
OnWriterCompressMessages: nil,
OnWriterSendMessages: nil,
OnWriterReadUnknownGrpcMessage: nil,
},
),
)
Expand Down Expand Up @@ -135,6 +176,26 @@ func PartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db
trace.Topic{
OnReaderPartitionReadStartResponse: onPartitionStart,
OnReaderPartitionReadStopResponse: onPartitionStop,
OnReaderStart: nil,
OnReaderReconnect: nil,
OnReaderReconnectRequest: nil,
OnReaderCommit: nil,
OnReaderSendCommitMessage: nil,
OnReaderCommittedNotify: nil,
OnReaderClose: nil,
OnReaderInit: nil,
OnReaderError: nil,
OnReaderUpdateToken: nil,
OnReaderSentDataRequest: nil,
OnReaderReceiveDataResponse: nil,
OnReaderReadMessages: nil,
OnReaderUnknownGrpcMessage: nil,
OnWriterReconnect: nil,
OnWriterInitStream: nil,
OnWriterClose: nil,
OnWriterCompressMessages: nil,
OnWriterSendMessages: nil,
OnWriterReadUnknownGrpcMessage: nil,
},
),
)
Expand Down
7 changes: 4 additions & 3 deletions examples/topic/topicwriter/topicwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package topicwriter
import (
"bytes"
"context"
"time"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
Expand Down Expand Up @@ -30,15 +31,15 @@ func ConnectSelectCodec(ctx context.Context, db *ydb.Driver) *topicwriter.Writer

func SendMessagesOneByOne(ctx context.Context, w *topicwriter.Writer) {
data := []byte{1, 2, 3}
mess := topicwriter.Message{Data: bytes.NewReader(data)}
mess := topicwriter.Message{Data: bytes.NewReader(data), SeqNo: 0, CreatedAt: time.Time{}, Metadata: nil}
_ = w.Write(ctx, mess)
}

func SendGroupOfMessages(ctx context.Context, w *topicwriter.Writer) {
data1 := []byte{1, 2, 3}
data2 := []byte{4, 5, 6}
mess1 := topicwriter.Message{Data: bytes.NewReader(data1)}
mess2 := topicwriter.Message{Data: bytes.NewReader(data2)}
mess1 := topicwriter.Message{Data: bytes.NewReader(data1), SeqNo: 0, CreatedAt: time.Time{}, Metadata: nil}
mess2 := topicwriter.Message{Data: bytes.NewReader(data2), SeqNo: 0, CreatedAt: time.Time{}, Metadata: nil}

_ = w.Write(ctx, mess1, mess2)
}

0 comments on commit 01ea3c0

Please sign in to comment.