Skip to content

Commit

Permalink
enhance: Add Run flag for remove compaction command (#308)
Browse files Browse the repository at this point in the history
all remove command shall have `run` flag

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Sep 6, 2024
1 parent b597ba4 commit 7363e5d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 54 deletions.
2 changes: 0 additions & 2 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ func RemoveCommand(cli clientv3.KV, instanceName, basePath string) *cobra.Comman
remove.EtcdConfigCommand(cli, instanceName),
// remove collection has been dropped
remove.CollectionCleanCommand(cli, basePath),
// remove compaction task
remove.CompactionTaskCleanCommand(cli, basePath),
)

return removeCmd
Expand Down
96 changes: 44 additions & 52 deletions states/etcd/remove/compaction_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,58 @@ import (
"fmt"
"path"

"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
)

// CompactionTaskCleanCommand returns command to remove
func CompactionTaskCleanCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "compaction",
Short: "Remove compaction task",
Run: func(cmd *cobra.Command, args []string) {
compactionType, err := cmd.Flags().GetString("type")
if err != nil {
fmt.Println(err.Error())
return
}
triggerID, err := cmd.Flags().GetString("jobID")
if err != nil {
fmt.Println(err.Error())
return
}
planID, err := cmd.Flags().GetString("taskID")
if err != nil {
fmt.Println(err.Error())
return
}
type CompactionTaskParam struct {
framework.ParamBase `use:"remove compaction" desc:"Remove compaction task"`
CompactionType string `name:"type" default:"ClusteringCompaction" desc:"compaction type to remove"`
JobID string `name:"jobID" default:"" desc:"jobID also known as triggerID"`
TaskID string `name:"taskID" default:"" desc:"taskID also known as planID"`
Run bool `name:"run" default:"false" desc:"flag to control actually run or dry"`
}

compactionTasks, err := common.ListCompactionTask(context.TODO(), cli, basePath, func(task *models.CompactionTask) bool {
if compactionType != task.GetType().String() {
return false
}
if triggerID != "" && fmt.Sprint(task.GetTriggerID()) != triggerID {
return false
}
if planID != "" && fmt.Sprint(task.GetPlanID()) != planID {
return false
}
return true
})
if err != nil {
fmt.Println(err.Error())
return
}
// CompactionTaskCommand is the command function to remove compaction task.
func (c *ComponentRemove) CompactionTaskCommand(ctx context.Context, p *CompactionTaskParam) error {
compactionTasks, err := common.ListCompactionTask(ctx, c.client, c.basePath, func(task *models.CompactionTask) bool {
if p.CompactionType != task.GetType().String() {
return false
}
if p.JobID != "" && fmt.Sprint(task.GetTriggerID()) != p.JobID {
return false
}
if p.TaskID != "" && fmt.Sprint(task.GetPlanID()) != p.TaskID {
return false
}
return true
})
if err != nil {
return err
}

for _, task := range compactionTasks {
key := path.Join(basePath, common.CompactionTaskPrefix, task.GetType().String(), fmt.Sprint(task.GetTriggerID()), fmt.Sprint(task.GetPlanID()))
_, err = cli.Delete(context.TODO(), key, clientv3.WithPrefix())
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("clean compaction task done, prefix: %s\n", key)
}
},
if len(compactionTasks) == 0 {
fmt.Println("no compaction task found")
return nil
}

cmd.Flags().String("type", "ClusteringCompaction", "compaction type")
cmd.Flags().String("jobID", "", "jobID also known as triggerID")
cmd.Flags().String("taskID", "", "taskID also known as planID")
return cmd
if !p.Run {
for _, task := range compactionTasks {
fmt.Printf("target compact task, JobID %d, TaskID %d, Type %s\n", task.GetTriggerID(), task.GetPlanID(), task.GetType().String())
}
return nil
}

for _, task := range compactionTasks {
key := path.Join(c.basePath, common.CompactionTaskPrefix, task.GetType().String(), fmt.Sprint(task.GetTriggerID()), fmt.Sprint(task.GetPlanID()))
_, err = c.client.Delete(ctx, key, clientv3.WithPrefix())
if err != nil {
return err
}
fmt.Printf("clean compaction task done, prefix: %s\n", key)
}
return nil
}

0 comments on commit 7363e5d

Please sign in to comment.