From 980f5609568e10d7daf302e4084daec623bc3ae0 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Wed, 9 Oct 2024 12:00:38 +0800 Subject: [PATCH 1/2] enhance: Refine `show segment-loaded-grpc` command This PR: - Use framework style to re-write this command - Add row count information in output for balance info - Group segment by collection - Add `ctx` parameter for all list session util func Signed-off-by: Congqi Xia --- states/balance_explain.go | 4 +- states/configuration.go | 2 +- states/distribution.go | 225 ++++++++++++++++-------- states/etcd/common/session.go | 9 +- states/etcd/download/distribution.go | 4 +- states/etcd/remove/session.go | 2 +- states/etcd/repair/channel.go | 4 +- states/etcd/repair/manual_compaction.go | 4 +- states/etcd/show/legacy_qc_cluster.go | 11 +- states/etcd/show/session.go | 2 +- states/etcd_backup.go | 12 +- states/healthz.go | 2 +- states/instance.go | 5 - states/management.go | 4 +- states/metrics.go | 5 +- states/minio.go | 2 +- states/pprof.go | 2 +- states/probe.go | 4 +- states/update_log_level.go | 9 +- states/util.go | 4 +- states/visit.go | 4 +- 21 files changed, 208 insertions(+), 112 deletions(-) diff --git a/states/balance_explain.go b/states/balance_explain.go index c4c3191c..acebb6ce 100644 --- a/states/balance_explain.go +++ b/states/balance_explain.go @@ -33,6 +33,8 @@ func ExplainBalanceCommand(cli clientv3.KV, basePath string) *cobra.Command { Use: "explain-balance", Short: "explain segments and channels current balance status", RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // 0. set up collection, policy, servers and params collectionID, err := cmd.Flags().GetInt64(collectionLabel) if err != nil { @@ -44,7 +46,7 @@ func ExplainBalanceCommand(cli clientv3.KV, basePath string) *cobra.Command { } // 1. set up segment distribution view, replicas and segmentInfos - sessions, err := ListServers(cli, basePath, queryNode) + sessions, err := ListServers(ctx, cli, basePath, queryNode) if err != nil { return err } diff --git a/states/configuration.go b/states/configuration.go index 4fa34cf3..16029fc8 100644 --- a/states/configuration.go +++ b/states/configuration.go @@ -27,7 +27,7 @@ type GetConfigurationParam struct { func (s *InstanceState) GetConfigurationCommand(ctx context.Context, p *GetConfigurationParam) error { p.Filter = strings.ToLower(p.Filter) - sessions, err := common.ListSessions(s.client, s.basePath) + sessions, err := common.ListSessions(ctx, s.client, s.basePath) if err != nil { return err } diff --git a/states/distribution.go b/states/distribution.go index 071b3c18..f64ddc31 100644 --- a/states/distribution.go +++ b/states/distribution.go @@ -3,98 +3,175 @@ package states import ( "context" "fmt" + "sync" "time" "github.com/samber/lo" - "github.com/spf13/cobra" - clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/milvus-io/birdwatcher/framework" + "github.com/milvus-io/birdwatcher/models" commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb" + "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" ) -// GetDistributionCommand returns command to iterate all querynodes to list distribution. -func GetDistributionCommand(cli clientv3.KV, basePath string) *cobra.Command { - cmd := &cobra.Command{ - Use: "segment-loaded-grpc", - Short: "list segments loaded information", - RunE: func(cmd *cobra.Command, args []string) error { - collectionID, err := cmd.Flags().GetInt64("collection") - var sealedCnt int64 - if err != nil { - return err +type GetDistributionParam struct { + framework.ParamBase `use:"show segment-loaded-grpc" desc:"list segments loaded information"` + CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"` +} + +// GetDistributionCommand iterates all querynodes to list distribution. +func (s *InstanceState) GetDistributionCommand(ctx context.Context, p *GetDistributionParam) error { + // list segment info to get row count information + segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool { + return p.CollectionID == 0 || p.CollectionID == s.CollectionID + }) + + if err != nil { + return err + } + + id2Segment := lo.SliceToMap(segments, func(s *models.Segment) (int64, *models.Segment) { + return s.ID, s + }) + + sessions, err := common.ListSessions(ctx, s.client, s.basePath) + if err != nil { + return err + } + + qnSessions := lo.Filter(sessions, func(sess *models.Session, _ int) bool { + return sess.ServerName == "querynode" + }) + + type clientWithID struct { + client querypbv2.QueryNodeClient + id int64 + } + var wg sync.WaitGroup + clientCh := make(chan clientWithID, len(qnSessions)) + for _, session := range qnSessions { + wg.Add(1) + go func(session *models.Session) { + defer wg.Done() + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), } - sessions, err := common.ListSessions(cli, basePath) + + dialCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + conn, err := grpc.DialContext(dialCtx, session.Address, opts...) + cancel() + // ignore bad session if err != nil { - return err + fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) + return } - for _, session := range sessions { - opts := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - } + clientv2 := querypbv2.NewQueryNodeClient(conn) + clientCh <- clientWithID{ + client: clientv2, + id: session.ServerID, + } + }(session) + } + wg.Wait() + close(clientCh) - dialCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - conn, err := grpc.DialContext(dialCtx, session.Address, opts...) - cancel() - if err != nil { - fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) - continue - } + type distResponse struct { + resp *querypb.GetDataDistributionResponse + err error + id int64 + } - if session.ServerName == "querynode" { - fmt.Println("===========") - fmt.Printf("ServerID %d\n", session.ServerID) - clientv2 := querypbv2.NewQueryNodeClient(conn) - resp, err := clientv2.GetDataDistribution(context.Background(), &querypbv2.GetDataDistributionRequest{ - Base: &commonpbv2.MsgBase{ - SourceID: -1, - TargetID: session.ServerID, - }, - }) - if err != nil { - fmt.Println(err.Error()) - continue - } - - // print channel - for _, channel := range resp.GetChannels() { - if collectionID != 0 && channel.GetCollection() != collectionID { - continue - } - fmt.Printf("Channel %s, collection: %d, version %d\n", channel.Channel, channel.Collection, channel.Version) - } - - for _, lv := range resp.GetLeaderViews() { - if collectionID != 0 && lv.GetCollection() != collectionID { - continue - } - fmt.Printf("Leader view for channel: %s\n", lv.GetChannel()) - growings := lo.Uniq(lo.Union(lv.GetGrowingSegmentIDs(), lo.Keys(lv.GetGrowingSegments()))) - fmt.Printf("Growing segments number: %d , ids: %v\n", len(growings), growings) - } - - sealedNum := 0 - for _, segment := range resp.GetSegments() { - if collectionID != 0 && segment.GetCollection() != collectionID { - continue - } - fmt.Printf("SegmentID: %d CollectionID: %d Channel: %s\n", segment.GetID(), segment.GetCollection(), segment.GetChannel()) - sealedNum++ - } - fmt.Println("Sealed segments number:", sealedNum) - sealedCnt += int64(sealedNum) - } + respCh := make(chan distResponse, len(qnSessions)) + + for idClient := range clientCh { + wg.Add(1) + go func(idClient clientWithID) { + defer wg.Done() + resp, err := idClient.client.GetDataDistribution(context.Background(), &querypbv2.GetDataDistributionRequest{ + Base: &commonpbv2.MsgBase{ + SourceID: -1, + TargetID: idClient.id, + }, + }) + respCh <- distResponse{ + resp: resp, + err: err, + id: idClient.id, + } + }(idClient) + } + + wg.Wait() + close(respCh) + + var totalSealedCnt int + var totalSealedRowcount int64 + + for result := range respCh { + fmt.Println("===========") + fmt.Printf("ServerID %d\n", result.id) + if result.err != nil { + fmt.Println("Error fetching distribution:", err.Error()) + continue + } + resp := result.resp + + // print channel + for _, channel := range resp.GetChannels() { + if p.CollectionID != 0 && channel.GetCollection() != p.CollectionID { + continue + } + fmt.Printf("Channel %s, collection: %d, version %d\n", channel.Channel, channel.Collection, channel.Version) + } + + for _, lv := range resp.GetLeaderViews() { + if p.CollectionID != 0 && lv.GetCollection() != p.CollectionID { + continue } - fmt.Printf("==== total loaded sealed segment number: %d\n", sealedCnt) + fmt.Printf("Leader view for channel: %s\n", lv.GetChannel()) + growings := lo.Uniq(lo.Union(lv.GetGrowingSegmentIDs(), lo.Keys(lv.GetGrowingSegments()))) + fmt.Printf("Growing segments number: %d , ids: %v\n", len(growings), growings) + } + + sealedNum := 0 + sealedRowCount := int64(0) - return nil - }, + collSegments := lo.GroupBy(resp.GetSegments(), func(segment *querypbv2.SegmentVersionInfo) int64 { + return segment.GetCollection() + }) + + for collection, segments := range collSegments { + if p.CollectionID != 0 && collection != p.CollectionID { + continue + } + fmt.Printf("------ Collection %d ------\n", collection) + var collRowCount int64 + for _, segment := range segments { + segmentInfo := id2Segment[segment.GetID()] + var rc int64 + if segmentInfo != nil { + rc = segmentInfo.NumOfRows + } + fmt.Printf("SegmentID: %d CollectionID: %d Channel: %s, NumOfRows %d\n", segment.GetID(), segment.GetCollection(), segment.GetChannel(), rc) + sealedNum++ + collRowCount += rc + } + fmt.Printf("Collection RowCount total %d\n\n", collRowCount) + sealedRowCount += collRowCount + } + fmt.Println("------------------") + fmt.Printf("Sealed segments number: %d Sealed Row Num: %d\n", sealedNum, sealedRowCount) + totalSealedCnt += sealedNum + totalSealedRowcount += sealedRowCount } - cmd.Flags().Int64("collection", 0, "collection id to filter with") - return cmd + fmt.Println("==========================================") + fmt.Printf("\n#### total loaded sealed segment number: %d, total loaded row count: %d\n", totalSealedCnt, totalSealedRowcount) + return nil } diff --git a/states/etcd/common/session.go b/states/etcd/common/session.go index 0b3b6407..f731692b 100644 --- a/states/etcd/common/session.go +++ b/states/etcd/common/session.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "path" - "time" clientv3 "go.etcd.io/etcd/client/v3" @@ -16,15 +15,13 @@ const ( ) // ListSessions returns all session. -func ListSessions(cli clientv3.KV, basePath string) ([]*models.Session, error) { +func ListSessions(ctx context.Context, cli clientv3.KV, basePath string) ([]*models.Session, error) { prefix := path.Join(basePath, sessionPrefix) - return ListSessionsByPrefix(cli, prefix) + return ListSessionsByPrefix(ctx, cli, prefix) } // ListSessionsByPrefix returns all session with provided prefix. -func ListSessionsByPrefix(cli clientv3.KV, prefix string) ([]*models.Session, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() +func ListSessionsByPrefix(ctx context.Context, cli clientv3.KV, prefix string) ([]*models.Session, error) { resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, err diff --git a/states/etcd/download/distribution.go b/states/etcd/download/distribution.go index f63f0274..c7dab104 100644 --- a/states/etcd/download/distribution.go +++ b/states/etcd/download/distribution.go @@ -25,7 +25,9 @@ func PullGlobalDistributionDetails(cli clientv3.KV, basePath string) *cobra.Comm Use: "global-distribution", Short: "pull global distribution details", RunE: func(cmd *cobra.Command, args []string) error { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { return err } diff --git a/states/etcd/remove/session.go b/states/etcd/remove/session.go index f7e9c50b..cd82e401 100644 --- a/states/etcd/remove/session.go +++ b/states/etcd/remove/session.go @@ -20,7 +20,7 @@ type RemoveSessionParam struct { } func (c *ComponentRemove) RemoveSessionCommand(ctx context.Context, p *RemoveSessionParam) error { - sessions, err := common.ListSessions(c.client, c.basePath) + sessions, err := common.ListSessions(ctx, c.client, c.basePath) if err != nil { return err } diff --git a/states/etcd/repair/channel.go b/states/etcd/repair/channel.go index 32264319..1e81f8e2 100644 --- a/states/etcd/repair/channel.go +++ b/states/etcd/repair/channel.go @@ -84,7 +84,9 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command { } func doDatacoordWatch(cli clientv3.KV, basePath string, collectionID int64, vchannels []string) { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { fmt.Println("failed to list session") return diff --git a/states/etcd/repair/manual_compaction.go b/states/etcd/repair/manual_compaction.go index 0dbad8bc..fc911864 100644 --- a/states/etcd/repair/manual_compaction.go +++ b/states/etcd/repair/manual_compaction.go @@ -37,7 +37,9 @@ func ManualCompactionCommand(cli clientv3.KV, basePath string) *cobra.Command { } func doManualCompaction(cli clientv3.KV, basePath string, collID int64) { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { fmt.Println("failed to list session") return diff --git a/states/etcd/show/legacy_qc_cluster.go b/states/etcd/show/legacy_qc_cluster.go index 35b25e90..ce79e98a 100644 --- a/states/etcd/show/legacy_qc_cluster.go +++ b/states/etcd/show/legacy_qc_cluster.go @@ -1,6 +1,7 @@ package show import ( + "context" "fmt" "path" @@ -15,9 +16,9 @@ const ( queryNodeInfoPrefix = "queryCoord-queryNodeInfo" ) -func listQueryCoordClusterNodeInfo(cli clientv3.KV, basePath string) ([]*models.Session, error) { +func listQueryCoordClusterNodeInfo(ctx context.Context, cli clientv3.KV, basePath string) ([]*models.Session, error) { prefix := path.Join(basePath, queryNodeInfoPrefix) - return common.ListSessionsByPrefix(cli, prefix) + return common.ListSessionsByPrefix(ctx, cli, prefix) } // QueryCoordClusterCommand returns show querycoord-cluster command. @@ -27,13 +28,15 @@ func QueryCoordClusterCommand(cli clientv3.KV, basePath string) *cobra.Command { Short: "display querynode information from querycoord cluster", Aliases: []string{"querycoord-clusters"}, RunE: func(cmd *cobra.Command, args []string) error { - sessions, err := listQueryCoordClusterNodeInfo(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := listQueryCoordClusterNodeInfo(ctx, cli, basePath) if err != nil { fmt.Println("failed to list tasks in querycoord", err.Error()) return nil } - onlineSessons, _ := common.ListSessions(cli, basePath) + onlineSessons, _ := common.ListSessions(ctx, cli, basePath) onlineSessionMap := make(map[UniqueID]struct{}) for _, s := range onlineSessons { onlineSessionMap[s.ServerID] = struct{}{} diff --git a/states/etcd/show/session.go b/states/etcd/show/session.go index d9a61176..23182aba 100644 --- a/states/etcd/show/session.go +++ b/states/etcd/show/session.go @@ -19,7 +19,7 @@ type SessionParam struct { // SessionCommand returns show session command. // usage: show session func (c *ComponentShow) SessionCommand(ctx context.Context, p *SessionParam) (*Sessions, error) { - sessions, err := common.ListSessions(c.client, c.basePath) + sessions, err := common.ListSessions(ctx, c.client, c.basePath) if err != nil { return nil, errors.Wrap(err, "failed to list sessions") } diff --git a/states/etcd_backup.go b/states/etcd_backup.go index c6e2b098..7298e817 100644 --- a/states/etcd_backup.go +++ b/states/etcd_backup.go @@ -266,7 +266,9 @@ func backupEtcdV2(cli clientv3.KV, base, prefix string, w *bufio.Writer, opt *ba } func backupMetrics(cli clientv3.KV, basePath string, w *bufio.Writer) error { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { return err } @@ -307,7 +309,9 @@ func backupMetrics(cli clientv3.KV, basePath string, w *bufio.Writer) error { } func backupAppMetrics(cli clientv3.KV, basePath string, w *bufio.Writer) error { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { return err } @@ -376,7 +380,9 @@ func backupAppMetrics(cli clientv3.KV, basePath string, w *bufio.Writer) error { } func backupConfiguration(cli clientv3.KV, basePath string, w *bufio.Writer) error { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { return err } diff --git a/states/healthz.go b/states/healthz.go index 3ce03752..99de341b 100644 --- a/states/healthz.go +++ b/states/healthz.go @@ -73,7 +73,7 @@ func (c *InstanceState) checkSegmentTarget(ctx context.Context) ([]*HealthzCheck } validIDs := lo.SliceToMap(segments, func(segment *models.Segment) (int64, struct{}) { return segment.ID, struct{}{} }) - sessions, err := common.ListSessions(c.client, c.basePath) + sessions, err := common.ListSessions(ctx, c.client, c.basePath) if err != nil { return nil, err } diff --git a/states/instance.go b/states/instance.go index 1f366ff7..8121c9ca 100644 --- a/states/instance.go +++ b/states/instance.go @@ -52,8 +52,6 @@ func (s *InstanceState) SetupCommands() { showCmd.AddCommand( // show current-version CurrentVersionCommand(), - // show segment-loaded-grpc - GetDistributionCommand(cli, basePath), ) cmd.AddCommand( @@ -79,9 +77,6 @@ func (s *InstanceState) SetupCommands() { // update-log-level log_level_name component serverId getUpdateLogLevelCmd(cli, basePath), - // segment-loaded - GetDistributionCommand(cli, basePath), - // balance-explain ExplainBalanceCommand(cli, basePath), diff --git a/states/management.go b/states/management.go index 59ebbeca..682177f4 100644 --- a/states/management.go +++ b/states/management.go @@ -31,7 +31,7 @@ type ListMetricsPortParam struct { // ListMetricsPortCommand returns command logic listing metrics port for all online components. func (s *InstanceState) ListMetricsPortCommand(ctx context.Context, p *ListMetricsPortParam) error { - sessions, err := common.ListSessions(s.client, s.basePath) + sessions, err := common.ListSessions(ctx, s.client, s.basePath) if err != nil { return errors.Wrap(err, "failed to list sessions") } @@ -132,7 +132,7 @@ func getEventLogPort(ctx context.Context, ip string, metricPort string) int { } func (s *InstanceState) prepareListenerClients(ctx context.Context) ([]*eventlog.Listener, error) { - sessions, err := common.ListSessions(s.client, s.basePath) + sessions, err := common.ListSessions(ctx, s.client, s.basePath) if err != nil { return nil, errors.Wrap(err, "failed to list sessions") } diff --git a/states/metrics.go b/states/metrics.go index c8081a71..2c092391 100644 --- a/states/metrics.go +++ b/states/metrics.go @@ -1,6 +1,7 @@ package states import ( + "context" "fmt" "io/ioutil" "net/http" @@ -18,7 +19,9 @@ func getFetchMetricsCmd(cli clientv3.KV, basePath string) *cobra.Command { Use: "fetch-metrics", Short: "fetch metrics from milvus instances", Run: func(cmd *cobra.Command, args []string) { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { fmt.Println("failed to list session", err.Error()) return diff --git a/states/minio.go b/states/minio.go index 65666bf4..c4366d48 100644 --- a/states/minio.go +++ b/states/minio.go @@ -37,7 +37,7 @@ func (s *InstanceState) TestMinioCfgCommand(ctx context.Context, p *TestMinioCfg } func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, params ...oss.MinioConnectParam) (client *minio.Client, bucketName, rootPath string, err error) { - sessions, err := common.ListSessions(s.client, s.basePath) + sessions, err := common.ListSessions(ctx, s.client, s.basePath) if err != nil { return nil, "", "", err } diff --git a/states/pprof.go b/states/pprof.go index 28bac9b7..d14807dd 100644 --- a/states/pprof.go +++ b/states/pprof.go @@ -31,7 +31,7 @@ func (s *InstanceState) GetPprofCommand(ctx context.Context, p *PprofParam) erro return errors.New("invalid pprof metric type provided") } - sessions, err := common.ListSessions(s.client, s.basePath) + sessions, err := common.ListSessions(ctx, s.client, s.basePath) if err != nil { return errors.Wrap(err, "failed to list sessions") } diff --git a/states/probe.go b/states/probe.go index 87dd9da2..511b42ee 100644 --- a/states/probe.go +++ b/states/probe.go @@ -63,7 +63,7 @@ func getProbeQueryCmd(cli clientv3.KV, basePath string) *cobra.Command { return } - sessions, err := common.ListSessions(cli, basePath) + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { fmt.Println("failed to list online sessions", err.Error()) return @@ -214,7 +214,7 @@ func getProbePKCmd(cli clientv3.KV, basePath string) *cobra.Command { bs, _ := proto.Marshal(plan) - sessions, err := common.ListSessions(cli, basePath) + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { fmt.Println("failed to list online sessions", err.Error()) return diff --git a/states/update_log_level.go b/states/update_log_level.go index 516d7dcd..0e1afd08 100644 --- a/states/update_log_level.go +++ b/states/update_log_level.go @@ -2,6 +2,7 @@ package states import ( "bytes" + "context" "encoding/json" "fmt" "io/ioutil" @@ -24,7 +25,9 @@ func getShowLogLevelCmd(cli clientv3.KV, basePath string) *cobra.Command { Use: "show-log-level", Short: "show log level of milvus roles", RunE: func(cmd *cobra.Command, args []string) error { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { return err } @@ -67,7 +70,9 @@ func getUpdateLogLevelCmd(cli clientv3.KV, basePath string) *cobra.Command { Use: "update-log-level log_level [component] [serverId]", Short: "update log level of milvus role ", RunE: func(cmd *cobra.Command, args []string) error { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { return err } diff --git a/states/util.go b/states/util.go index 31b99caa..2530bb3c 100644 --- a/states/util.go +++ b/states/util.go @@ -213,8 +213,8 @@ func pathPart(p string, idx int) (string, error) { return parts[idx], nil } -func ListServers(cli clientv3.KV, basePath string, serverName string) ([]*models.Session, error) { - sessions, err := stateCommon.ListSessions(cli, basePath) +func ListServers(ctx context.Context, cli clientv3.KV, basePath string, serverName string) ([]*models.Session, error) { + sessions, err := stateCommon.ListSessions(ctx, cli, basePath) if err != nil { return nil, err } diff --git a/states/visit.go b/states/visit.go index b22ba011..5779b740 100644 --- a/states/visit.go +++ b/states/visit.go @@ -80,7 +80,9 @@ func setNextState(sessionType string, conn *grpc.ClientConn, statePtr *State, se } func getSessionConnect(cli clientv3.KV, basePath string, id int64, sessionType string) (session *models.Session, conn *grpc.ClientConn, err error) { - sessions, err := common.ListSessions(cli, basePath) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sessions, err := common.ListSessions(ctx, cli, basePath) if err != nil { fmt.Println("failed to list session, err:", err.Error()) return nil, nil, err From c2898f4a3ed44cb8883671415114c5f15c188ae8 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Wed, 9 Oct 2024 12:05:29 +0800 Subject: [PATCH 2/2] Fix lint issue Signed-off-by: Congqi Xia --- states/distribution.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/states/distribution.go b/states/distribution.go index f64ddc31..170da9c2 100644 --- a/states/distribution.go +++ b/states/distribution.go @@ -13,7 +13,6 @@ import ( "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb" - "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" "github.com/milvus-io/birdwatcher/states/etcd/common" etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" @@ -30,7 +29,6 @@ func (s *InstanceState) GetDistributionCommand(ctx context.Context, p *GetDistri segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(s *models.Segment) bool { return p.CollectionID == 0 || p.CollectionID == s.CollectionID }) - if err != nil { return err } @@ -63,7 +61,7 @@ func (s *InstanceState) GetDistributionCommand(ctx context.Context, p *GetDistri grpc.WithBlock(), } - dialCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + dialCtx, cancel := context.WithTimeout(ctx, 2*time.Second) conn, err := grpc.DialContext(dialCtx, session.Address, opts...) cancel() // ignore bad session @@ -83,7 +81,7 @@ func (s *InstanceState) GetDistributionCommand(ctx context.Context, p *GetDistri close(clientCh) type distResponse struct { - resp *querypb.GetDataDistributionResponse + resp *querypbv2.GetDataDistributionResponse err error id int64 } @@ -118,7 +116,7 @@ func (s *InstanceState) GetDistributionCommand(ctx context.Context, p *GetDistri fmt.Println("===========") fmt.Printf("ServerID %d\n", result.id) if result.err != nil { - fmt.Println("Error fetching distribution:", err.Error()) + fmt.Println("Error fetching distribution:", result.err.Error()) continue } resp := result.resp