Skip to content

Commit

Permalink
refine show segment ouputs (#292)
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <yun.zhang@zilliz.com>
jaime0815 authored Aug 12, 2024
1 parent 1389fcf commit 4b0dc4b
Showing 1 changed file with 103 additions and 62 deletions.
165 changes: 103 additions & 62 deletions states/etcd/show/segment.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/samber/lo"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
@@ -25,6 +27,15 @@ type SegmentParam struct {
Level string `name:"level" default:"" desc:"target segment level"`
}

type segStats struct {
binlogLogSize map[int64]int64
binlogMemSize map[int64]int64
deltaLogSize int64
deltaMemSize int64
statsLogSize int64
statsMemSize int64
}

// SegmentCommand returns show segments command.
func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) error {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool {
@@ -46,79 +57,81 @@ func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) err
var small, other int
var smallCnt, otherCnt int64

var (
binlogLogSize = make(map[int64]int64)
binlogMemSize = make(map[int64]int64)
deltaLogSize int64
deltaMemSize int64
statsLogSize int64
statsMemSize int64
)

for _, info := range segments {
if info.State != models.SegmentStateDropped {
totalRC += info.NumOfRows
healthy++
}
switch info.State {
case models.SegmentStateGrowing:
growing++
case models.SegmentStateSealed:
sealed++
case models.SegmentStateFlushing, models.SegmentStateFlushed:
flushed++
if float64(info.NumOfRows)/float64(info.MaxRowNum) < 0.2 {
small++
smallCnt += info.NumOfRows
} else {
other++
otherCnt += info.NumOfRows
}
case models.SegmentStateDropped:
dropped++
collectionID2SegStats := make(map[int64]*segStats)
collectionID2Segments := lo.GroupBy(segments, func(s *models.Segment) int64 {
return s.CollectionID
})

for collectionID, segs := range collectionID2Segments {
fmt.Printf("===============================CollectionID: %d===========================\n", collectionID)
collectionID2SegStats[collectionID] = &segStats{
binlogLogSize: make(map[int64]int64),
binlogMemSize: make(map[int64]int64),
}

switch p.Format {
case "table":
PrintSegmentInfo(info, p.Detail)
case "line":
fmt.Printf("SegmentID: %d State: %s, Level: %s, Row Count:%d, PartitionStatsVersion:%d \n", info.ID, info.State.String(),
info.Level.String(), info.NumOfRows, info.PartitionStatsVersion)
case "statistics":
for _, info := range segs {
if info.State != models.SegmentStateDropped {
for _, binlog := range info.GetBinlogs() {
for _, log := range binlog.Binlogs {
binlogLogSize[binlog.FieldID] += log.LogSize
binlogMemSize[binlog.FieldID] += log.MemSize
}
totalRC += info.NumOfRows
healthy++
}
switch info.State {
case models.SegmentStateGrowing:
growing++
case models.SegmentStateSealed:
sealed++
case models.SegmentStateFlushing, models.SegmentStateFlushed:
flushed++
if float64(info.NumOfRows)/float64(info.MaxRowNum) < 0.2 {
small++
smallCnt += info.NumOfRows
} else {
other++
otherCnt += info.NumOfRows
}
for _, delta := range info.GetDeltalogs() {
for _, log := range delta.Binlogs {
deltaLogSize += log.LogSize
deltaMemSize += log.MemSize
case models.SegmentStateDropped:
dropped++
}

switch p.Format {
case "table":
PrintSegmentInfo(info, p.Detail)
case "line":
fmt.Printf("SegmentID: %d PartitionID: %d State: %s, Level: %s, Row Count:%d, PartitionStatsVersion:%d \n",
info.ID, info.PartitionID, info.State.String(), info.Level.String(), info.NumOfRows, info.PartitionStatsVersion)
case "statistics":
if info.State != models.SegmentStateDropped {
for _, binlog := range info.GetBinlogs() {
for _, log := range binlog.Binlogs {
collectionID2SegStats[collectionID].binlogLogSize[binlog.FieldID] += log.LogSize
collectionID2SegStats[collectionID].binlogMemSize[binlog.FieldID] += log.MemSize
}
}
}
for _, statslog := range info.GetStatslogs() {
for _, binlog := range statslog.Binlogs {
statsLogSize += binlog.LogSize
statsMemSize += binlog.MemSize
for _, delta := range info.GetDeltalogs() {
for _, log := range delta.Binlogs {
collectionID2SegStats[collectionID].deltaLogSize += log.LogSize
collectionID2SegStats[collectionID].deltaMemSize += log.MemSize
}
}
for _, statslog := range info.GetStatslogs() {
for _, binlog := range statslog.Binlogs {
collectionID2SegStats[collectionID].statsLogSize += binlog.LogSize
collectionID2SegStats[collectionID].statsMemSize += binlog.MemSize
}
}
}
default:
err := fmt.Errorf("unsupport format:%s\n", p.Format)
return err
}
}
if p.Format == "statistics" {
outputStats("Collection", collectionID2SegStats[collectionID])
}
fmt.Printf("\n")
}

if p.Format == "statistics" {
var totalBinlogLogSize int64
var totalBinlogMemSize int64
for fieldID, logSize := range binlogLogSize {
memSize := binlogMemSize[fieldID]
fmt.Printf("\t field binlog size[%d]: %s, mem size[%d]: %s\n", fieldID, hrSize(logSize), fieldID, hrSize(memSize))
totalBinlogLogSize += logSize
totalBinlogMemSize += memSize
}
fmt.Printf("--- Total binlog size: %s, mem size: %s\n", hrSize(totalBinlogLogSize), hrSize(totalBinlogMemSize))
fmt.Printf("--- Total deltalog size: %s, mem size: %s\n", hrSize(deltaLogSize), hrSize(deltaMemSize))
fmt.Printf("--- Total statslog size: %s, mem size: %s\n", hrSize(statsLogSize), hrSize(statsMemSize))
outputStats("Total", lo.Values(collectionID2SegStats)...)
}

fmt.Printf("--- Growing: %d, Sealed: %d, Flushed: %d, Dropped: %d\n", growing, sealed, flushed, dropped)
@@ -127,6 +140,34 @@ func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) err
return nil
}

func outputStats(scope string, stats ...*segStats) {
var totalBinlogLogSize int64
var totalBinlogMemSize int64
var TotalDeltaLogSize int64
var TotalDeltaMemSize int64
var TotalStatsLogSize int64
var TotalStatsMemSize int64
for _, s := range stats {
for fieldID, logSize := range s.binlogLogSize {
memSize := s.binlogMemSize[fieldID]
if scope != "Total" {
fmt.Printf("field[%d] binlog size: %s, mem size: %s\n", fieldID, hrSize(logSize), hrSize(memSize))
}
totalBinlogLogSize += logSize
totalBinlogMemSize += memSize
}

TotalDeltaLogSize += s.deltaLogSize
TotalDeltaMemSize += s.deltaMemSize
TotalStatsLogSize += s.statsLogSize
TotalStatsMemSize += s.statsMemSize
}

fmt.Printf("--- %s binlog size: %s, mem size: %s\n", scope, hrSize(totalBinlogLogSize), hrSize(totalBinlogMemSize))
fmt.Printf("--- %s deltalog size: %s, mem size: %s\n", scope, hrSize(TotalDeltaLogSize), hrSize(TotalDeltaMemSize))
fmt.Printf("--- %s statslog size: %s, mem size: %s\n", scope, hrSize(TotalStatsLogSize), hrSize(TotalStatsMemSize))
}

func hrSize(size int64) string {
sf := float64(size)
units := []string{"Bytes", "KB", "MB", "GB"}

0 comments on commit 4b0dc4b

Please sign in to comment.