diff --git a/internal/grpcwrapper/rawoptional/rawoptional.go b/internal/grpcwrapper/rawoptional/rawoptional.go index e4aec2c4a..0c9e99906 100644 --- a/internal/grpcwrapper/rawoptional/rawoptional.go +++ b/internal/grpcwrapper/rawoptional/rawoptional.go @@ -35,6 +35,26 @@ func (v *Duration) ToProto() *durationpb.Duration { return nil } +func (v *Duration) MustFromProto(proto *durationpb.Duration) { + if proto == nil { + v.Value = time.Duration(0) + v.HasValue = false + + return + } + + v.HasValue = true + v.Value = proto.AsDuration() +} + +func (v *Duration) ToDuration() *time.Duration { + if v.HasValue { + return nil + } + + return &v.Value +} + type Int64 struct { Value int64 HasValue bool @@ -74,3 +94,11 @@ func (v *Time) MustFromProto(proto *timestamppb.Timestamp) { v.HasValue = true v.Value = proto.AsTime() } + +func (v *Time) ToTime() *time.Time { + if v.HasValue { + return nil + } + + return &v.Value +} diff --git a/internal/grpcwrapper/rawtopic/client.go b/internal/grpcwrapper/rawtopic/client.go index 5d73badd4..77fa4001e 100644 --- a/internal/grpcwrapper/rawtopic/client.go +++ b/internal/grpcwrapper/rawtopic/client.go @@ -46,9 +46,27 @@ func (c *Client) CreateTopic( func (c *Client) DescribeTopic(ctx context.Context, req DescribeTopicRequest) (res DescribeTopicResult, err error) { resp, err := c.service.DescribeTopic(ctx, req.ToProto()) if err != nil { - return DescribeTopicResult{}, xerrors.WithStackTrace(xerrors.Wrap( - fmt.Errorf("ydb: describe topic grpc failed: %w", err), - )) + return DescribeTopicResult{}, xerrors.WithStackTrace( + xerrors.Wrap( + fmt.Errorf("ydb: describe topic grpc failed: %w", err), + ), + ) + } + err = res.FromProto(resp) + + return res, err +} + +func (c *Client) DescribeConsumer(ctx context.Context, req DescribeConsumerRequest) ( + res DescribeConsumerResult, err error, +) { + resp, err := c.service.DescribeConsumer(ctx, req.ToProto()) + if err != nil { + return DescribeConsumerResult{}, xerrors.WithStackTrace( + xerrors.Wrap( + fmt.Errorf("ydb: describe topic consumer grpc failed: %w", err), + ), + ) } err = res.FromProto(resp) diff --git a/internal/grpcwrapper/rawtopic/describe_consumer.go b/internal/grpcwrapper/rawtopic/describe_consumer.go new file mode 100644 index 000000000..226bfb680 --- /dev/null +++ b/internal/grpcwrapper/rawtopic/describe_consumer.go @@ -0,0 +1,151 @@ +package rawtopic + +import ( + "fmt" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/clone" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawoptional" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawscheme" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" +) + +type DescribeConsumerRequest struct { + OperationParams rawydb.OperationParams + Path string + Consumer string + IncludeStats bool +} + +func (req *DescribeConsumerRequest) ToProto() *Ydb_Topic.DescribeConsumerRequest { + return &Ydb_Topic.DescribeConsumerRequest{ + OperationParams: req.OperationParams.ToProto(), + Path: req.Path, + Consumer: req.Consumer, + IncludeStats: req.IncludeStats, + } +} + +type DescribeConsumerResult struct { + Operation rawydb.Operation + Self rawscheme.Entry + Consumer Consumer + Partitions []DescribeConsumerResultPartitionInfo +} + +func (res *DescribeConsumerResult) FromProto(response operation.Response) error { + if err := res.Operation.FromProtoWithStatusCheck(response.GetOperation()); err != nil { + return err + } + protoResult := &Ydb_Topic.DescribeConsumerResult{} + if err := response.GetOperation().GetResult().UnmarshalTo(protoResult); err != nil { + return xerrors.WithStackTrace( + fmt.Errorf( + "ydb: describe consumer result failed on unmarshal grpc result: %w", err, + ), + ) + } + + if err := res.Self.FromProto(protoResult.GetSelf()); err != nil { + return err + } + + res.Consumer.MustFromProto(protoResult.GetConsumer()) + + protoPartitions := protoResult.GetPartitions() + res.Partitions = make([]DescribeConsumerResultPartitionInfo, len(protoPartitions)) + for i, protoPartition := range protoPartitions { + if err := res.Partitions[i].FromProto(protoPartition); err != nil { + return err + } + } + + return nil +} + +type MultipleWindowsStat struct { + PerMinute int64 + PerHour int64 + PerDay int64 +} + +func (stat *MultipleWindowsStat) MustFromProto(proto *Ydb_Topic.MultipleWindowsStat) { + stat.PerMinute = proto.GetPerMinute() + stat.PerHour = proto.GetPerHour() + stat.PerDay = proto.GetPerDay() +} + +type PartitionStats struct { + PartitionsOffset rawtopiccommon.OffsetRange + StoreSizeBytes int64 + LastWriteTime rawoptional.Time + MaxWriteTimeLag rawoptional.Duration + BytesWritten MultipleWindowsStat +} + +func (ps *PartitionStats) FromProto(proto *Ydb_Topic.PartitionStats) error { + if proto == nil { + return nil + } + if err := ps.PartitionsOffset.FromProto(proto.GetPartitionOffsets()); err != nil { + return err + } + ps.StoreSizeBytes = proto.GetStoreSizeBytes() + ps.LastWriteTime.MustFromProto(proto.GetLastWriteTime()) + ps.MaxWriteTimeLag.ToProto() + ps.BytesWritten.MustFromProto(proto.GetBytesWritten()) + + return nil +} + +type PartitionConsumerStats struct { + LastReadOffset int64 + CommittedOffset int64 + ReadSessionID string + PartitionReadSessionCreateTime rawoptional.Time + LastReadTime rawoptional.Time + MaxReadTimeLag rawoptional.Duration + MaxWriteTimeLag rawoptional.Duration + BytesRead MultipleWindowsStat + ReaderName string +} + +func (stats *PartitionConsumerStats) FromProto(proto *Ydb_Topic.DescribeConsumerResult_PartitionConsumerStats) error { + if proto == nil { + return nil + } + stats.LastReadOffset = proto.GetLastReadOffset() + stats.CommittedOffset = proto.GetCommittedOffset() + stats.ReadSessionID = proto.GetReadSessionId() + stats.PartitionReadSessionCreateTime.MustFromProto(proto.GetPartitionReadSessionCreateTime()) + stats.LastReadTime.MustFromProto(proto.GetLastReadTime()) + stats.MaxReadTimeLag.MustFromProto(proto.GetMaxReadTimeLag()) + stats.MaxWriteTimeLag.MustFromProto(proto.GetMaxWriteTimeLag()) + stats.BytesRead.MustFromProto(proto.GetBytesRead()) + stats.ReaderName = proto.GetReaderName() + + return nil +} + +type DescribeConsumerResultPartitionInfo struct { + PartitionID int64 + Active bool + ChildPartitionIDs []int64 + ParentPartitionIDs []int64 + PartitionStats PartitionStats + PartitionConsumerStats PartitionConsumerStats +} + +func (pi *DescribeConsumerResultPartitionInfo) FromProto(proto *Ydb_Topic.DescribeConsumerResult_PartitionInfo) error { + pi.PartitionID = proto.GetPartitionId() + pi.Active = proto.GetActive() + + pi.ChildPartitionIDs = clone.Int64Slice(proto.GetChildPartitionIds()) + pi.ParentPartitionIDs = clone.Int64Slice(proto.GetParentPartitionIds()) + + return pi.PartitionStats.FromProto(proto.GetPartitionStats()) +} diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 729e25c93..60050c26e 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -181,6 +181,54 @@ func (c *Client) Describe( return res, nil } +// Describe topic consumer +func (c *Client) DescribeTopicConsumer( + ctx context.Context, + path string, + consumer string, + opts ...topicoptions.DescribeConsumerOption, +) (res topictypes.TopicConsumerDescription, _ error) { + req := rawtopic.DescribeConsumerRequest{ + OperationParams: c.defaultOperationParams, + Path: path, + Consumer: consumer, + } + + for _, opt := range opts { + if opt != nil { + opt(&req) + } + } + + var rawRes rawtopic.DescribeConsumerResult + + call := func(ctx context.Context) (describeErr error) { + rawRes, describeErr = c.rawClient.DescribeConsumer(ctx, req) + + return describeErr + } + + var err error + + if c.cfg.AutoRetry() { + err = retry.Retry(ctx, call, + retry.WithIdempotent(true), + retry.WithTrace(c.cfg.TraceRetry()), + retry.WithBudget(c.cfg.RetryBudget()), + ) + } else { + err = call(ctx) + } + + if err != nil { + return res, err + } + + res.FromRaw(&rawRes) + + return res, nil +} + // Drop topic func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.DropOption) error { req := rawtopic.DropTopicRequest{} diff --git a/tests/integration/topic_client_test.go b/tests/integration/topic_client_test.go index 81b94eee9..a4f5851a7 100644 --- a/tests/integration/topic_client_test.go +++ b/tests/integration/topic_client_test.go @@ -91,7 +91,7 @@ func TestTopicDescribe(t *testing.T) { ) require.NoError(t, err) - res, err := db.Topic().Describe(ctx, topicName) + topicDesc, err := db.Topic().Describe(ctx, topicName) require.NoError(t, err) expected := topictypes.TopicDescription{ @@ -131,13 +131,128 @@ func TestTopicDescribe(t *testing.T) { *subset = nil } - requireAndCleanSubset(&res.Attributes, &expected.Attributes) + requireAndCleanSubset(&topicDesc.Attributes, &expected.Attributes) for i := range expected.Consumers { - requireAndCleanSubset(&res.Consumers[i].Attributes, &expected.Consumers[i].Attributes) + requireAndCleanSubset(&topicDesc.Consumers[i].Attributes, &expected.Consumers[i].Attributes) } - require.Equal(t, expected, res) + require.Equal(t, expected, topicDesc) +} + +func TestDescribeTopicConsumer(t *testing.T) { + ctx := xtest.Context(t) + db := connect(t) + topicName := "test-topic-" + t.Name() + setImportant := true + if os.Getenv("YDB_VERSION") != "nightly" && version.Lt(os.Getenv("YDB_VERSION"), "24.1.1") { + setImportant = false + } + var ( + supportedCodecs = []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip} + minActivePartitions = int64(2) + consumers = []topictypes.Consumer{ + { + Name: "c1", + Important: setImportant, + SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, + ReadFrom: time.Date(2022, 9, 11, 10, 1, 2, 0, time.UTC), + }, + { + Name: "c2", + Important: false, + SupportedCodecs: []topictypes.Codec{}, + ReadFrom: time.Date(2021, 1, 2, 3, 4, 5, 0, time.UTC), + }, + } + ) + + _ = db.Topic().Drop(ctx, topicName) + err := db.Topic().Create(ctx, topicName, + topicoptions.CreateWithSupportedCodecs(supportedCodecs...), + topicoptions.CreateWithMinActivePartitions(minActivePartitions), + topicoptions.CreateWithConsumer(consumers...), + ) + require.NoError(t, err) + + consumer, err := db.Topic().DescribeTopicConsumer(ctx, topicName, "c1", topicoptions.IncludeConsumerStats()) + require.NoError(t, err) + + zeroTime := time.Time{} + zeroDuration := time.Duration(0) + expectedConsumerDesc := topictypes.TopicConsumerDescription{ + Path: path.Join(topicName, "c1"), + Consumer: topictypes.Consumer{ + Name: "c1", + Important: true, + SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, + ReadFrom: time.Date(2022, 9, 11, 10, 1, 2, 0, time.UTC), + Attributes: map[string]string{ + "_service_type": "data-streams", + }, + }, + Partitions: []topictypes.DescribeConsumerPartitionInfo{ + { + PartitionID: 0, + Active: true, + PartitionStats: topictypes.PartitionStats{ + PartitionsOffset: topictypes.OffsetRange{}, + StoreSizeBytes: 0, + LastWriteTime: nil, + MaxWriteTimeLag: &zeroDuration, + BytesWritten: topictypes.MultipleWindowsStat{}, + }, + PartitionConsumerStats: topictypes.PartitionConsumerStats{ + LastReadOffset: 0, + CommittedOffset: 0, + ReadSessionID: "", + PartitionReadSessionCreateTime: &zeroTime, + LastReadTime: &zeroTime, + MaxReadTimeLag: &zeroDuration, + MaxWriteTimeLag: &zeroDuration, + BytesRead: topictypes.MultipleWindowsStat{}, + ReaderName: "", + }, + }, + { + PartitionID: 1, + Active: true, + PartitionStats: topictypes.PartitionStats{ + PartitionsOffset: topictypes.OffsetRange{}, + StoreSizeBytes: 0, + LastWriteTime: nil, + MaxWriteTimeLag: &zeroDuration, + BytesWritten: topictypes.MultipleWindowsStat{}, + }, + PartitionConsumerStats: topictypes.PartitionConsumerStats{ + LastReadOffset: 0, + CommittedOffset: 0, + ReadSessionID: "", + PartitionReadSessionCreateTime: &zeroTime, + LastReadTime: &zeroTime, + MaxReadTimeLag: &zeroDuration, + MaxWriteTimeLag: &zeroDuration, + BytesRead: topictypes.MultipleWindowsStat{}, + ReaderName: "", + }, + }, + }, + } + + requireAndCleanSubset := func(checked *map[string]string, subset *map[string]string) { + t.Helper() + for k, subValue := range *subset { + checkedValue, ok := (*checked)[k] + require.True(t, ok, k) + require.Equal(t, subValue, checkedValue) + } + *checked = nil + *subset = nil + } + + requireAndCleanSubset(&consumer.Consumer.Attributes, &expectedConsumerDesc.Consumer.Attributes) + + require.Equal(t, expectedConsumerDesc, consumer) } func TestSchemeList(t *testing.T) { diff --git a/topic/client.go b/topic/client.go index 09b08579a..8941ce484 100644 --- a/topic/client.go +++ b/topic/client.go @@ -23,6 +23,11 @@ type Client interface { // Describe topic Describe(ctx context.Context, path string, opts ...topicoptions.DescribeOption) (topictypes.TopicDescription, error) + // Describe topic consumer + DescribeTopicConsumer( + ctx context.Context, path string, consumer string, opts ...topicoptions.DescribeConsumerOption, + ) (topictypes.TopicConsumerDescription, error) + // Drop topic Drop(ctx context.Context, path string, opts ...topicoptions.DropOption) error diff --git a/topic/example_test.go b/topic/example_test.go index 9df1fab42..5be57fe89 100644 --- a/topic/example_test.go +++ b/topic/example_test.go @@ -84,13 +84,38 @@ func Example_describeTopic() { descResult, err := db.Topic().Describe(ctx, "topic-path") if err != nil { - log.Printf("failed drop topic: %v", err) + log.Printf("failed describe topic: %v", err) return } fmt.Printf("describe: %#v\n", descResult) } +func Example_desrcibeTopicConsumer() { + ctx := context.TODO() + connectionString := os.Getenv("YDB_CONNECTION_STRING") + if connectionString == "" { + connectionString = "grpc://localhost:2136/local" + } + db, err := ydb.Open( + ctx, connectionString, + ) + if err != nil { + log.Printf("failed connect: %v", err) + + return + } + defer db.Close(ctx) // cleanup resources + + descResult, err := db.Topic().DescribeTopicConsumer(ctx, "topic-path", "new-consumer") + if err != nil { + log.Printf("failed describe topic consumer: %v", err) + + return + } + fmt.Printf("describe consumer: %#v\n", descResult) +} + func Example_dropTopic() { ctx := context.TODO() connectionString := os.Getenv("YDB_CONNECTION_STRING") diff --git a/topic/topicoptions/topicoptions_describe.go b/topic/topicoptions/topicoptions_describe.go index 322c2631f..41004bf36 100644 --- a/topic/topicoptions/topicoptions_describe.go +++ b/topic/topicoptions/topicoptions_describe.go @@ -4,3 +4,12 @@ import "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" // DescribeOption type for options of describe method. Not used now. type DescribeOption func(req *rawtopic.DescribeTopicRequest) + +// DescribeConsumerOption type for options of describe consumer method. +type DescribeConsumerOption func(req *rawtopic.DescribeConsumerRequest) + +func IncludeConsumerStats() DescribeConsumerOption { + return func(req *rawtopic.DescribeConsumerRequest) { + req.IncludeStats = true + } +} diff --git a/topic/topictypes/topictypes.go b/topic/topictypes/topictypes.go index 8b66a48cc..38b907acc 100644 --- a/topic/topictypes/topictypes.go +++ b/topic/topictypes/topictypes.go @@ -6,6 +6,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/clone" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topiclistenerinternal" ) // Codec code for use in topics @@ -178,3 +179,94 @@ func (p *PartitionInfo) FromRaw(raw *rawtopic.PartitionInfo) { p.ChildPartitionIDs = clone.Int64Slice(raw.ChildPartitionIDs) p.ParentPartitionIDs = clone.Int64Slice(raw.ParentPartitionIDs) } + +type MultipleWindowsStat struct { + PerMinute int64 + PerHour int64 + PerDay int64 +} + +func (m *MultipleWindowsStat) FromRaw(raw *rawtopic.MultipleWindowsStat) { + if raw != nil { + m.PerMinute = raw.PerMinute + m.PerHour = raw.PerHour + m.PerDay = raw.PerDay + } +} + +type OffsetRange topiclistenerinternal.PublicOffsetsRange + +type PartitionStats struct { + PartitionsOffset OffsetRange + StoreSizeBytes int64 + LastWriteTime *time.Time + MaxWriteTimeLag *time.Duration + BytesWritten MultipleWindowsStat +} + +func (p *PartitionStats) FromRaw(raw *rawtopic.PartitionStats) { + p.PartitionsOffset.Start = raw.PartitionsOffset.Start.ToInt64() + p.PartitionsOffset.End = raw.PartitionsOffset.End.ToInt64() + p.StoreSizeBytes = raw.StoreSizeBytes + p.LastWriteTime = raw.LastWriteTime.ToTime() + p.MaxWriteTimeLag = raw.MaxWriteTimeLag.ToDuration() + p.BytesWritten.FromRaw(&raw.BytesWritten) +} + +type PartitionConsumerStats struct { + LastReadOffset int64 + CommittedOffset int64 + ReadSessionID string + PartitionReadSessionCreateTime *time.Time + LastReadTime *time.Time + MaxReadTimeLag *time.Duration + MaxWriteTimeLag *time.Duration + BytesRead MultipleWindowsStat + ReaderName string +} + +func (s *PartitionConsumerStats) FromRaw(raw *rawtopic.PartitionConsumerStats) { + s.LastReadOffset = raw.LastReadOffset + s.CommittedOffset = raw.CommittedOffset + s.ReadSessionID = raw.ReadSessionID + s.PartitionReadSessionCreateTime = raw.PartitionReadSessionCreateTime.ToTime() + s.LastReadTime = raw.LastReadTime.ToTime() + s.MaxReadTimeLag = raw.MaxReadTimeLag.ToDuration() + s.MaxWriteTimeLag = raw.MaxWriteTimeLag.ToDuration() + s.BytesRead.FromRaw(&raw.BytesRead) + s.ReaderName = raw.ReaderName +} + +type DescribeConsumerPartitionInfo struct { + PartitionID int64 + Active bool + ChildPartitionIDs []int64 + ParentPartitionIDs []int64 + PartitionStats PartitionStats + PartitionConsumerStats PartitionConsumerStats +} + +func (p *DescribeConsumerPartitionInfo) FromRaw(raw *rawtopic.DescribeConsumerResultPartitionInfo) { + p.PartitionID = raw.PartitionID + p.Active = raw.Active + p.ChildPartitionIDs = clone.Int64Slice(raw.ChildPartitionIDs) + p.ParentPartitionIDs = clone.Int64Slice(raw.ParentPartitionIDs) + p.PartitionStats.FromRaw(&raw.PartitionStats) + p.PartitionConsumerStats.FromRaw(&raw.PartitionConsumerStats) +} + +type TopicConsumerDescription struct { + Path string + Consumer Consumer + Partitions []DescribeConsumerPartitionInfo +} + +func (d *TopicConsumerDescription) FromRaw(raw *rawtopic.DescribeConsumerResult) { + d.Path = raw.Self.Name + d.Consumer.FromRaw(&raw.Consumer) + + d.Partitions = make([]DescribeConsumerPartitionInfo, len(raw.Partitions)) + for i := range raw.Partitions { + d.Partitions[i].FromRaw(&raw.Partitions[i]) + } +} diff --git a/topic/topictypes/topictypes_test.go b/topic/topictypes/topictypes_test.go index 91489333e..606e07610 100644 --- a/topic/topictypes/topictypes_test.go +++ b/topic/topictypes/topictypes_test.go @@ -153,12 +153,184 @@ func TestTopicDescriptionFromRaw(t *testing.T) { for _, v := range testData { v := v - t.Run(v.testName, func(t *testing.T) { - d := TopicDescription{} - d.FromRaw(v.rawTopicDescription) - if !reflect.DeepEqual(d, v.expectedDescription) { - t.Errorf("got\n%+v\nexpected\n %+v", d, v.expectedDescription) - } - }) + t.Run( + v.testName, func(t *testing.T) { + d := TopicDescription{} + d.FromRaw(v.rawTopicDescription) + if !reflect.DeepEqual(d, v.expectedDescription) { + t.Errorf("got\n%+v\nexpected\n %+v", d, v.expectedDescription) + } + }, + ) + } +} + +func TestTopicConsumerDescriptionFromRaw(t *testing.T) { + fourPM := time.Date(2024, 0o1, 0o1, 16, 0, 0, 0, time.UTC) + hour := time.Hour + testData := []struct { + testName string + expectedDescription TopicConsumerDescription + rawConsumerDescription *rawtopic.DescribeConsumerResult + }{ + { + testName: "empty topic consumer description", + expectedDescription: TopicConsumerDescription{ + Path: "", + Consumer: Consumer{ + SupportedCodecs: make([]Codec, 0), + }, + Partitions: make([]DescribeConsumerPartitionInfo, 0), + }, + rawConsumerDescription: &rawtopic.DescribeConsumerResult{}, + }, + { + testName: "all fields set", + expectedDescription: TopicConsumerDescription{ + Path: "topic/consumer", + Consumer: Consumer{ + Attributes: map[string]string{ + "privet": "mir", + "hello": "world", + }, + Name: "c1", + Important: true, + SupportedCodecs: []Codec{ + CodecRaw, + }, + ReadFrom: time.Date(2022, time.March, 8, 12, 12, 12, 0, time.UTC), + }, + Partitions: []DescribeConsumerPartitionInfo{ + { + PartitionID: 42, + Active: true, + ChildPartitionIDs: []int64{ + 1, 2, 3, + }, + ParentPartitionIDs: []int64{ + 1, 2, 3, + }, + PartitionStats: PartitionStats{ + PartitionsOffset: OffsetRange{ + Start: 10, + End: 20, + }, + StoreSizeBytes: 1024, + LastWriteTime: &fourPM, + MaxWriteTimeLag: &hour, + BytesWritten: MultipleWindowsStat{ + PerMinute: 1, + PerHour: 60, + PerDay: 1440, + }, + }, + PartitionConsumerStats: PartitionConsumerStats{ + LastReadOffset: 20, + CommittedOffset: 10, + ReadSessionID: "session1", + PartitionReadSessionCreateTime: &fourPM, + LastReadTime: &fourPM, + MaxReadTimeLag: &hour, + MaxWriteTimeLag: &hour, + BytesRead: MultipleWindowsStat{ + PerMinute: 1, + PerHour: 60, + PerDay: 1440, + }, + ReaderName: "reader1", + }, + }, + }, + }, + rawConsumerDescription: &rawtopic.DescribeConsumerResult{ + Self: rawscheme.Entry{ + Name: "topic/consumer", + }, + Consumer: rawtopic.Consumer{ + Name: "c1", + Important: true, + SupportedCodecs: rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw}, + ReadFrom: rawoptional.Time{ + Value: time.Date(2022, time.March, 8, 12, 12, 12, 0, time.UTC), + HasValue: true, + }, + Attributes: map[string]string{ + "privet": "mir", + "hello": "world", + }, + }, + Partitions: []rawtopic.DescribeConsumerResultPartitionInfo{ + { + PartitionID: 42, + Active: true, + ChildPartitionIDs: []int64{ + 1, 2, 3, + }, + ParentPartitionIDs: []int64{ + 1, 2, 3, + }, + PartitionStats: rawtopic.PartitionStats{ + PartitionsOffset: rawtopiccommon.OffsetRange{ + Start: 10, + End: 20, + }, + StoreSizeBytes: 1024, + LastWriteTime: rawoptional.Time{ + Value: fourPM, + HasValue: true, + }, + MaxWriteTimeLag: rawoptional.Duration{ + Value: hour, + HasValue: true, + }, + BytesWritten: rawtopic.MultipleWindowsStat{ + PerMinute: 1, + PerHour: 60, + PerDay: 1440, + }, + }, + PartitionConsumerStats: rawtopic.PartitionConsumerStats{ + LastReadOffset: 20, + CommittedOffset: 10, + ReadSessionID: "session1", + PartitionReadSessionCreateTime: rawoptional.Time{ + Value: fourPM, + HasValue: true, + }, + LastReadTime: rawoptional.Time{ + Value: fourPM, + HasValue: true, + }, + MaxReadTimeLag: rawoptional.Duration{ + Value: hour, + HasValue: true, + }, + MaxWriteTimeLag: rawoptional.Duration{ + Value: hour, + HasValue: true, + }, + BytesRead: rawtopic.MultipleWindowsStat{ + PerMinute: 1, + PerHour: 60, + PerDay: 1440, + }, + ReaderName: "reader1", + }, + }, + }, + }, + }, + } + for _, v := range testData { + v := v + t.Run( + v.testName, func(t *testing.T) { + d := TopicConsumerDescription{} + d.FromRaw(v.rawConsumerDescription) + if !reflect.DeepEqual(d.Consumer, v.expectedDescription.Consumer) { + t.Errorf("got\n%+v\nexpected\n%+v", d, v.expectedDescription) + } + }, + ) } }