Skip to content

Commit

Permalink
Add remove orphan watch channel command (#88)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jan 23, 2023
1 parent 22a9d74 commit b3a51a2
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.git
*.bak.gz
dlsegment*
bw_workspace
4 changes: 4 additions & 0 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (s *embedEtcdMockState) SetupCommands() {
cmd.AddCommand(
// show [subcommand] options...
etcd.ShowCommand(s.client, rootPath),

// remove [subcommand] options...
// used for testing
etcd.RemoveCommand(s.client, rootPath),
// download-pk
getDownloadPKCmd(s.client, rootPath),
// inspect-pk
Expand Down
2 changes: 2 additions & 0 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func RemoveCommand(cli *clientv3.Client, basePath string) *cobra.Command {
removeCmd.AddCommand(
// remove segment
remove.SegmentCommand(cli, basePath),
// remove channel
remove.ChannelCommand(cli, basePath),
)

return removeCmd
Expand Down
10 changes: 10 additions & 0 deletions states/etcd/common/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -17,3 +18,12 @@ func ListChannelWatchV1(cli *clientv3.Client, basePath string, filters ...func(c
prefix := path.Join(basePath, "channelwatch") + "/"
return ListProtoObjects(ctx, cli, prefix, filters...)
}

// ListChannelWatchV2 lists v2.2 channel watch info meta.
func ListChannelWatchV2(cli *clientv3.Client, basePath string, filters ...func(channel *datapbv2.ChannelWatchInfo) bool) ([]datapbv2.ChannelWatchInfo, []string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

prefix := path.Join(basePath, "channelwatch") + "/"
return ListProtoObjects(ctx, cli, prefix, filters...)
}
12 changes: 12 additions & 0 deletions states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ var (
CollectionTombstone = []byte{0xE2, 0x9B, 0xBC}
)

// ListCollections returns collection information.
// the field info might not include.
func ListCollections(cli *clientv3.Client, basePath string, filter func(*etcdpb.CollectionInfo) bool) ([]etcdpb.CollectionInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

colls, _, err := ListProtoObjectsAdv(ctx, cli, path.Join(basePath, "root-coord/collection"), func(value []byte) bool {
return !bytes.Equal(value, CollectionTombstone)
}, filter)
return colls, err
}

// GetCollectionByID returns collection info from etcd with provided id.
func GetCollectionByID(cli *clientv3.Client, basePath string, collID int64) (*etcdpb.CollectionInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
Expand Down
36 changes: 36 additions & 0 deletions states/etcd/common/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,39 @@ LOOP:
}
return result, keys, nil
}

// ListProtoObjectsAdv returns proto objects with specified prefix.
// add preFilter to handle tombstone cases.
func ListProtoObjectsAdv[T any, P interface {
*T
protoiface.MessageV1
}](ctx context.Context, cli *clientv3.Client, prefix string, preFilter func([]byte) bool, filters ...func(t *T) bool) ([]T, []string, error) {
resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, nil, err
}
result := make([]T, 0, len(resp.Kvs))
keys := make([]string, 0, len(resp.Kvs))
LOOP:
for _, kv := range resp.Kvs {
if !preFilter(kv.Value) {
continue
}
var elem T
info := P(&elem)
err = proto.Unmarshal(kv.Value, info)
if err != nil {
fmt.Println(err.Error())
continue
}

for _, filter := range filters {
if !filter(&elem) {
continue LOOP
}
}
result = append(result, elem)
keys = append(keys, string(kv.Key))
}
return result, keys, nil
}
83 changes: 83 additions & 0 deletions states/etcd/remove/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package remove

import (
"context"
"fmt"

"github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb"
datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
)

// ChannelCommand returns remove channel command.
func ChannelCommand(cli *clientv3.Client, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "channel",
Short: "Remove channel from datacoord meta with specified condition if orphan",
Run: func(cmd *cobra.Command, args []string) {
channelName, err := cmd.Flags().GetString("channel")
if err != nil {
fmt.Println(err.Error())
return
}
run, err := cmd.Flags().GetBool("run")
if err != nil {
fmt.Println(err.Error())
return
}

var collections []etcdpb.CollectionInfo

colls, err := common.ListCollections(cli, basePath, func(info *etcdpb.CollectionInfo) bool {
return true
})
if err != nil {
fmt.Println(err.Error())
return
}
collections = append(collections, colls...)

if len(collections) == 0 {
fmt.Println("no collection found")
return
}

validChannels := make(map[string]struct{})
for _, collection := range collections {
for _, vchan := range collection.GetVirtualChannelNames() {
validChannels[vchan] = struct{}{}
}
}

watchChannels, paths, err := common.ListChannelWatchV2(cli, basePath, func(info *datapbv2.ChannelWatchInfo) bool {
if len(channelName) > 0 {
return info.GetVchan().GetChannelName() == channelName
}
return true
})

targets := make([]string, 0, len(paths))
for i, watchChannel := range watchChannels {
_, ok := validChannels[watchChannel.GetVchan().GetChannelName()]
if !ok {
fmt.Printf("%s might be an orphan channel, collection id: %d\n", watchChannel.GetVchan().GetChannelName(), watchChannel.GetVchan().GetCollectionID())
targets = append(targets, paths[i])
}
}

if !run {
return
}
fmt.Printf("Start to delete orphan watch channel info...")
for _, path := range paths {
cli.Delete(context.Background(), path)
}
},
}

cmd.Flags().Bool("run", false, "flags indicating whether to remove segment from meta")
cmd.Flags().String("channel", "", "channel name to remove")
return cmd
}

0 comments on commit b3a51a2

Please sign in to comment.