Skip to content

Commit

Permalink
enhance: Refine show segment-loaded-grpc command (#319)
Browse files Browse the repository at this point in the history
This PR:

- Use framework style to re-write this command
- Add row count information in output for balance info
- Group segment by collection
- Add `ctx` parameter for all list session util func

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Oct 9, 2024
1 parent f0b26f2 commit 3dd5cce
Show file tree
Hide file tree
Showing 21 changed files with 206 additions and 112 deletions.
4 changes: 3 additions & 1 deletion states/balance_explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func ExplainBalanceCommand(cli clientv3.KV, basePath string) *cobra.Command {
Use: "explain-balance",
Short: "explain segments and channels current balance status",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 0. set up collection, policy, servers and params
collectionID, err := cmd.Flags().GetInt64(collectionLabel)
if err != nil {
Expand All @@ -44,7 +46,7 @@ func ExplainBalanceCommand(cli clientv3.KV, basePath string) *cobra.Command {
}

// 1. set up segment distribution view, replicas and segmentInfos
sessions, err := ListServers(cli, basePath, queryNode)
sessions, err := ListServers(ctx, cli, basePath, queryNode)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion states/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type GetConfigurationParam struct {

func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error {
p.Filter = strings.ToLower(p.Filter)
sessions, err := common.ListSessions(s.client, s.basePath)
sessions, err := common.ListSessions(ctx, s.client, s.basePath)
if err != nil {
return err
}
Expand Down
223 changes: 149 additions & 74 deletions states/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,98 +3,173 @@ package states
import (
"context"
"fmt"
"sync"
"time"

"github.com/samber/lo"
"github.com/spf13/cobra"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb"
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
)

// GetDistributionCommand returns command to iterate all querynodes to list distribution.
func GetDistributionCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "segment-loaded-grpc",
Short: "list segments loaded information",
RunE: func(cmd *cobra.Command, args []string) error {
collectionID, err := cmd.Flags().GetInt64("collection")
var sealedCnt int64
if err != nil {
return err
type GetDistributionParam struct {
framework.ParamBase `use:"show segment-loaded-grpc" desc:"list segments loaded information"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
}

// GetDistributionCommand iterates all querynodes to list distribution.
func (s *InstanceState) GetDistributionCommand(ctx context.Context, p *GetDistributionParam) error {
// list segment info to get row count information
segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool {
return p.CollectionID == 0 || p.CollectionID == s.CollectionID
})
if err != nil {
return err
}

id2Segment := lo.SliceToMap(segments, func(s *models.Segment) (int64, *models.Segment) {
return s.ID, s
})

sessions, err := common.ListSessions(ctx, s.client, s.basePath)
if err != nil {
return err
}

qnSessions := lo.Filter(sessions, func(sess *models.Session, _ int) bool {
return sess.ServerName == "querynode"
})

type clientWithID struct {
client querypbv2.QueryNodeClient
id int64
}
var wg sync.WaitGroup
clientCh := make(chan clientWithID, len(qnSessions))
for _, session := range qnSessions {
wg.Add(1)
go func(session *models.Session) {
defer wg.Done()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
sessions, err := common.ListSessions(cli, basePath)

dialCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
conn, err := grpc.DialContext(dialCtx, session.Address, opts...)
cancel()
// ignore bad session
if err != nil {
return err
fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
return
}

for _, session := range sessions {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
clientv2 := querypbv2.NewQueryNodeClient(conn)
clientCh <- clientWithID{
client: clientv2,
id: session.ServerID,
}
}(session)
}
wg.Wait()
close(clientCh)

dialCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
conn, err := grpc.DialContext(dialCtx, session.Address, opts...)
cancel()
if err != nil {
fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
continue
}
type distResponse struct {
resp *querypbv2.GetDataDistributionResponse
err error
id int64
}

if session.ServerName == "querynode" {
fmt.Println("===========")
fmt.Printf("ServerID %d\n", session.ServerID)
clientv2 := querypbv2.NewQueryNodeClient(conn)
resp, err := clientv2.GetDataDistribution(context.Background(), &querypbv2.GetDataDistributionRequest{
Base: &commonpbv2.MsgBase{
SourceID: -1,
TargetID: session.ServerID,
},
})
if err != nil {
fmt.Println(err.Error())
continue
}

// print channel
for _, channel := range resp.GetChannels() {
if collectionID != 0 && channel.GetCollection() != collectionID {
continue
}
fmt.Printf("Channel %s, collection: %d, version %d\n", channel.Channel, channel.Collection, channel.Version)
}

for _, lv := range resp.GetLeaderViews() {
if collectionID != 0 && lv.GetCollection() != collectionID {
continue
}
fmt.Printf("Leader view for channel: %s\n", lv.GetChannel())
growings := lo.Uniq(lo.Union(lv.GetGrowingSegmentIDs(), lo.Keys(lv.GetGrowingSegments())))
fmt.Printf("Growing segments number: %d , ids: %v\n", len(growings), growings)
}

sealedNum := 0
for _, segment := range resp.GetSegments() {
if collectionID != 0 && segment.GetCollection() != collectionID {
continue
}
fmt.Printf("SegmentID: %d CollectionID: %d Channel: %s\n", segment.GetID(), segment.GetCollection(), segment.GetChannel())
sealedNum++
}
fmt.Println("Sealed segments number:", sealedNum)
sealedCnt += int64(sealedNum)
}
respCh := make(chan distResponse, len(qnSessions))

for idClient := range clientCh {
wg.Add(1)
go func(idClient clientWithID) {
defer wg.Done()
resp, err := idClient.client.GetDataDistribution(context.Background(), &querypbv2.GetDataDistributionRequest{
Base: &commonpbv2.MsgBase{
SourceID: -1,
TargetID: idClient.id,
},
})
respCh <- distResponse{
resp: resp,
err: err,
id: idClient.id,
}
}(idClient)
}

wg.Wait()
close(respCh)

var totalSealedCnt int
var totalSealedRowcount int64

for result := range respCh {
fmt.Println("===========")
fmt.Printf("ServerID %d\n", result.id)
if result.err != nil {
fmt.Println("Error fetching distribution:", result.err.Error())
continue
}
resp := result.resp

// print channel
for _, channel := range resp.GetChannels() {
if p.CollectionID != 0 && channel.GetCollection() != p.CollectionID {
continue
}
fmt.Printf("==== total loaded sealed segment number: %d\n", sealedCnt)
fmt.Printf("Channel %s, collection: %d, version %d\n", channel.Channel, channel.Collection, channel.Version)
}

return nil
},
for _, lv := range resp.GetLeaderViews() {
if p.CollectionID != 0 && lv.GetCollection() != p.CollectionID {
continue
}
fmt.Printf("Leader view for channel: %s\n", lv.GetChannel())
growings := lo.Uniq(lo.Union(lv.GetGrowingSegmentIDs(), lo.Keys(lv.GetGrowingSegments())))
fmt.Printf("Growing segments number: %d , ids: %v\n", len(growings), growings)
}

sealedNum := 0
sealedRowCount := int64(0)

collSegments := lo.GroupBy(resp.GetSegments(), func(segment *querypbv2.SegmentVersionInfo) int64 {
return segment.GetCollection()
})

for collection, segments := range collSegments {
if p.CollectionID != 0 && collection != p.CollectionID {
continue
}
fmt.Printf("------ Collection %d ------\n", collection)
var collRowCount int64
for _, segment := range segments {
segmentInfo := id2Segment[segment.GetID()]
var rc int64
if segmentInfo != nil {
rc = segmentInfo.NumOfRows
}
fmt.Printf("SegmentID: %d CollectionID: %d Channel: %s, NumOfRows %d\n", segment.GetID(), segment.GetCollection(), segment.GetChannel(), rc)
sealedNum++
collRowCount += rc
}
fmt.Printf("Collection RowCount total %d\n\n", collRowCount)
sealedRowCount += collRowCount
}
fmt.Println("------------------")
fmt.Printf("Sealed segments number: %d Sealed Row Num: %d\n", sealedNum, sealedRowCount)
totalSealedCnt += sealedNum
totalSealedRowcount += sealedRowCount
}
cmd.Flags().Int64("collection", 0, "collection id to filter with")
return cmd
fmt.Println("==========================================")
fmt.Printf("\n#### total loaded sealed segment number: %d, total loaded row count: %d\n", totalSealedCnt, totalSealedRowcount)
return nil
}
9 changes: 3 additions & 6 deletions states/etcd/common/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"path"
"time"

clientv3 "go.etcd.io/etcd/client/v3"

Expand All @@ -16,15 +15,13 @@ const (
)

// ListSessions returns all session.
func ListSessions(cli clientv3.KV, basePath string) ([]*models.Session, error) {
func ListSessions(ctx context.Context, cli clientv3.KV, basePath string) ([]*models.Session, error) {
prefix := path.Join(basePath, sessionPrefix)
return ListSessionsByPrefix(cli, prefix)
return ListSessionsByPrefix(ctx, cli, prefix)
}

// ListSessionsByPrefix returns all session with provided prefix.
func ListSessionsByPrefix(cli clientv3.KV, prefix string) ([]*models.Session, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
func ListSessionsByPrefix(ctx context.Context, cli clientv3.KV, prefix string) ([]*models.Session, error) {
resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion states/etcd/download/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func PullGlobalDistributionDetails(cli clientv3.KV, basePath string) *cobra.Comm
Use: "global-distribution",
Short: "pull global distribution details",
RunE: func(cmd *cobra.Command, args []string) error {
sessions, err := common.ListSessions(cli, basePath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sessions, err := common.ListSessions(ctx, cli, basePath)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/remove/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type RemoveSessionParam struct {
}

func (c *ComponentRemove) RemoveSessionCommand(ctx context.Context, p *RemoveSessionParam) error {
sessions, err := common.ListSessions(c.client, c.basePath)
sessions, err := common.ListSessions(ctx, c.client, c.basePath)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion states/etcd/repair/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command {
}

func doDatacoordWatch(cli clientv3.KV, basePath string, collectionID int64, vchannels []string) {
sessions, err := common.ListSessions(cli, basePath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sessions, err := common.ListSessions(ctx, cli, basePath)
if err != nil {
fmt.Println("failed to list session")
return
Expand Down
4 changes: 3 additions & 1 deletion states/etcd/repair/manual_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func ManualCompactionCommand(cli clientv3.KV, basePath string) *cobra.Command {
}

func doManualCompaction(cli clientv3.KV, basePath string, collID int64) {
sessions, err := common.ListSessions(cli, basePath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sessions, err := common.ListSessions(ctx, cli, basePath)
if err != nil {
fmt.Println("failed to list session")
return
Expand Down
11 changes: 7 additions & 4 deletions states/etcd/show/legacy_qc_cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package show

import (
"context"
"fmt"
"path"

Expand All @@ -15,9 +16,9 @@ const (
queryNodeInfoPrefix = "queryCoord-queryNodeInfo"
)

func listQueryCoordClusterNodeInfo(cli clientv3.KV, basePath string) ([]*models.Session, error) {
func listQueryCoordClusterNodeInfo(ctx context.Context, cli clientv3.KV, basePath string) ([]*models.Session, error) {
prefix := path.Join(basePath, queryNodeInfoPrefix)
return common.ListSessionsByPrefix(cli, prefix)
return common.ListSessionsByPrefix(ctx, cli, prefix)
}

// QueryCoordClusterCommand returns show querycoord-cluster command.
Expand All @@ -27,13 +28,15 @@ func QueryCoordClusterCommand(cli clientv3.KV, basePath string) *cobra.Command {
Short: "display querynode information from querycoord cluster",
Aliases: []string{"querycoord-clusters"},
RunE: func(cmd *cobra.Command, args []string) error {
sessions, err := listQueryCoordClusterNodeInfo(cli, basePath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sessions, err := listQueryCoordClusterNodeInfo(ctx, cli, basePath)
if err != nil {
fmt.Println("failed to list tasks in querycoord", err.Error())
return nil
}

onlineSessons, _ := common.ListSessions(cli, basePath)
onlineSessons, _ := common.ListSessions(ctx, cli, basePath)
onlineSessionMap := make(map[UniqueID]struct{})
for _, s := range onlineSessons {
onlineSessionMap[s.ServerID] = struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type SessionParam struct {
// SessionCommand returns show session command.
// usage: show session
func (c *ComponentShow) SessionCommand(ctx context.Context, p *SessionParam) (*Sessions, error) {
sessions, err := common.ListSessions(c.client, c.basePath)
sessions, err := common.ListSessions(ctx, c.client, c.basePath)
if err != nil {
return nil, errors.Wrap(err, "failed to list sessions")
}
Expand Down
Loading

0 comments on commit 3dd5cce

Please sign in to comment.