From 9af60ee105ca17416cb9f73ae6606000f92b4ded Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 11 Jan 2023 15:19:35 +0800 Subject: [PATCH] Change remove-segment-by-id to remove segment (#83) Signed-off-by: Congqi Xia --- states/etcd/commands.go | 67 ++++++---------------- states/etcd/common/segment.go | 35 ++++++++++++ states/etcd/remove/segment.go | 88 +++++++++++++++++++++++++++++ states/etcd/repair/segment_empty.go | 15 +---- states/etcd/show/segment.go | 5 +- states/instance.go | 3 + 6 files changed, 146 insertions(+), 67 deletions(-) create mode 100644 states/etcd/remove/segment.go diff --git a/states/etcd/commands.go b/states/etcd/commands.go index 6826a376..191ce54d 100644 --- a/states/etcd/commands.go +++ b/states/etcd/commands.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/milvus-io/birdwatcher/states/etcd/remove" "github.com/milvus-io/birdwatcher/states/etcd/repair" "github.com/milvus-io/birdwatcher/states/etcd/show" "github.com/spf13/cobra" @@ -73,6 +74,21 @@ func RepairCommand(cli *clientv3.Client, basePath string) *cobra.Command { return repairCmd } +// RemoveCommand returns etcd remove commands. +// WARNING this command shall be used with EXTRA CARE! 
+func RemoveCommand(cli *clientv3.Client, basePath string) *cobra.Command {
+	// "remove" has no action of its own, it only groups its sub-commands.
+	root := &cobra.Command{Use: "remove"}
+
+	subCommands := []*cobra.Command{
+		// remove segment
+		remove.SegmentCommand(cli, basePath),
+	}
+	root.AddCommand(subCommands...)
+
+	return root
+}
+
 // RawCommands provides raw "get" command to list kv in etcd
 func RawCommands(cli *clientv3.Client) []*cobra.Command {
 	cmd := &cobra.Command{
@@ -95,54 +111,3 @@ func RawCommands(cli *clientv3.Client) []*cobra.Command {
 
 	return []*cobra.Command{cmd}
 }
-
-/*
-func removeSegmentByID(cli *clientv3.Client, basePath string) *cobra.Command {
-	cmd := &cobra.Command{
-		Use:   "remove-segment-by-id",
-		Short: "Remove segment from meta with specified segment id",
-		RunE: func(cmd *cobra.Command, args []string) error {
-			targetSegmentID, err := cmd.Flags().GetInt64("segment")
-			if err != nil {
-				return err
-			}
-			run, err := cmd.Flags().GetBool("run")
-			if err != nil {
-				return err
-			}
-			segments, err := listSegments(cli, basePath, func(info *datapb.SegmentInfo) bool {
-				return true
-			})
-			if err != nil {
-				fmt.Println("failed to list segments", err.Error())
-				return nil
-			}
-
-			for _, info := range segments {
-				if info.GetID() == targetSegmentID {
-					fmt.Printf("target segment %d found:\n", info.GetID())
-					printSegmentInfo(info, false)
-					if run {
-						err := removeSegment(cli, basePath, info)
-						if err == nil {
-							fmt.Printf("remove segment %d from meta succeed\n", info.GetID())
-						} else {
-							fmt.Printf("remove segment %d failed, err: %s\n", info.GetID(), err.Error())
-						}
-						return nil
-					}
-					if !isEmptySegment(info) {
-						fmt.Printf("\n[WARN] segment %d is not empty, please make sure you know what you're doing\n", targetSegmentID)
-					}
-					return nil
-				}
-			}
-			fmt.Printf("[WARN] cannot find segment %d\n", targetSegmentID)
-			return nil
-		},
-	}
-
-	cmd.Flags().Bool("run", false, "flags indicating whether to remove segment from meta")
-	cmd.Flags().Int64("segment", 0, "segment id to remove")
-	return cmd
-}*/
diff --git a/states/etcd/common/segment.go 
b/states/etcd/common/segment.go
index 1b56e485..6696188c 100644
--- a/states/etcd/common/segment.go
+++ b/states/etcd/common/segment.go
@@ -151,3 +151,44 @@ func ListLoadedSegments(cli *clientv3.Client, basePath string, filter func(*quer
 
 	return segments, nil
 }
+
+// RemoveSegment deletes the etcd meta entries of the given segment.
+//
+// The segment entry under "datacoord-meta/s" is deleted first; if that fails
+// the error is returned right away and nothing else is touched. The binlog,
+// deltalog and statslog entries are then deleted by prefix: a failure is
+// printed, the remaining prefixes are still tried, and the first failure is
+// returned so callers do not report success while log entries are left behind.
+// All deletions share one 3 second timeout. A nil info yields an error.
+func RemoveSegment(cli *clientv3.Client, basePath string, info *datapb.SegmentInfo) error {
+	if info == nil {
+		return fmt.Errorf("segment info is nil")
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
+	defer cancel()
+
+	// every key built below shares the {collectionID}/{partitionID}/{segmentID} part
+	suffix := fmt.Sprintf("%d/%d/%d", info.CollectionID, info.PartitionID, info.ID)
+
+	segmentPath := path.Join(basePath, "datacoord-meta/s", suffix)
+	if _, err := cli.Delete(ctx, segmentPath); err != nil {
+		return err
+	}
+
+	// NOTE(review): the prefixes below have no trailing "/", so they would also
+	// match keys of a segment whose id merely starts with the same digits;
+	// confirm the key layout before tightening this.
+	var firstErr error
+	for _, kind := range []string{"binlog", "deltalog", "statslog"} {
+		prefix := path.Join(basePath, "datacoord-meta", kind, suffix)
+		if _, err := cli.Delete(ctx, prefix, clientv3.WithPrefix()); err != nil {
+			fmt.Printf("failed to delete %ss from etcd for segment %d, err: %s\n", kind, info.GetID(), err.Error())
+			if firstErr == nil {
+				firstErr = fmt.Errorf("delete %s entries: %w", kind, err)
+			}
+		}
+	}
+
+	return firstErr
+}
diff --git a/states/etcd/remove/segment.go b/states/etcd/remove/segment.go
new file mode 100644
index 00000000..bd9f663f
--- /dev/null
+++ b/states/etcd/remove/segment.go
@@ -0,0 +1,105 @@
+package remove
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
+	"github.com/milvus-io/birdwatcher/states/etcd/common"
+	"github.com/milvus-io/birdwatcher/states/etcd/show"
+	"github.com/spf13/cobra"
+	clientv3 "go.etcd.io/etcd/client/v3"
+)
+
+// SegmentCommand returns remove segment command.
+//
+// The command looks up the segment whose id equals the --segment flag and
+// stops unless exactly one segment matches. Without --run it only prints the
+// segment (dry run). With --run it writes a local backup of the segment info
+// and then removes the segment meta from etcd; when the backup cannot be
+// written the removal is aborted.
+func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "segment",
+		Short: "Remove segment from meta with specified segment id",
+		Run: func(cmd *cobra.Command, args []string) {
+			targetSegmentID, err := cmd.Flags().GetInt64("segment")
+			if err != nil {
+				fmt.Println(err.Error())
+				return
+			}
+			run, err := cmd.Flags().GetBool("run")
+			if err != nil {
+				fmt.Println(err.Error())
+				return
+			}
+
+			segments, err := common.ListSegments(cli, basePath, func(segmentInfo *datapb.SegmentInfo) bool {
+				return segmentInfo.GetID() == targetSegmentID
+			})
+			if err != nil {
+				fmt.Println("failed to list segments", err.Error())
+				return
+			}
+
+			if len(segments) != 1 {
+				fmt.Printf("failed to get segment with id %d, get %d result(s)\n", targetSegmentID, len(segments))
+				return
+			}
+			info := segments[0]
+
+			// dry run, display segment first
+			if !run {
+				show.PrintSegmentInfo(info, false)
+				return
+			}
+
+			//TODO put audit log
+			// refuse to delete when the local backup could not be written
+			if err := backupSegmentInfo(info); err != nil {
+				fmt.Println("failed to backup segment info, removal aborted:", err.Error())
+				return
+			}
+			fmt.Println("[WARNING] about to remove segment from etcd")
+			if err := common.RemoveSegment(cli, basePath, info); err != nil {
+				fmt.Printf("Remove segment %d from Etcd failed, err: %s\n", info.ID, err.Error())
+				return
+			}
+			fmt.Printf("Remove segment %d from etcd succeeds.\n", info.GetID())
+		},
+	}
+
+	cmd.Flags().Bool("run", false, "flags indicating whether to remove segment from meta")
+	cmd.Flags().Int64("segment", 0, "segment id to remove")
+	return cmd
+}
+
+// backupSegmentInfo writes the protobuf-marshaled segment info to a file named
+// bw_etcd_segment_{segmentID}.{yyMMdd-HHmmss}.bak, using a relative path.
+// It returns an error when the info cannot be marshaled or when the file
+// cannot be opened, written or closed.
+func backupSegmentInfo(info *datapb.SegmentInfo) error {
+	// marshal before opening so a marshal failure leaves no empty file behind
+	bs, err := proto.Marshal(info)
+	if err != nil {
+		return fmt.Errorf("marshal backup segment: %w", err)
+	}
+
+	filePath := fmt.Sprintf("bw_etcd_segment_%d.%s.bak", info.GetID(), time.Now().Format("060102-150405"))
+	f, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+	if err != nil {
+		return fmt.Errorf("open backup segment file %q: %w", filePath, err)
+	}
+
+	if _, err := f.Write(bs); err != nil {
+		f.Close() // the write error is the one worth reporting
+		return fmt.Errorf("write backup segment file %q: %w", filePath, err)
+	}
+	// a failed Close may mean the data never reached the file
+	if err := f.Close(); err != nil {
+		return fmt.Errorf("close backup segment file %q: %w", filePath, err)
+	}
+	return nil
+}
diff --git a/states/etcd/repair/segment_empty.go b/states/etcd/repair/segment_empty.go
index 5ca18dcd..a5af581f 100644
--- a/states/etcd/repair/segment_empty.go
+++ b/states/etcd/repair/segment_empty.go
@@ -1,10 +1,7 @@
 package repair
 
 import (
-	"context"
 	"fmt"
-	"path"
-	"time"
 
 	"github.com/milvus-io/birdwatcher/proto/v2.0/commonpb"
 	"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
@@ -37,7 +34,7 @@ func EmptySegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command {
 					fmt.Printf("suspect segment %d found:\n", info.GetID())
 					fmt.Printf("SegmentID: %d State: %s, Row Count:%d\n", info.ID, info.State.String(), info.NumOfRows)
 					if run {
-						err := removeSegment(cli, basePath, info)
+						err := common.RemoveSegment(cli, basePath, info)
 						if err == nil {
 							fmt.Printf("remove segment %d from meta succeed\n", info.GetID())
 						} else {
@@ -75,13 +72,3 @@ func isEmptySegment(info *datapb.SegmentInfo) bool {
 	}
 	return true
 }
-
-func removeSegment(cli *clientv3.Client, basePath string, info *datapb.SegmentInfo) error {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
-	defer cancel()
-
-	path := path.Join(basePath, "datacoord-meta/s", fmt.Sprintf("%d/%d/%d", info.CollectionID, info.PartitionID, info.ID))
-	_, err := cli.Delete(ctx, path)
-
-	return err
-}
diff --git a/states/etcd/show/segment.go b/states/etcd/show/segment.go
index 2dee058f..ed04ede3 100644
--- a/states/etcd/show/segment.go
+++ b/states/etcd/show/segment.go
@@ -70,7 +70,7 @@ func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command {
 				switch format {
 				case "table":
 					common.FillFieldsIfV2(cli, basePath, info)
- 
printSegmentInfo(info, detail) + PrintSegmentInfo(info, detail) case "line": fmt.Printf("SegmentID: %d State: %s, Row Count:%d\n", info.ID, info.State.String(), info.NumOfRows) case "statistics": @@ -106,7 +106,8 @@ const ( tsPrintFormat = "2006-01-02 15:04:05.999 -0700" ) -func printSegmentInfo(info *datapb.SegmentInfo, detailBinlog bool) { +// PrintSegmentInfo prints segments info +func PrintSegmentInfo(info *datapb.SegmentInfo, detailBinlog bool) { fmt.Println("================================================================================") fmt.Printf("Segment ID: %d\n", info.ID) fmt.Printf("Segment State:%v", info.State) diff --git a/states/instance.go b/states/instance.go index d676b9a7..7ddbb278 100644 --- a/states/instance.go +++ b/states/instance.go @@ -35,6 +35,9 @@ func (s *instanceState) SetupCommands() { etcd.ShowCommand(cli, basePath), // repair [subcommand] options... etcd.RepairCommand(cli, basePath), + // remove [subcommand] options... + etcd.RemoveCommand(cli, basePath), + // backup [component] getBackupEtcdCmd(cli, basePath), // kill --component [component] --id [id]