From 38b3195564c1fa55584c16086e394c57bb23ab7f Mon Sep 17 00:00:00 2001 From: "Congqi.Xia" Date: Tue, 27 Feb 2024 19:24:09 +0800 Subject: [PATCH 1/6] enhance: Add more output for check part key output Signed-off-by: Congqi.Xia --- states/check_partition_key.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/states/check_partition_key.go b/states/check_partition_key.go index 408e2021..f8c0a985 100644 --- a/states/check_partition_key.go +++ b/states/check_partition_key.go @@ -22,6 +22,7 @@ import ( type CheckPartitionKeyParam struct { framework.ParamBase `use:"check-partiton-key" desc:"check partition key field file"` + Storage string `name:"storage" ` StopIfErr bool `name:"stopIfErr" default:"true"` MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"` OutputFormat string `name:"outputFmt" default:"stdout"` @@ -110,6 +111,8 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa return field.FieldID, field }) + fmt.Printf("Start to check collection %s id = %d\n", collection.Schema.Name, collection.ID) + segments, err := common.ListSegmentsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool { return segment.CollectionID == collection.ID }) @@ -121,7 +124,9 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa var collectionErrs int var found bool - for _, segment := range segments { + fmt.Printf("Partition numer: %d, Segment number %d\n", len(partitions), len(segments)) + + for idx, segment := range segments { if segment.State == models.SegmentStateDropped || segment.State == models.SegmentStateSegmentStateNone { continue } @@ -229,6 +234,7 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa fmt.Printf("Segment %d of collection %s find %d partition-key error\n", segment.ID, collection.Schema.Name, errCnt) collectionErrs += errCnt } + fmt.Printf("%d of %d processed\n", idx, len(segments)) } if p.StopIfErr { if found { From f5b3aad2d7e0159256a41090f6239af64fafe50c Mon Sep 17 00:00:00 2001 From: "Congqi.Xia" Date: Tue, 27 Feb 2024 19:29:26 +0800 Subject: [PATCH 2/6] Add all ok for skip err mode Signed-off-by: Congqi.Xia --- states/check_partition_key.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/states/check_partition_key.go b/states/check_partition_key.go index f8c0a985..b2a60e9d 100644 --- a/states/check_partition_key.go +++ b/states/check_partition_key.go @@ -239,6 +239,8 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa if p.StopIfErr { if found { fmt.Printf("Collection %s found partition key error\n", collection.Schema.Name) + } else { + fmt.Printf("Collection %s all data OK!\n", collection.Schema.Name) } } else { fmt.Printf("Collection %s found %d partition key error\n", collection.Schema.Name, collectionErrs) From 020d7eacc6cf21dcf1bb214c83a2231ea419e697 Mon Sep 17 00:00:00 2001 From: "Congqi.Xia" Date: Tue, 27 Feb 2024 19:45:40 +0800 Subject: [PATCH 3/6] Add collection selection & pk output for quick exit Signed-off-by: Congqi.Xia --- states/check_partition_key.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/states/check_partition_key.go b/states/check_partition_key.go index b2a60e9d..a5e0dfb0 100644 --- a/states/check_partition_key.go +++ b/states/check_partition_key.go @@ -22,21 +22,28 @@ import ( type CheckPartitionKeyParam struct { framework.ParamBase `use:"check-partiton-key" desc:"check partition key field file"` - Storage string `name:"storage" ` + Storage string `name:"storage" default:"auto" desc:"storage service configuration mode"` StopIfErr bool `name:"stopIfErr" default:"true"` MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"` OutputFormat string `name:"outputFmt" default:"stdout"` + + CollectionID int64 `name:"collection" default:"0" desc:"target collection to scan, default scan all partition key collections"` } var errQuickExit = errors.New("quick exit") func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPartitionKeyParam) error { - collections, err := common.ListCollectionsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion()) + collections, err := common.ListCollectionsVersion(ctx, s.client, s.basePath, etcdversion.GetVersion(), func(collection *models.Collection) bool { + return p.CollectionID == 0 || collection.ID == p.CollectionID + }) if err != nil { return err } - minioClient, bucketName, rootPath, err := s.GetMinioClientFromCfg(ctx, p.MinioAddress) + var minioClient *minio.Client + var bucketName, rootPath string + + minioClient, bucketName, rootPath, err = s.GetMinioClientFromCfg(ctx, p.MinioAddress) if err != nil { return err } @@ -198,10 +205,10 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa output[pkField.Name] = pk.GetValue() switch p.OutputFormat { case "stdout": + fmt.Printf("PK %v partition does not follow partition key rule (%s=%v)\n", pk.GetValue(), partKeyField.Name, partKeyValue) if p.StopIfErr { return errQuickExit } - fmt.Printf("PK %v partition does not follow partition key rule\n", pk) case "json": bs, err := json.Marshal(output) if err != nil { From eedc4a49e697e9e7be428824848560d6bbc19fe6 Mon Sep 17 00:00:00 2001 From: "Congqi.Xia" Date: Tue, 27 Feb 2024 20:42:06 +0800 Subject: [PATCH 4/6] Add progress output Signed-off-by: Congqi.Xia --- states/check_partition_key.go | 22 ++++++++++++++++++---- states/frame_screen.go | 1 + 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/states/check_partition_key.go b/states/check_partition_key.go index a5e0dfb0..f3daeb31 100644 --- a/states/check_partition_key.go +++ b/states/check_partition_key.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/cockroachdb/errors" + "github.com/gosuri/uilive" "github.com/milvus-io/birdwatcher/framework" "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/schemapb" @@ -24,6 +25,7 @@ type CheckPartitionKeyParam struct { framework.ParamBase `use:"check-partiton-key" desc:"check partition key field file"` Storage string `name:"storage" default:"auto" desc:"storage service configuration mode"` StopIfErr bool `name:"stopIfErr" default:"true"` + OutputPrimaryKey bool `name:"outputPK" default:"true" desc:"print error record primary key info in stdout mode"` MinioAddress string `name:"minioAddr" default:"" desc:"the minio address to override, leave empty to use milvus.yaml value"` OutputFormat string `name:"outputFmt" default:"stdout"` @@ -131,7 +133,11 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa var collectionErrs int var found bool - fmt.Printf("Partition numer: %d, Segment number %d\n", len(partitions), len(segments)) + fmt.Printf("Partition number: %d, Segment number %d\n", len(partitions), len(segments)) + progressDisplay := uilive.New() + progressFmt := "Scan segment ... %d%%(%d/%d) %s\n" + progressDisplay.Start() + fmt.Fprintf(progressDisplay, progressFmt, 0, 0, len(segments), "") for idx, segment := range segments { if segment.State == models.SegmentStateDropped || segment.State == models.SegmentStateSegmentStateNone { @@ -205,7 +211,9 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa output[pkField.Name] = pk.GetValue() switch p.OutputFormat { case "stdout": - fmt.Printf("PK %v partition does not follow partition key rule (%s=%v)\n", pk.GetValue(), partKeyField.Name, partKeyValue) + if p.OutputPrimaryKey { + fmt.Printf("PK %v partition does not follow partition key rule (%s=%v)\n", pk.GetValue(), partKeyField.Name, partKeyValue) + } if p.StopIfErr { return errQuickExit } @@ -237,12 +245,18 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa if p.StopIfErr && found { break } + progress := idx * 100 / len(segments) + status := fmt.Sprintf("%d [%s]", segment.ID, colorReady.Sprint("done")) if errCnt > 0 { - fmt.Printf("Segment %d of collection %s find %d partition-key error\n", segment.ID, collection.Schema.Name, errCnt) + // fmt.Printf("Segment %d of collection %s find %d partition-key error\n", segment.ID, collection.Schema.Name, errCnt) collectionErrs += errCnt + status = fmt.Sprintf("%d [%s](%d)", segment.ID, colorPending.Sprint("error"), errCnt) } - fmt.Printf("%d of %d processed\n", idx, len(segments)) + + fmt.Fprintf(progressDisplay, progressFmt, progress, idx+1, len(segments), status) } + progressDisplay.Stop() + fmt.Println() if p.StopIfErr { if found { fmt.Printf("Collection %s found partition key error\n", collection.Schema.Name) diff --git a/states/frame_screen.go b/states/frame_screen.go index a1d99d07..c44bfaeb 100644 --- a/states/frame_screen.go +++ b/states/frame_screen.go @@ -38,6 +38,7 @@ func NewFrameScreen(lines int, display *uilive.Writer) *FrameScreen { var ( colorPending = color.New(color.FgYellow) colorReady = color.New(color.FgGreen) + colorError = color.New(color.FgRed) levelColor = map[eventlog.Level]*color.Color{ eventlog.Level_Debug: color.New(color.FgGreen), From 7f69bfc23678db1185581eccdfcda89d9ab6fd4d Mon Sep 17 00:00:00 2001 From: "Congqi.Xia" Date: Tue, 27 Feb 2024 20:47:06 +0800 Subject: [PATCH 5/6] Fix progress percentage & error color Signed-off-by: Congqi.Xia --- states/check_partition_key.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/states/check_partition_key.go b/states/check_partition_key.go index f3daeb31..79e2167a 100644 --- a/states/check_partition_key.go +++ b/states/check_partition_key.go @@ -245,12 +245,11 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa if p.StopIfErr && found { break } - progress := idx * 100 / len(segments) + progress := (idx + 1) * 100 / len(segments) status := fmt.Sprintf("%d [%s]", segment.ID, colorReady.Sprint("done")) if errCnt > 0 { - // fmt.Printf("Segment %d of collection %s find %d partition-key error\n", segment.ID, collection.Schema.Name, errCnt) collectionErrs += errCnt - status = fmt.Sprintf("%d [%s](%d)", segment.ID, colorPending.Sprint("error"), errCnt) + status = fmt.Sprintf("%d [%s](%d)", segment.ID, colorError.Sprint("error"), errCnt) } fmt.Fprintf(progressDisplay, progressFmt, progress, idx+1, len(segments), status) From 0d72d8dfa506726fa832efba2649329606512fe4 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Wed, 28 Feb 2024 11:52:51 +0800 Subject: [PATCH 6/6] Add json-pk output format Signed-off-by: Congqi Xia --- states/check_partition_key.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/states/check_partition_key.go b/states/check_partition_key.go index 79e2167a..a2396cb7 100644 --- a/states/check_partition_key.go +++ b/states/check_partition_key.go @@ -151,6 +151,9 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa switch p.OutputFormat { case "stdout": selector = func(field int64) bool { return field == partKeyField.FieldID } + case "json-pk": + selector = func(field int64) bool { return field == partKeyField.FieldID } + fallthrough case "json": f, err = os.Create(fmt.Sprintf("%d-%d.json", collection.ID, segment.ID)) if err != nil { @@ -217,7 +220,7 @@ func (s *InstanceState) CheckPartitionKeyCommand(ctx context.Context, p *CheckPa if p.StopIfErr { return errQuickExit } - case "json": + case "json", "json-pk": bs, err := json.Marshal(output) if err != nil { fmt.Println(err.Error())