enhance: Enable to observe write amplification
Signed-off-by: yangxuan <[email protected]>
XuanYang-cn committed Feb 6, 2025
1 parent 427b6a4 commit 48da6f1
Showing 4 changed files with 39 additions and 0 deletions.
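The change introduces a DataNodeWriteBinlogSize gauge that accumulates the bytes a DataNode writes to object storage from both the flush path (syncmgr) and the compaction path, labelled by node ID, data source, and collection ID. Comparing total written bytes against freshly flushed bytes is what makes write amplification observable. A minimal sketch of that ratio, assuming the two byte counts have already been gathered (the helper below is hypothetical and not part of this commit):

package main

import "fmt"

// writeAmplification is an illustrative helper: total bytes written to object
// storage (flushed data plus compaction rewrites) divided by the bytes of user
// data that were flushed in the first place.
func writeAmplification(flushedBytes, compactedBytes float64) float64 {
	if flushedBytes == 0 {
		return 0
	}
	return (flushedBytes + compactedBytes) / flushedBytes
}

func main() {
	// e.g. 1 GiB flushed, 3 GiB rewritten by compaction over the same period.
	fmt.Printf("write amplification: %.2f\n", writeAmplification(1<<30, 3<<30))
}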
20 changes: 20 additions & 0 deletions internal/datanode/compaction/executor.go
@@ -18,13 +18,15 @@ package compaction

import (
	"context"
	"fmt"
	"sync"

	"github.com/samber/lo"
	"go.uber.org/zap"
	"golang.org/x/sync/semaphore"

	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/proto/datapb"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -177,6 +179,24 @@ func (e *executor) executeTask(task Compactor) {
	e.completed.Insert(result.GetPlanID(), result)
	e.completedCompactor.Insert(result.GetPlanID(), task)

	getLogSize := func(binlogs []*datapb.FieldBinlog) int64 {
		size := int64(0)
		for _, binlog := range binlogs {
			for _, fbinlog := range binlog.GetBinlogs() {
				size += fbinlog.GetLogSize()
			}
		}
		return size
	}

	totalSize := lo.SumBy(result.Segments, func(seg *datapb.CompactionSegment) int64 {
		return getLogSize(seg.GetInsertLogs()) +
			getLogSize(seg.GetField2StatslogPaths()) +
			getLogSize(seg.GetBm25Logs()) +
			getLogSize(seg.GetDeltalogs())
	})
	metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.CompactionDataSourceLabel, fmt.Sprint(task.GetCollection())).Add(float64(totalSize))

	log.Info("end to execute compaction")
}

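The block added to executeTask sums the binlog bytes of every segment a compaction produces (insert logs, statslogs, BM25 logs, and deltalogs) and adds the total to DataNodeWriteBinlogSize under the compaction data-source label. A self-contained sketch of the same summation, with plain structs standing in for the datapb protobuf types, illustration only:

package main

import "fmt"

// Stand-ins for datapb.Binlog and datapb.FieldBinlog, for illustration only.
type binlog struct{ LogSize int64 }
type fieldBinlog struct{ Binlogs []*binlog }

// getLogSize mirrors the helper added in executeTask: it walks every field's
// binlog entries and accumulates their sizes in bytes.
func getLogSize(binlogs []*fieldBinlog) int64 {
	var size int64
	for _, fb := range binlogs {
		for _, b := range fb.Binlogs {
			size += b.LogSize
		}
	}
	return size
}

func main() {
	insertLogs := []*fieldBinlog{
		{Binlogs: []*binlog{{LogSize: 1 << 20}, {LogSize: 2 << 20}}},
	}
	deltaLogs := []*fieldBinlog{
		{Binlogs: []*binlog{{LogSize: 512 << 10}}},
	}
	total := getLogSize(insertLogs) + getLogSize(deltaLogs)
	fmt.Println("bytes written by this compaction:", total) // 3670016
}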
1 change: 1 addition & 0 deletions internal/flushcommon/syncmgr/task.go
@@ -130,6 +130,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
	}

	metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(float64(t.flushedSize))
	metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, fmt.Sprint(t.collectionID)).Add(float64(t.flushedSize))
	metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows))

	metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds()))
17 changes: 17 additions & 0 deletions pkg/metrics/datanode_metrics.go
@@ -58,6 +58,18 @@ var (
			segmentLevelLabelName,
		})

	DataNodeWriteBinlogSize = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: milvusNamespace,
			Subsystem: typeutil.DataNodeRole,
			Name:      "write_data_size",
			Help:      "byte size of datanode write to object storage, including flushed size",
		}, []string{
			nodeIDLabelName,
			dataSourceLabelName,
			collectionIDLabelName,
		})

	DataNodeFlushedRows = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: milvusNamespace,
@@ -249,6 +261,7 @@ func RegisterDataNode(registry *prometheus.Registry) {
	registry.MustRegister(DataNodeFlushReqCounter)
	registry.MustRegister(DataNodeFlushedSize)
	registry.MustRegister(DataNodeFlushedRows)
	registry.MustRegister(DataNodeWriteBinlogSize)
	// compaction related
	registry.MustRegister(DataNodeCompactionLatency)
	registry.MustRegister(DataNodeCompactionLatencyInQueue)
@@ -290,4 +303,8 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel
	DataNodeCompactionMissingDeleteCount.Delete(prometheus.Labels{
		collectionIDLabelName: fmt.Sprint(collectionID),
	})

	DataNodeWriteBinlogSize.Delete(prometheus.Labels{
		collectionIDLabelName: fmt.Sprint(collectionID),
	})
}
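The new gauge follows the usual client_golang pattern: declare a GaugeVec with its label set, register it once, then have each write path add into its own label combination. A rough sketch of that pattern with a stand-in metric (names and values here are illustrative, not the Milvus definitions):

package main

import "github.com/prometheus/client_golang/prometheus"

// Illustrative stand-in shaped like DataNodeWriteBinlogSize: one gauge family,
// split by node, data source, and collection.
var writeDataSize = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "datanode",
		Name:      "write_data_size",
		Help:      "bytes written to object storage",
	},
	[]string{"node_id", "data_source", "collection_id"},
)

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(writeDataSize)

	// Flush and compaction report into the same family; only the data_source
	// label tells them apart, so a per-collection write-amplification ratio
	// can be derived on the Prometheus side.
	writeDataSize.WithLabelValues("1", "streaming", "100").Add(4096)
	writeDataSize.WithLabelValues("1", "compaction", "100").Add(8192)
}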
1 change: 1 addition & 0 deletions pkg/metrics/metrics.go
@@ -59,6 +59,7 @@ const (

	StreamingDataSourceLabel  = "streaming"
	BulkinsertDataSourceLabel = "bulkinsert"
	CompactionDataSourceLabel = "compaction"

	Leader     = "OnLeader"
	FromLeader = "FromLeader"
