diff --git a/.goreleaser-darwin.yaml b/.goreleaser-darwin.yaml index 2fd4449..a59a698 100644 --- a/.goreleaser-darwin.yaml +++ b/.goreleaser-darwin.yaml @@ -9,6 +9,7 @@ before: builds: - env: - CGO_ENABLED=1 + main: ./cmd/birdwatcher goos: # - linux #- windows diff --git a/.goreleaser-linux.yaml b/.goreleaser-linux.yaml index 3bff850..56f0729 100644 --- a/.goreleaser-linux.yaml +++ b/.goreleaser-linux.yaml @@ -9,6 +9,7 @@ before: builds: - env: - CGO_ENABLED=1 + main: ./cmd/birdwatcher goos: - linux #- windows diff --git a/Makefile b/Makefile index 378c109..5c86390 100644 --- a/Makefile +++ b/Makefile @@ -20,12 +20,12 @@ all: static-check birdwatcher birdwatcher: @echo "Compiling birdwatcher" @mkdir -p bin - @CGO_ENABLED=0 go build -o bin/birdwatcher main.go + @CGO_ENABLED=0 go build -o bin/birdwatcher cmd/birdwatcher/main.go birdwatcher_wkafka: @echo "Compiling birdwatcher with kafka(CGO_ENABLED)" @mkdir -p bin - @CGO_ENABLED=1 go build -o bin/birdwatcher_wkafka -tags WKAFKA main.go + @CGO_ENABLED=1 go build -o bin/birdwatcher_wkafka -tags WKAFKA cmd/birdwatcher/main.go getdeps: @mkdir -p $(INSTALL_PATH) diff --git a/cmd/birdwatcher/main.go b/cmd/birdwatcher/main.go new file mode 100644 index 0000000..decf433 --- /dev/null +++ b/cmd/birdwatcher/main.go @@ -0,0 +1,75 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "os/exec" + + _ "github.com/milvus-io/birdwatcher/asap" + "github.com/milvus-io/birdwatcher/bapps" + "github.com/milvus-io/birdwatcher/common" + "github.com/milvus-io/birdwatcher/configs" + "github.com/milvus-io/birdwatcher/states" +) + +var ( + oneLineCommand = flag.String("olc", "", "one line command execution mode") + simple = flag.Bool("simple", false, "use simple ui without suggestion and history") + restServer = flag.Bool("rest", false, "rest server address") + webPort = flag.Int("port", 8002, "listening port for web server") + printVersion = flag.Bool("version", false, "print version") +) + +func main() { + flag.Parse() + + var appFactory func(config *configs.Config) bapps.BApp + + switch { + // Print current birdwatcher version + case *printVersion: + fmt.Println("Birdwatcher Version", common.Version) + return + case *simple: + appFactory = func(*configs.Config) bapps.BApp { return bapps.NewSimpleApp() } + case len(*oneLineCommand) > 0: + appFactory = func(*configs.Config) bapps.BApp { return bapps.NewOlcApp(*oneLineCommand) } + case *restServer: + appFactory = func(config *configs.Config) bapps.BApp { return bapps.NewWebServerApp(*webPort, config) } + default: + defer handleExit() + // open file and create if non-existent + file, err := os.OpenFile("bw_debug.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + log.Fatal(err) + } + defer file.Close() + + logger := log.New(file, "Custom Log", log.LstdFlags) + + appFactory = func(config *configs.Config) bapps.BApp { + return bapps.NewPromptApp(config, bapps.WithLogger(logger)) + } + } + + config, err := configs.NewConfig(".bw_config") + if err != nil { + // run by default, just printing warning. + fmt.Println("[WARN] load config file failed, running in default setting", err.Error()) + } + + start := states.Start(config) + + app := appFactory(config) + app.Run(start) +} + +// handleExit is the fix for go-prompt output hi-jack fix. +func handleExit() { + rawModeOff := exec.Command("/bin/stty", "-raw", "echo") + rawModeOff.Stdin = os.Stdin + _ = rawModeOff.Run() + rawModeOff.Wait() +} diff --git a/states/backup_mock_connect.go b/states/backup_mock_connect.go index 3282e1a..61393d4 100644 --- a/states/backup_mock_connect.go +++ b/states/backup_mock_connect.go @@ -86,7 +86,7 @@ func (s *embedEtcdMockState) SetInstance(instanceName string) { s.SetLabel(fmt.Sprintf("Backup(%s)", instanceName)) s.instanceName = instanceName rootPath := path.Join(instanceName, metaPath) - s.ComponentShow = show.NewComponent(s.client, s.config, rootPath) + s.ComponentShow = show.NewComponent(s.client, s.config, instanceName, metaPath) s.ComponentRemove = remove.NewComponent(s.client, s.config, rootPath) s.ComponentRepair = repair.NewComponent(s.client, s.config, rootPath) s.SetupCommands() @@ -171,7 +171,7 @@ func getEmbedEtcdInstance(server *embed.Etcd, cli kv.MetaKV, instanceName string state := &embedEtcdMockState{ CmdState: framework.NewCmdState(fmt.Sprintf("Backup(%s)", instanceName)), - ComponentShow: show.NewComponent(cli, config, basePath), + ComponentShow: show.NewComponent(cli, config, instanceName, metaPath), ComponentRemove: remove.NewComponent(cli, config, basePath), instanceName: instanceName, server: server, diff --git a/states/etcd/common/bulkinsert.go b/states/etcd/common/bulkinsert.go index a418caf..f372081 100644 --- a/states/etcd/common/bulkinsert.go +++ b/states/etcd/common/bulkinsert.go @@ -24,14 +24,25 @@ func ListImportJobs(ctx context.Context, cli kv.MetaKV, basePath string, filters return nil, nil, err } - return lo.FilterMap(jobs, func(job datapb.ImportJob, idx int) (*datapb.ImportJob, bool) { + resultJobs := make([]*datapb.ImportJob, 0, len(jobs)) + resultKeys := make([]string, 0, len(keys)) + + filterFn := func(job datapb.ImportJob) bool { for _, filter := range filters { if !filter(&job) { - return nil, false + return false } } - return &job, true - }), keys, nil + return true + } + for i, job := range jobs { + if ok := filterFn(job); ok { + resultJobs = append(resultJobs, &jobs[i]) + resultKeys = append(resultKeys, keys[i]) + } + } + + return resultJobs, resultKeys, nil } // ListPreImportTasks list pre-import tasks. diff --git a/states/etcd/common/index.go b/states/etcd/common/index.go index b50d116..f553d89 100644 --- a/states/etcd/common/index.go +++ b/states/etcd/common/index.go @@ -3,7 +3,6 @@ package common import ( "context" "path" - "time" "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" "github.com/milvus-io/birdwatcher/proto/v2.0/indexpb" @@ -18,10 +17,7 @@ func ListIndex(ctx context.Context, cli kv.MetaKV, basePath string, filters ...f } // ListSegmentIndex list segment index info. -func ListSegmentIndex(cli kv.MetaKV, basePath string, filters ...func(segIdx *etcdpb.SegmentIndexInfo) bool) ([]etcdpb.SegmentIndexInfo, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - +func ListSegmentIndex(ctx context.Context, cli kv.MetaKV, basePath string, filters ...func(segIdx *etcdpb.SegmentIndexInfo) bool) ([]etcdpb.SegmentIndexInfo, error) { prefix := path.Join(basePath, "root-coord/segment-index") + "/" result, _, err := ListProtoObjects(ctx, cli, prefix, filters...) return result, err diff --git a/states/etcd/remove/dirty_importing_segment.go b/states/etcd/remove/dirty_importing_segment.go new file mode 100644 index 0000000..d33dcf8 --- /dev/null +++ b/states/etcd/remove/dirty_importing_segment.go @@ -0,0 +1,60 @@ +package remove + +import ( + "context" + "fmt" + + "github.com/samber/lo" + + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" + "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" +) + +type DirtyImportingSegment struct { + framework.ParamBase `use:"remove dirty-importing-segment" desc:"remove dirty importing segments with 0 rows"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"` + Ts int64 `name:"ts" default:"0" desc:"only remove segments with ts less than this value"` + Run bool `name:"run" default:"false" desc:"flag to control actually run or dry"` +} + +// DirtyImportingSegmentCommand returns command to remove +func (c *ComponentRemove) DirtyImportingSegmentCommand(ctx context.Context, p *DirtyImportingSegment) error { + fmt.Println("start to remove dirty importing segment") + segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool { + return (p.CollectionID == 0 || segment.CollectionID == p.CollectionID) + }) + if err != nil { + return err + } + + groups := lo.GroupBy(segments, func(segment *models.Segment) int64 { + return segment.CollectionID + }) + + for collectionID, segments := range groups { + for _, segment := range segments { + if segment.State == models.SegmentStateImporting { + segmentTs := segment.GetDmlPosition().GetTimestamp() + if segmentTs == 0 { + segmentTs = segment.GetStartPosition().GetTimestamp() + } + if segment.NumOfRows == 0 && segmentTs < uint64(p.Ts) { + fmt.Printf("collection %d, segment %d is dirty importing with 0 rows, remove it\n", collectionID, segment.ID) + if p.Run { + err := common.RemoveSegmentByID(ctx, c.client, c.basePath, segment.CollectionID, segment.PartitionID, segment.ID) + if err != nil { + fmt.Printf("failed to remove segment %d, err: %s\n", segment.ID, err.Error()) + } + } + } else { + fmt.Printf("collection %d, segment %d is dirty importing with %d rows, ts=%d, skip it\n", collectionID, segment.ID, segment.NumOfRows, segmentTs) + } + } + } + } + + fmt.Println("finish to remove dirty importing segment") + return nil +} diff --git a/states/etcd/repair/segment.go b/states/etcd/repair/segment.go index 69b87d6..ba52257 100644 --- a/states/etcd/repair/segment.go +++ b/states/etcd/repair/segment.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path" + "time" "github.com/golang/protobuf/proto" "github.com/spf13/cobra" @@ -56,8 +57,11 @@ func SegmentCommand(cli kv.MetaKV, basePath string) *cobra.Command { return } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + // use v1 meta for now - segmentIndexes, err := common.ListSegmentIndex(cli, basePath) + segmentIndexes, err := common.ListSegmentIndex(ctx, cli, basePath) if err != nil { fmt.Println(err.Error()) return diff --git a/states/etcd/show/alias.go b/states/etcd/show/alias.go index beaf69b..589e9f5 100644 --- a/states/etcd/show/alias.go +++ b/states/etcd/show/alias.go @@ -22,7 +22,7 @@ type AliasParam struct { // AliasCommand implements `show alias` command. func (c *ComponentShow) AliasCommand(ctx context.Context, p *AliasParam) (*Aliases, error) { - aliases, err := common.ListAliasVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(a *models.Alias) bool { + aliases, err := common.ListAliasVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(a *models.Alias) bool { return p.DBID == -1 || p.DBID == a.DBID }) if err != nil { diff --git a/states/etcd/show/bulkinsert.go b/states/etcd/show/bulkinsert.go index 760fe0d..15c2905 100644 --- a/states/etcd/show/bulkinsert.go +++ b/states/etcd/show/bulkinsert.go @@ -28,7 +28,7 @@ type ImportJobParam struct { // BulkInsertCommand returns show bulkinsert command. func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam) error { - jobs, _, err := common.ListImportJobs(ctx, c.client, c.basePath, func(job *datapb.ImportJob) bool { + jobs, _, err := common.ListImportJobs(ctx, c.client, c.metaPath, func(job *datapb.ImportJob) bool { return (p.JobID == 0 || job.GetJobID() == p.JobID) && (p.CollectionID == 0 || job.GetCollectionID() == p.CollectionID) && (p.State == "" || strings.EqualFold(job.GetState().String(), p.State)) @@ -51,7 +51,7 @@ func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam fmt.Println("Please specify the job ID (-job={JobID}) to show detailed info.") return nil } - PrintDetailedImportJob(ctx, c.client, c.basePath, job, p.ShowAllFiles) + PrintDetailedImportJob(ctx, c.client, c.metaPath, job, p.ShowAllFiles) } else { PrintSimpleImportJob(job) } diff --git a/states/etcd/show/channel_watched.go b/states/etcd/show/channel_watched.go index c9724be..d964bc6 100644 --- a/states/etcd/show/channel_watched.go +++ b/states/etcd/show/channel_watched.go @@ -27,7 +27,7 @@ type ChannelWatchedParam struct { // ChannelWatchedCommand return show channel-watched commands. func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*framework.PresetResultSet, error) { - infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool { + infos, err := common.ListChannelWatch(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool { return (p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID) && (!p.WithoutSchema || channel.Schema == nil) }) if err != nil { diff --git a/states/etcd/show/checkpoint.go b/states/etcd/show/checkpoint.go index 4bd9465..d6334e4 100644 --- a/states/etcd/show/checkpoint.go +++ b/states/etcd/show/checkpoint.go @@ -23,7 +23,7 @@ type CheckpointParam struct { // CheckpointCommand returns show checkpoint command. func (c *ComponentShow) CheckpointCommand(ctx context.Context, p *CheckpointParam) (*Checkpoints, error) { - coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) + coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID) if err != nil { return nil, errors.Wrap(err, "failed to get collection") } @@ -93,7 +93,7 @@ func (rs *Checkpoints) PrintAs(format framework.Format) string { } func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*models.MsgPosition, error) { - prefix := path.Join(c.basePath, "datacoord-meta", "channel-cp", channelName) + prefix := path.Join(c.metaPath, "datacoord-meta", "channel-cp", channelName) results, _, err := common.ListProtoObjects[internalpb.MsgPosition](ctx, c.client, prefix) if err != nil { return nil, err @@ -108,7 +108,7 @@ func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName st } func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID int64, vchannel string) (*models.MsgPosition, int64, error) { - segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.Segment) bool { + segments, err := common.ListSegmentsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(info *models.Segment) bool { return info.CollectionID == collID && info.InsertChannel == vchannel }) if err != nil { diff --git a/states/etcd/show/collection.go b/states/etcd/show/collection.go index 81b5e50..a582985 100644 --- a/states/etcd/show/collection.go +++ b/states/etcd/show/collection.go @@ -30,12 +30,12 @@ func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionPara // perform get by id to accelerate if p.CollectionID > 0 { var collection *models.Collection - collection, err = common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) + collection, err = common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID) if err == nil { collections = append(collections, collection) } } else { - collections, err = common.ListCollectionsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(coll *models.Collection) bool { + collections, err = common.ListCollectionsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(coll *models.Collection) bool { if p.CollectionName != "" && coll.Schema.Name != p.CollectionName { return false } diff --git a/states/etcd/show/collection_history.go b/states/etcd/show/collection_history.go index 605364c..5f000bd 100644 --- a/states/etcd/show/collection_history.go +++ b/states/etcd/show/collection_history.go @@ -27,7 +27,7 @@ func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *Collect } // fetch current for now - collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) + collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID) if err != nil { switch { case errors.Is(err, common.ErrCollectionDropped): @@ -43,7 +43,7 @@ func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *Collect Collection: collection, } // fetch history - items, err := common.ListCollectionHistory(ctx, c.client, c.basePath, etcdversion.GetVersion(), collection.DBID, p.CollectionID) + items, err := common.ListCollectionHistory(ctx, c.client, c.metaPath, etcdversion.GetVersion(), collection.DBID, p.CollectionID) if err != nil { return nil, err } diff --git a/states/etcd/show/collection_loaded.go b/states/etcd/show/collection_loaded.go index c957fbf..d6dd3c7 100644 --- a/states/etcd/show/collection_loaded.go +++ b/states/etcd/show/collection_loaded.go @@ -26,7 +26,7 @@ type CollectionLoadedParam struct { // CollectionLoadedCommand return show collection-loaded command. func (c *ComponentShow) CollectionLoadedCommand(ctx context.Context, p *CollectionLoadedParam) (*CollectionsLoaded, error) { var total int - infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool { + infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool { total++ return p.CollectionID == 0 || p.CollectionID == info.CollectionID }) diff --git a/states/etcd/show/compaction.go b/states/etcd/show/compaction.go index 4430276..031fe0b 100644 --- a/states/etcd/show/compaction.go +++ b/states/etcd/show/compaction.go @@ -35,7 +35,7 @@ func (c *ComponentShow) CompactionTaskCommand(ctx context.Context, p *Compaction // perform get by id to accelerate - compactionTasks, err = common.ListCompactionTask(ctx, c.client, c.basePath, func(task *models.CompactionTask) bool { + compactionTasks, err = common.ListCompactionTask(ctx, c.client, c.metaPath, func(task *models.CompactionTask) bool { total++ if p.CollectionName != "" && task.GetSchema().GetName() != p.CollectionName { return false diff --git a/states/etcd/show/component.go b/states/etcd/show/component.go index 6830218..4e4ac46 100644 --- a/states/etcd/show/component.go +++ b/states/etcd/show/component.go @@ -1,20 +1,28 @@ package show import ( + "path" + "github.com/milvus-io/birdwatcher/configs" "github.com/milvus-io/birdwatcher/states/kv" ) type ComponentShow struct { - client kv.MetaKV - config *configs.Config + client kv.MetaKV + config *configs.Config + // basePath is the root path of etcd key-value pairs. + // by default is by-dev basePath string + // metaPath is the concatenated path of basePath & metaPath + // by default is by-dev/meta + metaPath string } -func NewComponent(cli kv.MetaKV, config *configs.Config, basePath string) *ComponentShow { +func NewComponent(cli kv.MetaKV, config *configs.Config, basePath string, metaPath string) *ComponentShow { return &ComponentShow{ client: cli, config: config, basePath: basePath, + metaPath: path.Join(basePath, metaPath), } } diff --git a/states/etcd/show/config_etcd.go b/states/etcd/show/config_etcd.go index 1e0891e..29fcba1 100644 --- a/states/etcd/show/config_etcd.go +++ b/states/etcd/show/config_etcd.go @@ -13,14 +13,16 @@ type ConfigEtcdParam struct { } // ConfigEtcdCommand return show config-etcd command. -func (c *ComponentShow) ConfigEtcdCommand(ctx context.Context, p *ConfigEtcdParam) { +func (c *ComponentShow) ConfigEtcdCommand(ctx context.Context, p *ConfigEtcdParam) error { keys, values, err := common.ListEtcdConfigs(ctx, c.client, c.basePath) if err != nil { fmt.Println("failed to list configurations from etcd", err.Error()) - return + return err } for i, key := range keys { fmt.Printf("Key: %s, Value: %s\n", key, values[i]) } + + return nil } diff --git a/states/etcd/show/database.go b/states/etcd/show/database.go index bf2ee57..a496e91 100644 --- a/states/etcd/show/database.go +++ b/states/etcd/show/database.go @@ -20,7 +20,7 @@ type DatabaseParam struct { // DatabaseCommand returns show database comand. func (c *ComponentShow) DatabaseCommand(ctx context.Context, p *DatabaseParam) (*Databases, error) { - dbs, err := common.ListDatabase(ctx, c.client, c.basePath, func(db *models.Database) bool { + dbs, err := common.ListDatabase(ctx, c.client, c.metaPath, func(db *models.Database) bool { return (p.DatabaseName == "" || db.Name == p.DatabaseName) && (p.DatabaseID == 0 || db.ID == p.DatabaseID) }) if err != nil { diff --git a/states/etcd/show/etcd_kv_tree.go b/states/etcd/show/etcd_kv_tree.go new file mode 100644 index 0000000..5432dc1 --- /dev/null +++ b/states/etcd/show/etcd_kv_tree.go @@ -0,0 +1,125 @@ +package show + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/states/kv" +) + +type EtcdKVTree struct { + framework.ParamBase `use:"show etcd-kv-tree" desc:"show etcd kv tree with key size of each prefix"` + Prefix string `name:"prefix" default:"" desc:"the kv prefix to show"` + Level int64 `name:"level" default:"1" desc:"the level of kv tree to show"` + TopK int64 `name:"topK" default:"10" desc:"the number of top prefixes to show per level"` +} + +// EtcdKVTreeCommand retrieves and prints the top K prefixes and their key counts up to the specified level +func (c *ComponentShow) EtcdKVTreeCommand(ctx context.Context, p *EtcdKVTree) error { + // Fetch all keys under the given prefix + keys, _, err := c.client.LoadWithPrefix(ctx, p.Prefix, kv.WithKeysOnly()) + if err != nil { + return err + } + + // Count keys for prefixes up to the specified level + result := countKeysAtEachLevel(keys, p.Prefix, int(p.Level)) + + // Print the result with topK prefixes for each level in order + printLevelsInOrder(result, int(p.TopK)) + + return nil +} + +// countKeysAtEachLevel counts the keys for each prefix at each level up to the specified level +func countKeysAtEachLevel(keys []string, basePrefix string, maxLevel int) map[int]map[string]int { + levelStats := make(map[int]map[string]int) + + for _, key := range keys { + // Ensure the key is under the base prefix + if !strings.HasPrefix(key, basePrefix) { + continue + } + + // Process prefixes for each level up to maxLevel + for level := 1; level <= maxLevel; level++ { + prefix := getNthLevelPrefix(key, basePrefix, level) + if prefix == "" { + break + } + + if _, exists := levelStats[level]; !exists { + levelStats[level] = make(map[string]int) + } + levelStats[level][prefix]++ + } + } + + return levelStats +} + +// printLevelsInOrder ensures that levels are printed in order from 1 to maxLevel +func printLevelsInOrder(result map[int]map[string]int, topK int) { + // Iterate over the levels in sorted order (from 1 to maxLevel) + for level := 1; level <= len(result); level++ { + if prefixes, exists := result[level]; exists { + printTopKPrefixes(level, prefixes, topK) + } + } +} + +// printTopKPrefixes prints the top K prefixes for a given level +func printTopKPrefixes(level int, prefixes map[string]int, topK int) { + // Convert map to slice for sorting + type prefixCount struct { + prefix string + count int + } + var sortedPrefixes []prefixCount + for prefix, count := range prefixes { + sortedPrefixes = append(sortedPrefixes, prefixCount{prefix, count}) + } + + // Sort the prefixes by count in descending order + sort.Slice(sortedPrefixes, func(i, j int) bool { + return sortedPrefixes[i].count > sortedPrefixes[j].count + }) + + // Keep only the top K prefixes + if len(sortedPrefixes) > topK { + sortedPrefixes = sortedPrefixes[:topK] + } + + // Print the result + fmt.Printf("Level %d:\n", level) + for _, p := range sortedPrefixes { + fmt.Printf(" Prefix: %s, Key Count: %d\n", strings.TrimPrefix(p.prefix, "/"), p.count) + } +} + +// getNthLevelPrefix extracts the prefix up to the specified level under basePrefix +func getNthLevelPrefix(key, basePrefix string, level int) string { + if level <= 0 || !strings.HasPrefix(key, basePrefix) { + return "" + } + + parts := strings.Split(strings.TrimPrefix(key, basePrefix), "/") + if len(parts) < level { + return "" + } + + builder := strings.Builder{} + builder.WriteString(strings.Trim(basePrefix, "/")) + + for i := 0; i < level; i++ { + if part := strings.Trim(parts[i], "/"); part != "" { + builder.WriteByte('/') + builder.WriteString(part) + } + } + + return builder.String() +} diff --git a/states/etcd/show/index.go b/states/etcd/show/index.go index 11caae7..bd8cad1 100644 --- a/states/etcd/show/index.go +++ b/states/etcd/show/index.go @@ -52,7 +52,7 @@ type IndexInfoV1 struct { } func (c *ComponentShow) listIndexMeta(ctx context.Context) ([]IndexInfoV1, error) { - prefix := path.Join(c.basePath, "root-coord/index") + prefix := path.Join(c.metaPath, "root-coord/index") indexes, keys, err := common.ListProtoObjects[etcdpb.IndexInfo](ctx, c.client, prefix) result := make([]IndexInfoV1, 0, len(indexes)) for idx, info := range indexes { @@ -70,7 +70,7 @@ func (c *ComponentShow) listIndexMeta(ctx context.Context) ([]IndexInfoV1, error } func (c *ComponentShow) listIndexMetaV2(ctx context.Context) ([]indexpbv2.FieldIndex, error) { - indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.basePath, "field-index")) + indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.metaPath, "field-index")) return indexes, err } diff --git a/states/etcd/show/partition.go b/states/etcd/show/partition.go index a8218a1..3333ee0 100644 --- a/states/etcd/show/partition.go +++ b/states/etcd/show/partition.go @@ -23,7 +23,7 @@ func (c *ComponentShow) PartitionCommand(ctx context.Context, p *PartitionParam) return nil, errors.New("collection id not provided") } - partitions, err := common.ListCollectionPartitions(ctx, c.client, c.basePath, p.CollectionID) + partitions, err := common.ListCollectionPartitions(ctx, c.client, c.metaPath, p.CollectionID) if err != nil { return nil, errors.Wrap(err, "failed to list partition info") } diff --git a/states/etcd/show/partition_loaded.go b/states/etcd/show/partition_loaded.go index 16f78e9..65aeb50 100644 --- a/states/etcd/show/partition_loaded.go +++ b/states/etcd/show/partition_loaded.go @@ -18,7 +18,7 @@ type PartitionLoadedParam struct { } func (c *ComponentShow) PartitionLoadedCommand(ctx context.Context, p *PartitionLoadedParam) (*PartitionsLoaded, error) { - partitions, err := common.ListPartitionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(pl *models.PartitionLoaded) bool { + partitions, err := common.ListPartitionLoadedInfo(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(pl *models.PartitionLoaded) bool { return (p.CollectionID == 0 || p.CollectionID == pl.CollectionID) && (p.PartitionID == 0 || p.PartitionID == pl.PartitionID) }) diff --git a/states/etcd/show/replica.go b/states/etcd/show/replica.go index a4156eb..efb260e 100644 --- a/states/etcd/show/replica.go +++ b/states/etcd/show/replica.go @@ -25,13 +25,13 @@ func (c *ComponentShow) ReplicaCommand(ctx context.Context, p *ReplicaParam) (*R var collections []*models.Collection var err error if p.CollectionID > 0 { - collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID) + collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID) if err != nil { return nil, err } collections = []*models.Collection{collection} } else { - collections, err = common.ListCollectionsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(c *models.Collection) bool { + collections, err = common.ListCollectionsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(c *models.Collection) bool { return p.CollectionID == 0 || p.CollectionID == c.ID }) if err != nil { @@ -39,7 +39,7 @@ func (c *ComponentShow) ReplicaCommand(ctx context.Context, p *ReplicaParam) (*R } } - replicas, err := common.ListReplica(ctx, c.client, c.basePath, p.CollectionID) + replicas, err := common.ListReplica(ctx, c.client, c.metaPath, p.CollectionID) if err != nil { return nil, errors.Wrap(err, "failed to list replica info") } diff --git a/states/etcd/show/resource_group.go b/states/etcd/show/resource_group.go index a97189a..3ee573d 100644 --- a/states/etcd/show/resource_group.go +++ b/states/etcd/show/resource_group.go @@ -16,7 +16,7 @@ type ResourceGroupParam struct { } func (c *ComponentShow) ResourceGroupCommand(ctx context.Context, p *ResourceGroupParam) (*ResourceGroups, error) { - rgs, err := common.ListResourceGroups(ctx, c.client, c.basePath, func(rg *models.ResourceGroup) bool { + rgs, err := common.ListResourceGroups(ctx, c.client, c.metaPath, func(rg *models.ResourceGroup) bool { return p.Name == "" || p.Name == rg.GetName() }) if err != nil { diff --git a/states/etcd/show/segment.go b/states/etcd/show/segment.go index 4e65a96..986d39a 100644 --- a/states/etcd/show/segment.go +++ b/states/etcd/show/segment.go @@ -41,7 +41,7 @@ type segStats struct { // SegmentCommand returns show segments command. func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) error { - segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool { + segments, err := common.ListSegmentsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(segment *models.Segment) bool { return (p.CollectionID == 0 || segment.CollectionID == p.CollectionID) && (p.PartitionID == 0 || segment.PartitionID == p.PartitionID) && (p.SegmentID == 0 || segment.ID == p.SegmentID) && diff --git a/states/etcd/show/segment_index.go b/states/etcd/show/segment_index.go index e90db9d..151e0cc 100644 --- a/states/etcd/show/segment_index.go +++ b/states/etcd/show/segment_index.go @@ -23,7 +23,7 @@ type SegmentIndexParam struct { // SegmentIndexCommand returns show segment-index command. func (c *ComponentShow) SegmentIndexCommand(ctx context.Context, p *SegmentIndexParam) error { - segments, err := common.ListSegments(c.client, c.basePath, func(info *datapb.SegmentInfo) bool { + segments, err := common.ListSegments(c.client, c.metaPath, func(info *datapb.SegmentInfo) bool { return (p.CollectionID == 0 || info.CollectionID == p.CollectionID) && (p.SegmentID == 0 || info.ID == p.SegmentID) }) @@ -31,7 +31,7 @@ func (c *ComponentShow) SegmentIndexCommand(ctx context.Context, p *SegmentIndex return err } - segmentIndexes, err := common.ListSegmentIndex(c.client, c.basePath) + segmentIndexes, err := common.ListSegmentIndex(ctx, c.client, c.metaPath) if err != nil { return err } @@ -40,12 +40,12 @@ func (c *ComponentShow) SegmentIndexCommand(ctx context.Context, p *SegmentIndex return err } - indexBuildInfo, err := common.ListIndex(ctx, c.client, c.basePath) + indexBuildInfo, err := common.ListIndex(ctx, c.client, c.metaPath) if err != nil { return err } - indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.basePath, "field-index")) + indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.metaPath, "field-index")) if err != nil { return err } @@ -111,6 +111,7 @@ func (c *ComponentShow) SegmentIndexCommand(ctx context.Context, p *SegmentIndex fmt.Printf("\t Index Type:%v on Field ID: %d", common.GetKVPair(idx.GetIndexInfo().GetIndexParams(), "index_type"), idx.GetIndexInfo().GetFieldID()) fmt.Printf("\tSerialized Size: %d\n", segIdx.GetSerializeSize()) fmt.Printf("\tCurrent Index Version: %d\n", segIdx.GetCurrentIndexVersion()) + fmt.Printf("\t Index Files: %v\n", segIdx.IndexFileKeys) } } else { // use v1 info @@ -140,7 +141,7 @@ func (c *ComponentShow) SegmentIndexCommand(ctx context.Context, p *SegmentIndex } func (c *ComponentShow) listSegmentIndexV2(ctx context.Context) ([]indexpbv2.SegmentIndex, error) { - prefix := path.Join(c.basePath, "segment-index") + "/" + prefix := path.Join(c.metaPath, "segment-index") + "/" result, _, err := common.ListProtoObjects[indexpbv2.SegmentIndex](ctx, c.client, prefix) return result, err } diff --git a/states/etcd/show/segment_loaded.go b/states/etcd/show/segment_loaded.go index 782c712..398c30e 100644 --- a/states/etcd/show/segment_loaded.go +++ b/states/etcd/show/segment_loaded.go @@ -24,7 +24,7 @@ func (c *ComponentShow) SegmentLoadedCommand(ctx context.Context, p *SegmentLoad if etcdversion.GetVersion() != models.LTEVersion2_1 { return nil, errors.New("list segment-loaded from meta only support before 2.1.x, try use `show segment-loaded-grpc` instead") } - segments, err := common.ListLoadedSegments(c.client, c.basePath, func(info *querypb.SegmentInfo) bool { + segments, err := common.ListLoadedSegments(c.client, c.metaPath, func(info *querypb.SegmentInfo) bool { return (p.CollectionID == 0 || info.CollectionID == p.CollectionID) && (p.SegmentID == 0 || info.SegmentID == p.SegmentID) }) diff --git a/states/etcd/show/session.go b/states/etcd/show/session.go index 23182ab..c86a5ec 100644 --- a/states/etcd/show/session.go +++ b/states/etcd/show/session.go @@ -19,7 +19,7 @@ type SessionParam struct { // SessionCommand returns show session command. // usage: show session func (c *ComponentShow) SessionCommand(ctx context.Context, p *SessionParam) (*Sessions, error) { - sessions, err := common.ListSessions(ctx, c.client, c.basePath) + sessions, err := common.ListSessions(ctx, c.client, c.metaPath) if err != nil { return nil, errors.Wrap(err, "failed to list sessions") } diff --git a/states/etcd_connect.go b/states/etcd_connect.go index 02a48a8..6408afb 100644 --- a/states/etcd_connect.go +++ b/states/etcd_connect.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "net" "os" "path" "strings" @@ -110,6 +111,14 @@ func (app *ApplicationState) connectEtcd(ctx context.Context, cp *ConnectParams) if err != nil { return err } + _, _, err = net.SplitHostPort(cp.EtcdAddr) + if err != nil { + if strings.Contains(err.Error(), "missing port in address") { + cp.EtcdAddr = cp.EtcdAddr + ":2379" + } else { + return errors.Wrap(err, "invalid etcd address") + } + } cfg := clientv3.Config{ Endpoints: []string{cp.EtcdAddr}, diff --git a/states/instance.go b/states/instance.go index b708f94..3c78280 100644 --- a/states/instance.go +++ b/states/instance.go @@ -129,7 +129,7 @@ func getInstanceState(parent *framework.CmdState, cli metakv.MetaKV, instanceNam // use audit kv state := &InstanceState{ CmdState: parent.Spawn(fmt.Sprintf("Milvus(%s)", instanceName)), - ComponentShow: show.NewComponent(cli, config, basePath), + ComponentShow: show.NewComponent(cli, config, instanceName, metaPath), ComponentRemove: remove.NewComponent(cli, config, basePath), ComponentRepair: repair.NewComponent(cli, config, basePath), ComponentSet: set.NewComponent(cli, config, basePath), diff --git a/states/scan_binlog.go b/states/scan_binlog.go index 255b6f5..5933be4 100644 --- a/states/scan_binlog.go +++ b/states/scan_binlog.go @@ -20,7 +20,7 @@ import ( ) type ScanBinlogParams struct { - framework.ParamBase `use:"scan-binlog" desc:"test expr"` + framework.ParamBase `use:"scan-binlog" desc:"scan binlog to check data"` CollectionID int64 `name:"collection" default:"0"` SegmentID int64 `name:"segment" default:"0"` Fields []string `name:"fields"` @@ -153,7 +153,7 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara } } - if pkBinlog == nil && segment.State != models.SegmentStateGrowing { + if pkBinlog == nil { fmt.Printf("PK Binlog not found, segment %d\n", segment.ID) continue } @@ -175,8 +175,8 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara err = s.scanBinlogs(pkObject, fieldObjects, func(pk storage.PrimaryKey, offset int, values map[int64]any) error { pkv := pk.GetValue() + ts := values[1].(int64) if !p.IgnoreDelete { - ts := values[1].(int64) if deletedRecords[pkv] > uint64(ts) { return nil } @@ -188,6 +188,8 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara env := lo.MapKeys(values, func(_ any, fid int64) string { return fields[fid].Name }) + env["$pk"] = pkv + env["$timestamp"] = ts program, err := expr.Compile(p.Expr, expr.Env(env)) if err != nil { return err diff --git a/states/scan_deltalog.go b/states/scan_deltalog.go new file mode 100644 index 0000000..b3aaf79 --- /dev/null +++ b/states/scan_deltalog.go @@ -0,0 +1,173 @@ +package states + +import ( + "context" + "fmt" + "log" + "strings" + + "github.com/cockroachdb/errors" + "github.com/expr-lang/expr" + "github.com/minio/minio-go/v7" + + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" + "github.com/milvus-io/birdwatcher/oss" + "github.com/milvus-io/birdwatcher/proto/v2.0/schemapb" + "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" + "github.com/milvus-io/birdwatcher/storage" +) + +type ScanDeltalogParams struct { + framework.ParamBase `use:"scan-deltalog" desc:"scan deltalog to check delta data"` + CollectionID int64 `name:"collection" default:"0"` + SegmentID int64 `name:"segment" default:"0"` + Fields []string `name:"fields"` + Expr string `name:"expr"` + MinioAddress string `name:"minioAddr"` + SkipBucketCheck bool `name:"skipBucketCheck" default:"false" desc:"skip bucket exist check due to permission issue"` + Action string `name:"action" default:"count"` + Limit int64 `name:"limit" default:"0" desc:"limit the scan line number if action is locate"` + IncludeUnhealthy bool `name:"includeUnhealthy" default:"false" desc:"also check dropped segments"` +} + +func (s *InstanceState) ScanDeltalogCommand(ctx context.Context, p *ScanDeltalogParams) error { + collection, err := common.GetCollectionByIDVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), p.CollectionID) + if err != nil { + return err + } + fmt.Println("=== Checking collection schema ===") + pkField, ok := collection.GetPKField() + if !ok { + return errors.New("pk field not found") + } + fmt.Printf("PK Field [%d] %s\n", pkField.FieldID, pkField.Name) + + fieldsMap := make(map[string]struct{}) + for _, field := range p.Fields { + fieldsMap[field] = struct{}{} + } + + fields := make(map[int64]models.FieldSchema) // make([]models.FieldSchema, 0, len(p.Fields)) + + for _, fieldSchema := range collection.Schema.Fields { + // timestamp field id + if fieldSchema.FieldID == 1 { + fields[fieldSchema.FieldID] = fieldSchema + continue + } + if _, ok := fieldsMap[fieldSchema.Name]; ok { + fmt.Printf("Output Field %s field id %d\n", fieldSchema.Name, fieldSchema.FieldID) + fields[fieldSchema.FieldID] = fieldSchema + } + } + + segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool { + return (p.SegmentID == 0 || p.SegmentID == s.ID) && + p.CollectionID == s.CollectionID && + (p.IncludeUnhealthy || s.State != models.SegmentStateDropped) + }) + if err != nil { + return err + } + + params := []oss.MinioConnectParam{oss.WithSkipCheckBucket(p.SkipBucketCheck)} + if p.MinioAddress != "" { + params = append(params, oss.WithMinioAddr(p.MinioAddress)) + } + + minioClient, bucketName, rootPath, err := s.GetMinioClientFromCfg(ctx, params...) + if err != nil { + fmt.Println("Failed to create client,", err.Error()) + return err + } + + fmt.Printf("=== start to execute \"%s\" task with filter expresion: \"%s\" ===\n", p.Action, p.Expr) + + // prepare action dataset + // count + var count int64 + // locate do nothing + + var process func(pk storage.PrimaryKey, ts uint64, segment *models.Segment, logPath string, offset int64) bool + + switch p.Action { + case "count": + process = func(pk storage.PrimaryKey, ts uint64, segment *models.Segment, logPath string, offset int64) bool { + count++ + return true + } + case "locate": + process = func(pk storage.PrimaryKey, ts uint64, segment *models.Segment, logPath string, offset int64) bool { + log.Printf("Entry found in segment %d, level = %s, log file = %s, offset = %d, pk = %v, ts = %d\n", segment.ID, segment.Level.String(), logPath, offset, pk.GetValue(), ts) + count++ + return p.Limit <= 0 || count < p.Limit + } + } + + getObject := func(binlogPath string) (*minio.Object, error) { + logPath := strings.Replace(binlogPath, "ROOT_PATH", rootPath, -1) + return minioClient.GetObject(ctx, bucketName, logPath, minio.GetObjectOptions{}) + } + for _, segment := range segments { + for _, fieldBinlogs := range segment.GetDeltalogs() { + for _, deltaBinlog := range fieldBinlogs.Binlogs { + deltaObj, err := getObject(deltaBinlog.LogPath) + if err != nil { + return err + } + reader, err := storage.NewDeltalogReader(deltaObj) + if err != nil { + return err + } + deltaData, err := reader.NextEventReader(schemapb.DataType(pkField.DataType)) + if err != nil { + return err + } + offset := int64(0) + deltaData.Range(func(pk storage.PrimaryKey, ts uint64) bool { + defer func() { + offset++ + }() + if len(p.Expr) != 0 { + env := map[string]any{ + "$pk": pk.GetValue(), + "$timestamp": ts, + } + program, err := expr.Compile(p.Expr, expr.Env(env)) + if err != nil { + return false + } + + output, err := expr.Run(program, env) + if err != nil { + fmt.Println("failed to run expression, err: ", err.Error()) + } + + match, ok := output.(bool) + if !ok { + fmt.Println("expr not return bool value") + return false + } + + if !match { + return true + } + } + + process(pk, ts, segment, deltaBinlog.LogPath, offset) + return true + }) + } + } + } + + switch strings.ToLower(p.Action) { + case "count": + fmt.Printf("Total %d entries found\n", count) + default: + } + + return nil +} diff --git a/states/visit_querycoord.go b/states/visit_querycoord.go index 7fd3759..bef51db 100644 --- a/states/visit_querycoord.go +++ b/states/visit_querycoord.go @@ -1,7 +1,12 @@ package states import ( + "context" "fmt" + "os" + "sort" + "strconv" + "text/tabwriter" "github.com/spf13/cobra" "google.golang.org/grpc" @@ -9,6 +14,7 @@ import ( "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/querypb" + commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb" querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" ) @@ -41,6 +47,32 @@ func (s *queryCoordState) SetupCommands() { s.SetupFn = s.SetupCommands } +type BalanceSegmentParam struct { + framework.ParamBase `use:"balance-segment" desc:"balance segment"` + CollectionID int64 `name:"collection" default:"0"` + SegmentIDs []int64 `name:"segment" desc:"segment ids to balance"` + SourceNodes []int64 `name:"srcNodes" desc:"from querynode ids"` + DstNodes int64 `name:"dstNode" desc:"to querynode ids"` +} + +func (s *queryCoordState) BalanceSegmentCommand(ctx context.Context, p *BalanceSegmentParam) error { + req := &querypbv2.LoadBalanceRequest{ + Base: &commonpbv2.MsgBase{ + TargetID: s.session.ServerID, + }, + CollectionID: p.CollectionID, + SealedSegmentIDs: p.SegmentIDs, + SourceNodeIDs: p.SourceNodes, + } + + resp, err := s.clientv2.LoadBalance(ctx, req) + if err != nil { + return err + } + fmt.Println(resp) + return nil +} + /* func (s *queryCoordState) ShowCollectionCmd() *cobra.Command { cmd := &cobra.Command{ @@ -87,3 +119,130 @@ func getQueryCoordState(client querypb.QueryCoordClient, conn *grpc.ClientConn, return state } + +func checkerActivationCmd(clientv2 querypbv2.QueryCoordClient, id int64) *cobra.Command { + cmd := &cobra.Command{ + Use: "checker", + Short: "checker cmd", + } + cmd.AddCommand( + checkerActivateCmd(clientv2, id), + checkerDeactivateCmd(clientv2, id), + checkerListCmd(clientv2, id), + ) + return cmd +} + +func checkerActivateCmd(clientv2 querypbv2.QueryCoordClient, id int64) *cobra.Command { + cmd := &cobra.Command{ + Use: "activate", + Short: "activate checkerID", + Run: func(cmd *cobra.Command, args []string) { + checkerID, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + fmt.Println("checkerID must be a number") + return + } + req := &querypbv2.ActivateCheckerRequest{ + Base: &commonpbv2.MsgBase{ + TargetID: id, + SourceID: -1, + }, + CheckerID: int32(checkerID), + } + + status, err := clientv2.ActivateChecker(context.Background(), req) + if err != nil { + fmt.Println(err.Error()) + return + } + if status.ErrorCode != commonpbv2.ErrorCode_Success { + fmt.Print(status.Reason) + return + } + fmt.Println("success") + }, + } + + return cmd +} + +func checkerDeactivateCmd(clientv2 querypbv2.QueryCoordClient, id int64) *cobra.Command { + cmd := &cobra.Command{ + Use: "deactivate", + Short: "deactivate checkerID", + Run: func(cmd *cobra.Command, args []string) { + checkerID, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + fmt.Println("checkerID must be a number") + return + } + req := &querypbv2.DeactivateCheckerRequest{ + Base: &commonpbv2.MsgBase{ + TargetID: id, + SourceID: -1, + }, + CheckerID: int32(checkerID), + } + + status, err := clientv2.DeactivateChecker(context.Background(), req) + if err != nil { + fmt.Println(err.Error()) + return + } + if status.ErrorCode != commonpbv2.ErrorCode_Success { + fmt.Print(status.Reason) + return + } + fmt.Println("success") + }, + } + + return cmd +} + +func checkerListCmd(clientv2 querypbv2.QueryCoordClient, id int64) *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "checker list [checkerIDs]", + Run: func(cmd *cobra.Command, args []string) { + checkerIDs := make([]int32, 0) + for _, arg := range args { + checkerID, err := strconv.ParseInt(arg, 10, 32) + if err != nil { + fmt.Println("checkerID must be number") + } + checkerIDs = append(checkerIDs, int32(checkerID)) + } + + req := &querypbv2.ListCheckersRequest{ + Base: &commonpbv2.MsgBase{ + TargetID: id, + SourceID: -1, + }, + CheckerIDs: checkerIDs, + } + + resp, err := clientv2.ListCheckers(context.Background(), req) + if err != nil { + fmt.Println(err.Error()) + return + } + if resp.Status.ErrorCode != commonpbv2.ErrorCode_Success { + fmt.Println(resp.Status.Reason) + return + } + + sort.Slice(resp.CheckerInfos, func(i, j int) bool { + return resp.CheckerInfos[i].GetId() < resp.CheckerInfos[j].GetId() + }) + w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.AlignRight|tabwriter.Debug) + fmt.Fprintln(w, "id\tdesc\tfound\tactivated") + for _, info := range resp.CheckerInfos { + fmt.Fprintf(w, "%v\t%v\t%v\t%v\n", info.GetId(), info.GetDesc(), info.GetFound(), info.GetActivated()) + } + w.Flush() + }, + } + return cmd +}