Skip to content

Commit

Permalink
Change remove-segment-by-id to remove segment (#83)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jan 11, 2023
1 parent acb1ab5 commit 9af60ee
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 67 deletions.
67 changes: 16 additions & 51 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/milvus-io/birdwatcher/states/etcd/remove"
"github.com/milvus-io/birdwatcher/states/etcd/repair"
"github.com/milvus-io/birdwatcher/states/etcd/show"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -73,6 +74,21 @@ func RepairCommand(cli *clientv3.Client, basePath string) *cobra.Command {
return repairCmd
}

// RemoveCommand returns etcd remove commands.
// WARNING this command shall be used with EXTRA CARE!
func RemoveCommand(cli *clientv3.Client, basePath string) *cobra.Command {
removeCmd := &cobra.Command{
Use: "remove",
}

removeCmd.AddCommand(
// remove segment
remove.SegmentCommand(cli, basePath),
)

return removeCmd
}

// RawCommands provides raw "get" command to list kv in etcd
func RawCommands(cli *clientv3.Client) []*cobra.Command {
cmd := &cobra.Command{
Expand All @@ -95,54 +111,3 @@ func RawCommands(cli *clientv3.Client) []*cobra.Command {

return []*cobra.Command{cmd}
}

/*
func removeSegmentByID(cli *clientv3.Client, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "remove-segment-by-id",
Short: "Remove segment from meta with specified segment id",
RunE: func(cmd *cobra.Command, args []string) error {
targetSegmentID, err := cmd.Flags().GetInt64("segment")
if err != nil {
return err
}
run, err := cmd.Flags().GetBool("run")
if err != nil {
return err
}
segments, err := listSegments(cli, basePath, func(info *datapb.SegmentInfo) bool {
return true
})
if err != nil {
fmt.Println("failed to list segments", err.Error())
return nil
}
for _, info := range segments {
if info.GetID() == targetSegmentID {
fmt.Printf("target segment %d found:\n", info.GetID())
printSegmentInfo(info, false)
if run {
err := removeSegment(cli, basePath, info)
if err == nil {
fmt.Printf("remove segment %d from meta succeed\n", info.GetID())
} else {
fmt.Printf("remove segment %d failed, err: %s\n", info.GetID(), err.Error())
}
return nil
}
if !isEmptySegment(info) {
fmt.Printf("\n[WARN] segment %d is not empty, please make sure you know what you're doing\n", targetSegmentID)
}
return nil
}
}
fmt.Printf("[WARN] cannot find segment %d\n", targetSegmentID)
return nil
},
}
cmd.Flags().Bool("run", false, "flags indicating whether to remove segment from meta")
cmd.Flags().Int64("segment", 0, "segment id to remove")
return cmd
}*/
35 changes: 35 additions & 0 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,38 @@ func ListLoadedSegments(cli *clientv3.Client, basePath string, filter func(*quer

return segments, nil
}

// RemoveSegment delete segment entry from etcd.
func RemoveSegment(cli *clientv3.Client, basePath string, info *datapb.SegmentInfo) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

segmentPath := path.Join(basePath, "datacoord-meta/s", fmt.Sprintf("%d/%d/%d", info.CollectionID, info.PartitionID, info.ID))
_, err := cli.Delete(ctx, segmentPath)
if err != nil {
return err
}

// delete binlog entries
binlogPrefix := path.Join(basePath, "datacoord-meta/binlog", fmt.Sprintf("%d/%d/%d", info.CollectionID, info.PartitionID, info.ID))
_, err = cli.Delete(ctx, binlogPrefix, clientv3.WithPrefix())
if err != nil {
fmt.Printf("failed to delete binlogs from etcd for segment %d, err: %s\n", info.GetID(), err.Error())
}

// delete deltalog entries
deltalogPrefix := path.Join(basePath, "datacoord-meta/deltalog", fmt.Sprintf("%d/%d/%d", info.CollectionID, info.PartitionID, info.ID))
_, err = cli.Delete(ctx, deltalogPrefix, clientv3.WithPrefix())
if err != nil {
fmt.Printf("failed to delete deltalogs from etcd for segment %d, err: %s\n", info.GetID(), err.Error())
}

// delete statslog entries
statslogPrefix := path.Join(basePath, "datacoord-meta/statslog", fmt.Sprintf("%d/%d/%d", info.CollectionID, info.PartitionID, info.ID))
_, err = cli.Delete(ctx, statslogPrefix, clientv3.WithPrefix())
if err != nil {
fmt.Printf("failed to delete statslogs from etcd for segment %d, err: %s\n", info.GetID(), err.Error())
}

return err
}
88 changes: 88 additions & 0 deletions states/etcd/remove/segment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package remove

import (
"fmt"
"os"
"time"

"github.com/golang/protobuf/proto"
"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
"github.com/milvus-io/birdwatcher/states/etcd/show"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

// SegmentCommand returns remove segment command.
func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "segment",
Short: "Remove segment from meta with specified segment id",
Run: func(cmd *cobra.Command, args []string) {
targetSegmentID, err := cmd.Flags().GetInt64("segment")
if err != nil {
fmt.Println(err.Error())
return
}
run, err := cmd.Flags().GetBool("run")
if err != nil {
fmt.Println(err.Error())
return
}

segments, err := common.ListSegments(cli, basePath, func(segmentInfo *datapb.SegmentInfo) bool {
return segmentInfo.GetID() == targetSegmentID
})
if err != nil {
fmt.Println("failed to list segments", err.Error())
return
}

if len(segments) != 1 {
fmt.Printf("failed to get segment with id %d, get %d result(s)\n", targetSegmentID, len(segments))
return
}

// dry run, display segment first
if !run {
show.PrintSegmentInfo(segments[0], false)
return
}

//TODO put audit log
info := segments[0]
backupSegmentInfo(info)
fmt.Println("[WARNING] about to remove segment from etcd")
err = common.RemoveSegment(cli, basePath, info)
if err != nil {
fmt.Printf("Remove segment %d from Etcd failed, err: %s\n", info.ID, err.Error())
return
}
fmt.Printf("Remove segment %d from etcd succeeds.\n", info.GetID())
},
}

cmd.Flags().Bool("run", false, "flags indicating whether to remove segment from meta")
cmd.Flags().Int64("segment", 0, "segment id to remove")
return cmd
}

func backupSegmentInfo(info *datapb.SegmentInfo) {
now := time.Now()
filePath := fmt.Sprintf("bw_etcd_segment_%d.%s.bak", info.GetID(), now.Format("060102-150405"))
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
fmt.Println("failed to open backup segment file", err.Error())
return
}

defer f.Close()

bs, err := proto.Marshal(info)
if err != nil {
fmt.Println("failed to marshal backup segment", err.Error())
return
}

f.Write(bs)
}
15 changes: 1 addition & 14 deletions states/etcd/repair/segment_empty.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package repair

import (
"context"
"fmt"
"path"
"time"

"github.com/milvus-io/birdwatcher/proto/v2.0/commonpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
Expand Down Expand Up @@ -37,7 +34,7 @@ func EmptySegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command {
fmt.Printf("suspect segment %d found:\n", info.GetID())
fmt.Printf("SegmentID: %d State: %s, Row Count:%d\n", info.ID, info.State.String(), info.NumOfRows)
if run {
err := removeSegment(cli, basePath, info)
err := common.RemoveSegment(cli, basePath, info)
if err == nil {
fmt.Printf("remove segment %d from meta succeed\n", info.GetID())
} else {
Expand Down Expand Up @@ -75,13 +72,3 @@ func isEmptySegment(info *datapb.SegmentInfo) bool {
}
return true
}

func removeSegment(cli *clientv3.Client, basePath string, info *datapb.SegmentInfo) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

path := path.Join(basePath, "datacoord-meta/s", fmt.Sprintf("%d/%d/%d", info.CollectionID, info.PartitionID, info.ID))
_, err := cli.Delete(ctx, path)

return err
}
5 changes: 3 additions & 2 deletions states/etcd/show/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command {
switch format {
case "table":
common.FillFieldsIfV2(cli, basePath, info)
printSegmentInfo(info, detail)
PrintSegmentInfo(info, detail)
case "line":
fmt.Printf("SegmentID: %d State: %s, Row Count:%d\n", info.ID, info.State.String(), info.NumOfRows)
case "statistics":
Expand Down Expand Up @@ -106,7 +106,8 @@ const (
tsPrintFormat = "2006-01-02 15:04:05.999 -0700"
)

func printSegmentInfo(info *datapb.SegmentInfo, detailBinlog bool) {
// PrintSegmentInfo prints segments info
func PrintSegmentInfo(info *datapb.SegmentInfo, detailBinlog bool) {
fmt.Println("================================================================================")
fmt.Printf("Segment ID: %d\n", info.ID)
fmt.Printf("Segment State:%v", info.State)
Expand Down
3 changes: 3 additions & 0 deletions states/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func (s *instanceState) SetupCommands() {
etcd.ShowCommand(cli, basePath),
// repair [subcommand] options...
etcd.RepairCommand(cli, basePath),
// remove [subcommand] options...
etcd.RemoveCommand(cli, basePath),

// backup [component]
getBackupEtcdCmd(cli, basePath),
// kill --component [component] --id [id]
Expand Down

0 comments on commit 9af60ee

Please sign in to comment.