From b3a51a2172f0446d4e9b65acf45f80bdc6bfe7ae Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 23 Jan 2023 13:41:41 +0800 Subject: [PATCH] Add remove orphan watch channel command (#88) Signed-off-by: Congqi Xia --- .dockerignore | 4 ++ states/backup_mock_connect.go | 4 ++ states/etcd/commands.go | 2 + states/etcd/common/channel.go | 10 ++++ states/etcd/common/collection.go | 12 +++++ states/etcd/common/list.go | 36 ++++++++++++++ states/etcd/remove/channel.go | 83 ++++++++++++++++++++++++++++++++ 7 files changed, 151 insertions(+) create mode 100644 .dockerignore create mode 100644 states/etcd/remove/channel.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..8a1f352e --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +.git +*.bak.gz +dlsegment* +bw_workspace diff --git a/states/backup_mock_connect.go b/states/backup_mock_connect.go index 90f7c223..66513f62 100644 --- a/states/backup_mock_connect.go +++ b/states/backup_mock_connect.go @@ -56,6 +56,10 @@ func (s *embedEtcdMockState) SetupCommands() { cmd.AddCommand( // show [subcommand] options... etcd.ShowCommand(s.client, rootPath), + + // remove [subcommand] options... + // used for testing + etcd.RemoveCommand(s.client, rootPath), // download-pk getDownloadPKCmd(s.client, rootPath), // inspect-pk diff --git a/states/etcd/commands.go b/states/etcd/commands.go index 191ce54d..3f74c22b 100644 --- a/states/etcd/commands.go +++ b/states/etcd/commands.go @@ -84,6 +84,8 @@ func RemoveCommand(cli *clientv3.Client, basePath string) *cobra.Command { removeCmd.AddCommand( // remove segment remove.SegmentCommand(cli, basePath), + // remove channel + remove.ChannelCommand(cli, basePath), ) return removeCmd diff --git a/states/etcd/common/channel.go b/states/etcd/common/channel.go index d5dd8b03..5110f264 100644 --- a/states/etcd/common/channel.go +++ b/states/etcd/common/channel.go @@ -6,6 +6,7 @@ import ( "time" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" + datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -17,3 +18,12 @@ func ListChannelWatchV1(cli *clientv3.Client, basePath string, filters ...func(c prefix := path.Join(basePath, "channelwatch") + "/" return ListProtoObjects(ctx, cli, prefix, filters...) } + +// ListChannelWatchV2 lists v2.2 channel watch info meta. +func ListChannelWatchV2(cli *clientv3.Client, basePath string, filters ...func(channel *datapbv2.ChannelWatchInfo) bool) ([]datapbv2.ChannelWatchInfo, []string, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + + prefix := path.Join(basePath, "channelwatch") + "/" + return ListProtoObjects(ctx, cli, prefix, filters...) +} diff --git a/states/etcd/common/collection.go b/states/etcd/common/collection.go index 3ba114be..1a61fc86 100644 --- a/states/etcd/common/collection.go +++ b/states/etcd/common/collection.go @@ -30,6 +30,18 @@ var ( CollectionTombstone = []byte{0xE2, 0x9B, 0xBC} ) +// ListCollections returns collection information. +// the field info might not include. +func ListCollections(cli *clientv3.Client, basePath string, filter func(*etcdpb.CollectionInfo) bool) ([]etcdpb.CollectionInfo, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + + colls, _, err := ListProtoObjectsAdv(ctx, cli, path.Join(basePath, "root-coord/collection"), func(value []byte) bool { + return !bytes.Equal(value, CollectionTombstone) + }, filter) + return colls, err +} + // GetCollectionByID returns collection info from etcd with provided id. func GetCollectionByID(cli *clientv3.Client, basePath string, collID int64) (*etcdpb.CollectionInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) diff --git a/states/etcd/common/list.go b/states/etcd/common/list.go index 2e4df99a..759183fe 100644 --- a/states/etcd/common/list.go +++ b/states/etcd/common/list.go @@ -40,3 +40,39 @@ LOOP: } return result, keys, nil } + +// ListProtoObjectsAdv returns proto objects with specified prefix. +// add preFilter to handle tombstone cases. +func ListProtoObjectsAdv[T any, P interface { + *T + protoiface.MessageV1 +}](ctx context.Context, cli *clientv3.Client, prefix string, preFilter func([]byte) bool, filters ...func(t *T) bool) ([]T, []string, error) { + resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix()) + if err != nil { + return nil, nil, err + } + result := make([]T, 0, len(resp.Kvs)) + keys := make([]string, 0, len(resp.Kvs)) +LOOP: + for _, kv := range resp.Kvs { + if !preFilter(kv.Value) { + continue + } + var elem T + info := P(&elem) + err = proto.Unmarshal(kv.Value, info) + if err != nil { + fmt.Println(err.Error()) + continue + } + + for _, filter := range filters { + if !filter(&elem) { + continue LOOP + } + } + result = append(result, elem) + keys = append(keys, string(kv.Key)) + } + return result, keys, nil +} diff --git a/states/etcd/remove/channel.go b/states/etcd/remove/channel.go new file mode 100644 index 00000000..d3637342 --- /dev/null +++ b/states/etcd/remove/channel.go @@ -0,0 +1,83 @@ +package remove + +import ( + "context" + "fmt" + + "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" + datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb" + "github.com/milvus-io/birdwatcher/states/etcd/common" + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// ChannelCommand returns remove channel command. +func ChannelCommand(cli *clientv3.Client, basePath string) *cobra.Command { + cmd := &cobra.Command{ + Use: "channel", + Short: "Remove channel from datacoord meta with specified condition if orphan", + Run: func(cmd *cobra.Command, args []string) { + channelName, err := cmd.Flags().GetString("channel") + if err != nil { + fmt.Println(err.Error()) + return + } + run, err := cmd.Flags().GetBool("run") + if err != nil { + fmt.Println(err.Error()) + return + } + + var collections []etcdpb.CollectionInfo + + colls, err := common.ListCollections(cli, basePath, func(info *etcdpb.CollectionInfo) bool { + return true + }) + if err != nil { + fmt.Println(err.Error()) + return + } + collections = append(collections, colls...) + + if len(collections) == 0 { + fmt.Println("no collection found") + return + } + + validChannels := make(map[string]struct{}) + for _, collection := range collections { + for _, vchan := range collection.GetVirtualChannelNames() { + validChannels[vchan] = struct{}{} + } + } + + watchChannels, paths, err := common.ListChannelWatchV2(cli, basePath, func(info *datapbv2.ChannelWatchInfo) bool { + if len(channelName) > 0 { + return info.GetVchan().GetChannelName() == channelName + } + return true + }) + + targets := make([]string, 0, len(paths)) + for i, watchChannel := range watchChannels { + _, ok := validChannels[watchChannel.GetVchan().GetChannelName()] + if !ok { + fmt.Printf("%s might be an orphan channel, collection id: %d\n", watchChannel.GetVchan().GetChannelName(), watchChannel.GetVchan().GetCollectionID()) + targets = append(targets, paths[i]) + } + } + + if !run { + return + } + fmt.Printf("Start to delete orphan watch channel info...") + for _, path := range paths { + cli.Delete(context.Background(), path) + } + }, + } + + cmd.Flags().Bool("run", false, "flags indicating whether to remove segment from meta") + cmd.Flags().String("channel", "", "channel name to remove") + return cmd +}