From f0b7446e6b62edce1478f0df8fad0f77674c5cd0 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 5 Feb 2025 11:01:10 +0800 Subject: [PATCH] enhance: Remove unnecessary collection and partition label from the metrics (#39536) /kind improvement --------- Signed-off-by: bigsheeper --- .../datacoord/compaction_task_clustering.go | 4 ++-- internal/datacoord/import_checker.go | 21 +++++++++++++++++-- internal/datacoord/import_checker_test.go | 2 +- internal/querynodev2/metrics_info.go | 2 -- internal/querynodev2/segments/manager.go | 4 ---- internal/querynodev2/services_test.go | 2 -- pkg/metrics/datacoord_metrics.go | 9 +++++++- pkg/metrics/datanode_metrics.go | 21 ------------------- pkg/metrics/metrics.go | 2 -- pkg/metrics/querynode_metrics.go | 3 --- 10 files changed, 30 insertions(+), 40 deletions(-) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 5dd312df3d8ca..282fd98942dd3 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -108,7 +108,7 @@ func (t *clusteringCompactionTask) Process() bool { lastStateDuration := ts - t.GetTaskProto().GetLastStateStartTime() log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration)) metrics.DataCoordCompactionLatency. - WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState). + WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState). Observe(float64(lastStateDuration * 1000)) updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)} @@ -117,7 +117,7 @@ func (t *clusteringCompactionTask) Process() bool { elapse := ts - t.GetTaskProto().StartTime log.Info("clustering compaction task total elapse", zap.Duration("costs", time.Duration(elapse)*time.Second)) metrics.DataCoordCompactionLatency. - WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), "total"). + WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), "total"). Observe(float64(elapse * 1000)) } err = t.updateAndSaveTaskMeta(updateOps...) diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 1206e7a9ee482..628fee027cae5 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -114,7 +114,8 @@ func (c *importChecker) Start() { for collID, collJobs := range jobsByColl { c.checkCollection(collID, collJobs) } - c.LogStats() + c.LogJobStats(jobs) + c.LogTaskStats() } } } @@ -125,7 +126,23 @@ func (c *importChecker) Close() { }) } -func (c *importChecker) LogStats() { +func (c *importChecker) LogJobStats(jobs []ImportJob) { + byState := lo.GroupBy(jobs, func(job ImportJob) string { + return job.GetState().String() + }) + stateNum := make(map[string]int) + for state := range internalpb.ImportJobState_value { + if state == internalpb.ImportJobState_None.String() { + continue + } + num := len(byState[state]) + stateNum[state] = num + metrics.ImportJobs.WithLabelValues(state).Set(float64(num)) + } + log.Info("import job stats", zap.Any("stateNum", stateNum)) +} + +func (c *importChecker) LogTaskStats() { logFunc := func(tasks []ImportTask, taskType TaskType) { byState := lo.GroupBy(tasks, func(t ImportTask) datapb.ImportTaskStateV2 { return t.GetState() diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index f3d7428829615..acc6301e477a1 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -139,7 +139,7 @@ func (s *ImportCheckerSuite) TestLogStats() { err = s.imeta.AddTask(context.TODO(), it1) s.NoError(err) - s.checker.LogStats() + s.checker.LogTaskStats() } func (s *ImportCheckerSuite) TestCheckJob() { diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 722b8b844c45d..6609d52676ff0 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -106,7 +106,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error collections[segment.Collection()], nodeID, fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), segments.SegmentTypeGrowing.String(), ).Set(float64(numEntities)) } @@ -136,7 +135,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error collections[segment.Collection()], nodeID, fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), segments.SegmentTypeSealed.String(), ).Set(float64(numEntities)) } diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 9dc066e04f438..bda5721965ddd 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -401,9 +401,7 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg metrics.QueryNodeNumSegments.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), segment.Type().String(), - fmt.Sprint(len(segment.Indexes())), segment.Level().String(), ).Inc() } @@ -707,9 +705,7 @@ func (mgr *segmentManager) release(ctx context.Context, segment Segment) { metrics.QueryNodeNumSegments.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(segment.Collection()), - fmt.Sprint(segment.Partition()), segment.Type().String(), - fmt.Sprint(len(segment.Indexes())), segment.Level().String(), ).Dec() diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 40c021612e421..9cdfe9224304b 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -546,10 +546,8 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() { l0Segment := segments.NewMockSegment(suite.T()) l0Segment.EXPECT().ID().Return(10000) l0Segment.EXPECT().Collection().Return(suite.collectionID) - l0Segment.EXPECT().Partition().Return(common.AllPartitionsID) l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0) l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed) - l0Segment.EXPECT().Indexes().Return(nil) l0Segment.EXPECT().Shard().Return(suite.channel) l0Segment.EXPECT().Release(ctx).Return() diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go index a2c8ef9ea263d..a772e5c07478e 100644 --- a/pkg/metrics/datacoord_metrics.go +++ b/pkg/metrics/datacoord_metrics.go @@ -206,7 +206,6 @@ var ( Buckets: longTaskBuckets, }, []string{ isVectorFieldLabelName, - collectionIDLabelName, channelNameLabelName, compactionTypeLabelName, stageLabelName, @@ -327,6 +326,14 @@ var ( Help: "number of IndexNodes managed by IndexCoord", }, []string{}) + ImportJobs = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataCoordRole, + Name: "import_jobs", + Help: "the import jobs grouping by state", + }, []string{"import_state"}) + ImportTasks = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: milvusNamespace, diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index 1a44da1b164f1..2e98cf677fdb8 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -91,18 +91,6 @@ var ( collectionIDLabelName, }) - DataNodeProduceTimeTickLag = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.DataNodeRole, - Name: "produce_tt_lag_ms", - Help: "now time minus tt pts per physical channel", - }, []string{ - nodeIDLabelName, - collectionIDLabelName, - channelNameLabelName, - }) - DataNodeConsumeMsgCount = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: milvusNamespace, @@ -269,7 +257,6 @@ func RegisterDataNode(registry *prometheus.Registry) { // deprecated metrics registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken) registry.MustRegister(DataNodeNumProducers) - registry.MustRegister(DataNodeProduceTimeTickLag) } func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) { @@ -281,14 +268,6 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel collectionIDLabelName: fmt.Sprint(collectionID), }) - DataNodeProduceTimeTickLag. - Delete( - prometheus.Labels{ - nodeIDLabelName: fmt.Sprint(nodeID), - collectionIDLabelName: fmt.Sprint(collectionID), - channelNameLabelName: channel, - }) - for _, label := range []string{AllLabel, DeleteLabel, InsertLabel} { DataNodeConsumeMsgCount. Delete( diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 7c7296c88176d..932b0fc4de87c 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -93,7 +93,6 @@ const ( indexTaskStatusLabelName = "index_task_status" msgTypeLabelName = "msg_type" collectionIDLabelName = "collection_id" - partitionIDLabelName = "partition_id" channelNameLabelName = "channel_name" functionLabelName = "function_name" queryTypeLabelName = "query_type" @@ -109,7 +108,6 @@ const ( roleNameLabelName = "role_name" cacheNameLabelName = "cache_name" cacheStateLabelName = "cache_state" - indexCountLabelName = "indexed_field_count" dataSourceLabelName = "data_source" importStageLabelName = "import_stage" requestScope = "scope" diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index 13f723b576389..67d70009fd098 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -125,9 +125,7 @@ var ( }, []string{ nodeIDLabelName, collectionIDLabelName, - partitionIDLabelName, segmentStateLabelName, - indexCountLabelName, segmentLevelLabelName, }) @@ -455,7 +453,6 @@ var ( collectionName, nodeIDLabelName, collectionIDLabelName, - partitionIDLabelName, segmentStateLabelName, })