Skip to content

Commit

Permalink
enhance: print schema info in show channel-watched
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Aug 23, 2024
1 parent 6692403 commit 0110335
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 2 deletions.
36 changes: 35 additions & 1 deletion models/channel_watch.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package models

import (
"github.com/samber/lo"

"github.com/milvus-io/birdwatcher/proto/v2.2/schemapb"
)

type ChannelWatch struct {
Vchan VChannelInfo
StartTs int64
State ChannelWatchState
TimeoutTs int64

// key
key string
key string
Schema CollectionSchema
}

func (c *ChannelWatch) Key() string {
Expand Down Expand Up @@ -49,6 +56,33 @@ func GetChannelWatchInfo[ChannelWatchBase interface {
}
}

func GetChannelWatchInfoV2[ChannelWatchBase interface {
GetVchan() vchan
GetStartTs() int64
GetState() watchState
GetTimeoutTs() int64
GetSchema() *schemapb.CollectionSchema
}, watchState ~int32, vchan interface {
vchannelInfoBase
GetSeekPosition() pos
}, pos msgPosBase](info ChannelWatchBase, key string) *ChannelWatch {
schema := newSchemaFromBase(info.GetSchema())
schema.Fields = lo.Map(info.GetSchema().GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) FieldSchema {
fs := NewFieldSchemaFromBase[*schemapb.FieldSchema, schemapb.DataType](fieldSchema)
fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams())
return fs
})

return &ChannelWatch{
Vchan: getVChannelInfo[vchan, pos](info.GetVchan()),
StartTs: info.GetStartTs(),
State: ChannelWatchState(info.GetState()),
TimeoutTs: info.GetTimeoutTs(),
key: key,
Schema: schema,
}
}

func getVChannelInfo[info interface {
vchannelInfoBase
GetSeekPosition() pos
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/common/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func ListChannelWatch(ctx context.Context, cli clientv3.KV, basePath string, ver
return nil, err
}
result = lo.Map(infos, func(info datapbv2.ChannelWatchInfo, idx int) *models.ChannelWatch {
return models.GetChannelWatchInfo[*datapbv2.ChannelWatchInfo, datapbv2.ChannelWatchState, *datapbv2.VchannelInfo, *msgpbv2.MsgPosition](&info, paths[idx])
return models.GetChannelWatchInfoV2[*datapbv2.ChannelWatchInfo, datapbv2.ChannelWatchState, *datapbv2.VchannelInfo, *msgpbv2.MsgPosition](&info, paths[idx])
})
default:
return nil, errors.New("version not supported")
Expand Down
32 changes: 32 additions & 0 deletions states/etcd/show/channel_watched.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package show
import (
"context"
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -70,4 +71,35 @@ func (rs *ChannelsWatched) printChannelWatchInfo(sb *strings.Builder, info *mode
fmt.Fprintf(sb, "Unflushed segments: %v\n", info.Vchan.UnflushedSegmentIds)
fmt.Fprintf(sb, "Flushed segments: %v\n", info.Vchan.FlushedSegmentIds)
fmt.Fprintf(sb, "Dropped segments: %v\n", info.Vchan.DroppedSegmentIds)

fmt.Fprintf(sb, "Fields:\n")
fields := info.Schema.Fields
sort.Slice(fields, func(i, j int) bool {
return fields[i].FieldID < fields[j].FieldID
})
for _, field := range fields {
fmt.Fprintf(sb, " - Field ID: %d \t Field Name: %s \t Field Type: %s\n", field.FieldID, field.Name, field.DataType.String())
if field.IsPrimaryKey {
fmt.Fprintf(sb, "\t - Primary Key: %t, AutoID: %t\n", field.IsPrimaryKey, field.AutoID)
}
if field.IsDynamic {
fmt.Fprintf(sb, "\t - Dynamic Field\n")
}
if field.IsPartitionKey {
fmt.Fprintf(sb, "\t - Partition Key\n")
}
if field.IsClusteringKey {
fmt.Fprintf(sb, "\t - Clustering Key\n")
}
// print element type if field is array
if field.DataType == models.DataTypeArray {
fmt.Fprintf(sb, "\t - Element Type: %s\n", field.ElementType.String())
}
// type params
for key, value := range field.Properties {
fmt.Fprintf(sb, "\t - Type Param %s: %s\n", key, value)
}
}

fmt.Fprintf(sb, "Enable Dynamic Schema: %t\n", info.Schema.EnableDynamicSchema)
}

0 comments on commit 0110335

Please sign in to comment.