diff --git a/internal/datanode/compaction/executor.go b/internal/datanode/compaction/executor.go index 231ad09101637..6df00b51795e1 100644 --- a/internal/datanode/compaction/executor.go +++ b/internal/datanode/compaction/executor.go @@ -18,6 +18,7 @@ package compaction import ( "context" + "fmt" "sync" "github.com/samber/lo" @@ -25,6 +26,7 @@ import ( "golang.org/x/sync/semaphore" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -177,6 +179,24 @@ func (e *executor) executeTask(task Compactor) { e.completed.Insert(result.GetPlanID(), result) e.completedCompactor.Insert(result.GetPlanID(), task) + getLogSize := func(binlogs []*datapb.FieldBinlog) int64 { + size := int64(0) + for _, binlog := range binlogs { + for _, fbinlog := range binlog.GetBinlogs() { + size += fbinlog.GetLogSize() + } + } + return size + } + + totalSize := lo.SumBy(result.Segments, func(seg *datapb.CompactionSegment) int64 { + return getLogSize(seg.GetInsertLogs()) + + getLogSize(seg.GetField2StatslogPaths()) + + getLogSize(seg.GetBm25Logs()) + + getLogSize(seg.GetDeltalogs()) + }) + metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.CompactionDataSourceLabel, fmt.Sprint(task.GetCollection())).Add(float64(totalSize)) + log.Info("end to execute compaction") } diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index 9d0f46a2437e6..3bc7fcb093364 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -130,6 +130,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { } metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(float64(t.flushedSize)) + metrics.DataNodeWriteBinlogSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, fmt.Sprint(t.collectionID), t.level.String()).Add(float64(t.flushedSize)) metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows)) metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds())) diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index 2e98cf677fdb8..56b293584c96f 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -58,6 +58,18 @@ var ( segmentLevelLabelName, }) + DataNodeWriteBinlogSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataNodeRole, + Name: "write_data_size", + Help: "byte size of datanode write to object storage, including flushed size", + }, []string{ + nodeIDLabelName, + dataSourceLabelName, + collectionIDLabelName, + }) + DataNodeFlushedRows = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: milvusNamespace, @@ -249,6 +261,7 @@ func RegisterDataNode(registry *prometheus.Registry) { registry.MustRegister(DataNodeFlushReqCounter) registry.MustRegister(DataNodeFlushedSize) registry.MustRegister(DataNodeFlushedRows) + registry.MustRegister(DataNodeWriteBinlogSize) // compaction related registry.MustRegister(DataNodeCompactionLatency) registry.MustRegister(DataNodeCompactionLatencyInQueue) @@ -290,4 +303,8 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel DataNodeCompactionMissingDeleteCount.Delete(prometheus.Labels{ collectionIDLabelName: fmt.Sprint(collectionID), }) + + DataNodeWriteBinlogSize.Delete(prometheus.Labels{ + collectionIDLabelName: fmt.Sprint(collectionID), + }) } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 932b0fc4de87c..bf682f696defc 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -59,6 +59,7 @@ const ( StreamingDataSourceLabel = "streaming" BulkinsertDataSourceLabel = "bulkinsert" + CompactionDataSourceLabel = "compaction" Leader = "OnLeader" FromLeader = "FromLeader"