From 0110335d179838643038bdd593fee997deabe0c6 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Fri, 23 Aug 2024 17:59:07 +0800 Subject: [PATCH] enhance: print schema info in show channel-watched Signed-off-by: Wei Liu --- models/channel_watch.go | 36 ++++++++++++++++++++++++++++- states/etcd/common/channel.go | 2 +- states/etcd/show/channel_watched.go | 32 +++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 2 deletions(-) diff --git a/models/channel_watch.go b/models/channel_watch.go index 5de7d51f..b8ead290 100644 --- a/models/channel_watch.go +++ b/models/channel_watch.go @@ -1,5 +1,11 @@ package models +import ( + "github.com/samber/lo" + + "github.com/milvus-io/birdwatcher/proto/v2.2/schemapb" +) + type ChannelWatch struct { Vchan VChannelInfo StartTs int64 @@ -7,7 +13,8 @@ type ChannelWatch struct { TimeoutTs int64 // key - key string + key string + Schema CollectionSchema } func (c *ChannelWatch) Key() string { @@ -49,6 +56,33 @@ func GetChannelWatchInfo[ChannelWatchBase interface { } } +func GetChannelWatchInfoV2[ChannelWatchBase interface { + GetVchan() vchan + GetStartTs() int64 + GetState() watchState + GetTimeoutTs() int64 + GetSchema() *schemapb.CollectionSchema +}, watchState ~int32, vchan interface { + vchannelInfoBase + GetSeekPosition() pos +}, pos msgPosBase](info ChannelWatchBase, key string) *ChannelWatch { + schema := newSchemaFromBase(info.GetSchema()) + schema.Fields = lo.Map(info.GetSchema().GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) FieldSchema { + fs := NewFieldSchemaFromBase[*schemapb.FieldSchema, schemapb.DataType](fieldSchema) + fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams()) + return fs + }) + + return &ChannelWatch{ + Vchan: getVChannelInfo[vchan, pos](info.GetVchan()), + StartTs: info.GetStartTs(), + State: ChannelWatchState(info.GetState()), + TimeoutTs: info.GetTimeoutTs(), + key: key, + Schema: schema, + } +} + func getVChannelInfo[info interface { vchannelInfoBase GetSeekPosition() pos diff --git a/states/etcd/common/channel.go b/states/etcd/common/channel.go index 3f765d96..efd812fe 100644 --- a/states/etcd/common/channel.go +++ b/states/etcd/common/channel.go @@ -66,7 +66,7 @@ func ListChannelWatch(ctx context.Context, cli clientv3.KV, basePath string, ver return nil, err } result = lo.Map(infos, func(info datapbv2.ChannelWatchInfo, idx int) *models.ChannelWatch { - return models.GetChannelWatchInfo[*datapbv2.ChannelWatchInfo, datapbv2.ChannelWatchState, *datapbv2.VchannelInfo, *msgpbv2.MsgPosition](&info, paths[idx]) + return models.GetChannelWatchInfoV2[*datapbv2.ChannelWatchInfo, datapbv2.ChannelWatchState, *datapbv2.VchannelInfo, *msgpbv2.MsgPosition](&info, paths[idx]) }) default: return nil, errors.New("version not supported") diff --git a/states/etcd/show/channel_watched.go b/states/etcd/show/channel_watched.go index f5934ddf..021ae6f6 100644 --- a/states/etcd/show/channel_watched.go +++ b/states/etcd/show/channel_watched.go @@ -3,6 +3,7 @@ package show import ( "context" "fmt" + "sort" "strings" "time" @@ -70,4 +71,35 @@ func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *mode fmt.Fprintf(sb, "Unflushed segments: %v\n", info.Vchan.UnflushedSegmentIds) fmt.Fprintf(sb, "Flushed segments: %v\n", info.Vchan.FlushedSegmentIds) fmt.Fprintf(sb, "Dropped segments: %v\n", info.Vchan.DroppedSegmentIds) + + fmt.Fprintf(sb, "Fields:\n") + fields := info.Schema.Fields + sort.Slice(fields, func(i, j int) bool { + return fields[i].FieldID < fields[j].FieldID + }) + for _, field := range fields { + fmt.Fprintf(sb, " - Field ID: %d \t Field Name: %s \t Field Type: %s\n", field.FieldID, field.Name, field.DataType.String()) + if field.IsPrimaryKey { + fmt.Fprintf(sb, "\t - Primary Key: %t, AutoID: %t\n", field.IsPrimaryKey, field.AutoID) + } + if field.IsDynamic { + fmt.Fprintf(sb, "\t - Dynamic Field\n") + } + if field.IsPartitionKey { + fmt.Fprintf(sb, "\t - Partition Key\n") + } + if field.IsClusteringKey { + fmt.Fprintf(sb, "\t - Clustering Key\n") + } + // print element type if field is array + if field.DataType == models.DataTypeArray { + fmt.Fprintf(sb, "\t - Element Type: %s\n", field.ElementType.String()) + } + // type params + for key, value := range field.Properties { + fmt.Fprintf(sb, "\t - Type Param %s: %s\n", key, value) + } + } + + fmt.Fprintf(sb, "Enable Dynamic Schema: %t\n", info.Schema.EnableDynamicSchema) }