Skip to content

Commit

Permalink
Support metrics externally ScaledJob as well as ScaledObject
Browse files Browse the repository at this point in the history
Signed-off-by: Yoon Park <[email protected]>
  • Loading branch information
yoongon committed Jan 4, 2024
1 parent 9be8ee6 commit cda367d
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 9 deletions.
14 changes: 12 additions & 2 deletions pkg/metricsservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"

"google.golang.org/grpc"
"k8s.io/metrics/pkg/apis/external_metrics"
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

Expand All @@ -41,13 +42,22 @@ type GrpcServer struct {
api.UnimplementedMetricsServiceServer
}

// GetMetrics returns metrics values in form of ExternalMetricValueList for specified ScaledObject reference
// GetMetrics returns metrics values in form of ExternalMetricValueList for specified ScaledObject and ScaledJob reference
func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*v1beta1.ExternalMetricValueList, error) {
v1beta1ExtMetrics := &v1beta1.ExternalMetricValueList{}
extMetrics, err := (*s.scalerHandler).GetScaledObjectMetrics(ctx, in.Name, in.Namespace, in.MetricName)
var extMetrics *external_metrics.ExternalMetricValueList

scaledObjectExtMetrics, err := (*s.scalerHandler).GetScaledObjectMetrics(ctx, in.Name, in.Namespace, in.MetricName)
if err != nil {
return v1beta1ExtMetrics, fmt.Errorf("error when getting metric values %w", err)
}
extMetrics.Items = append(extMetrics.Items, scaledObjectExtMetrics.Items...)

scaledJobExtMetrics, err := (*s.scalerHandler).GetScaledJobMetrics(ctx, in.Name, in.Namespace, in.MetricName)
if err != nil {
return v1beta1ExtMetrics, fmt.Errorf("error when getting metric values %w", err)
}
extMetrics.Items = append(extMetrics.Items, scaledJobExtMetrics.Items...)

err = v1beta1.Convert_external_metrics_ExternalMetricValueList_To_v1beta1_ExternalMetricValueList(extMetrics, v1beta1ExtMetrics, nil)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions pkg/mock/mock_scaling/mock_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var log = logf.Log.WithName("scalers_cache")

type ScalersCache struct {
ScaledObject *kedav1alpha1.ScaledObject
ScaledJob *kedav1alpha1.ScaledJob
Scalers []ScalerBuilder
ScalableObjectGeneration int64
Recorder record.EventRecorder
Expand Down
51 changes: 44 additions & 7 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type ScaleHandler interface {
ClearScalersCache(ctx context.Context, scalableObject interface{}) error

GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error)
GetScaledJobMetrics(ctx context.Context, scaledJobName, scaleJobNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error)
}

type scaleHandler struct {
Expand Down Expand Up @@ -281,13 +282,21 @@ func (h *scaleHandler) GetScalersCache(ctx context.Context, scalableObject inter
}

// getScalersCacheForScaledObject returns cache for input ScaledObject, referenced by name and namespace
// we don't need to compare the Generation, because this method should be called only inside scale loop, where we have up to date object.
// we don't need to compare the Generation, because this method should be called only inside scale loop, where we have up-to date object.
func (h *scaleHandler) getScalersCacheForScaledObject(ctx context.Context, scaledObjectName, scaledObjectNamespace string) (*cache.ScalersCache, error) {
key := kedav1alpha1.GenerateIdentifier("ScaledObject", scaledObjectNamespace, scaledObjectName)

return h.performGetScalersCache(ctx, key, nil, nil, "ScaledObject", scaledObjectNamespace, scaledObjectName)
}

// getScalersCacheForScaledJob returns cache for input ScaledJob, referenced by name and namespace
// we don't need to compare the Generation, because this method should be called only inside scale loop, where we have up-to date object.
func (h *scaleHandler) getScalersCacheForScaledJob(ctx context.Context, scaledObjectName, scaledObjectNamespace string) (*cache.ScalersCache, error) {
key := kedav1alpha1.GenerateIdentifier("ScaledJob", scaledObjectNamespace, scaledObjectName)

return h.performGetScalersCache(ctx, key, nil, nil, "ScaledJob", scaledObjectNamespace, scaledObjectName)
}

// performGetScalersCache returns cache for input scalableObject, it is common code used by GetScalersCache() and getScalersCacheForScaledObject() methods
func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, scalableObject interface{}, scalableObjectGeneration *int64, scalableObjectKind, scalableObjectNamespace, scalableObjectName string) (*cache.ScalersCache, error) {
h.scalerCachesLock.RLock()
Expand Down Expand Up @@ -374,6 +383,8 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
newCache.CompiledFormula = program
}
newCache.ScaledObject = obj
case *kedav1alpha1.ScaledJob:
newCache.ScaledJob = obj
default:
}

Expand Down Expand Up @@ -748,18 +759,39 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler,
}

// / --------------------------------------------------------------------------- ///
// / ---------- ScaledJob related methods --------- ///
// / ---------- ScaledJob related methods --------- ///
// / --------------------------------------------------------------------------- ///

// getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace.
// GetScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace.
// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics {
func (h *scaleHandler) GetScaledJobMetrics(ctx context.Context, scaledJobName, scaledJobNamespace, metricsName string) (*external_metrics.ExternalMetricValueList, error) {
cache, err := h.getScalersCacheForScaledJob(ctx, scaledJobName, scaledJobNamespace)
if err != nil {
return nil, fmt.Errorf("error getting scalers %w", err)
}

var scaledJob *kedav1alpha1.ScaledJob
if cache.ScaledJob != nil {
scaledJob = cache.ScaledJob
} else {
err := fmt.Errorf("scaledJob not found in the cache")
return nil, err
}
_, externalMetricValueList := h.getScaledJobMetrics(ctx, scaledJob, metricsName)
return externalMetricValueList, nil
}

// getScaledJobMetrics returns metrics for the input ScaledJob:
// the first return internal metrics to decide the scaledJob is active
// the second return external metrics for specified metric name for a ScaledJob
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, metricsName string) ([]scaledjob.ScalerMetrics, *external_metrics.ExternalMetricValueList) {
cache, err := h.GetScalersCache(ctx, scaledJob)
if err != nil {
log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
return nil
return nil, nil
}
var scalersMetrics []scaledjob.ScalerMetrics
var matchingMetrics []external_metrics.ExternalMetricValue
scalers, _ := cache.GetScalers()
for i, s := range scalers {
isActive := false
Expand All @@ -784,6 +816,9 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
if isTriggerActive {
isActive = true
}
if metricSpecs[0].External.Metric.Name == metricsName {
matchingMetrics = append(matchingMetrics, metrics...)
}

queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount())

Expand All @@ -795,7 +830,9 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
IsActive: isActive,
})
}
return scalersMetrics
return scalersMetrics, &external_metrics.ExternalMetricValueList{
Items: matchingMetrics,
}
}

// isScaledJobActive returns whether the input ScaledJob:
Expand All @@ -804,7 +841,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
logger := logf.Log.WithName("scalemetrics")

scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob)
scalersMetrics, _ := h.getScaledJobMetrics(ctx, scaledJob, "")
isActive, queueLength, maxValue, maxFloatValue :=
scaledjob.IsScaledJobActive(scalersMetrics, scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount())

Expand Down

0 comments on commit cda367d

Please sign in to comment.