Skip to content

Commit

Permalink
Merge pull request #693 from MaciekPytel/cherrypick_gpuexpander_1.1
Browse files Browse the repository at this point in the history
Cherrypick gpu expander fix on CA 1.1 branch
  • Loading branch information
mwielgus authored Mar 5, 2018
2 parents d559af3 + 930c210 commit d5e094d
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 1 deletion.
31 changes: 30 additions & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,23 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

apiv1 "k8s.io/api/core/v1"
kube_client "k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"

"github.com/golang/glog"
)

const (
// How old the oldest unschedulable pod should be before starting scale up.
unschedulablePodTimeBuffer = 2 * time.Second
// How old the oldest unschedulable pod with GPU should be before starting scale up.
// The idea is that nodes with GPU are very expensive and we're ready to sacrifice
// a bit more latency to wait for more pods and make a more informed scale-up decision.
unschedulablePodWithGpuTimeBuffer = 30 * time.Second
)

// StaticAutoscaler is an autoscaler which has all the core functionality of a CA but without the reconfiguration feature
type StaticAutoscaler struct {
// AutoscalingContext consists of validated settings and options for this autoscaler
Expand Down Expand Up @@ -239,10 +250,19 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
glog.V(4).Info("No schedulable pods")
}

// If all pending pods are new we may want to skip a real scale down (just like if the pods were handled).
allPendingPodsToHelpAreNew := false

if len(unschedulablePodsToHelp) == 0 {
glog.V(1).Info("No unschedulable pods")
} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
glog.V(1).Info("Max total nodes in cluster reached")
} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
// The assumption here is that these pods have been created very recently and probably there
// is more pods to come. In theory we could check the newest pod time but then if pod were created
// slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time.
allPendingPodsToHelpAreNew = true
glog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
} else {
daemonsets, err := a.ListerRegistry.DaemonSetLister().List()
if err != nil {
Expand Down Expand Up @@ -300,7 +320,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime) ||
schedulablePodsPresent ||
scaleDown.nodeDeleteStatus.IsDeleteInProgress()
scaleDown.nodeDeleteStatus.IsDeleteInProgress() ||
allPendingPodsToHelpAreNew

glog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s schedulablePodsPresent=%v isDeleteInProgress=%v",
Expand Down Expand Up @@ -346,3 +367,11 @@ func (a *StaticAutoscaler) ExitCleanUp() {
}
utils.DeleteStatusConfigMap(a.AutoscalingContext.ClientSet, a.AutoscalingContext.ConfigNamespace)
}

func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool {
if getOldestCreateTime(pods).Add(unschedulablePodTimeBuffer).After(currentTime) {
return true
}
found, oldest := getOldestCreateTimeWithGpu(pods)
return found && oldest.Add(unschedulablePodWithGpuTimeBuffer).After(currentTime)
}
25 changes: 25 additions & 0 deletions cluster-autoscaler/core/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"

Expand Down Expand Up @@ -491,3 +492,27 @@ func UpdateClusterStateMetrics(csr *clusterstate.ClusterStateRegistry) {
readiness := csr.GetClusterReadiness()
metrics.UpdateNodesCount(readiness.Ready, readiness.Unready+readiness.LongNotStarted, readiness.NotStarted)
}

func getOldestCreateTime(pods []*apiv1.Pod) time.Time {
oldest := time.Now()
for _, pod := range pods {
if oldest.After(pod.CreationTimestamp.Time) {
oldest = pod.CreationTimestamp.Time
}
}
return oldest
}

func getOldestCreateTimeWithGpu(pods []*apiv1.Pod) (bool, time.Time) {
oldest := time.Now()
gpuFound := false
for _, pod := range pods {
if gpu.PodRequestsGpu(pod) {
gpuFound = true
if oldest.After(pod.CreationTimestamp.Time) {
oldest = pod.CreationTimestamp.Time
}
}
}
return gpuFound, oldest
}
12 changes: 12 additions & 0 deletions cluster-autoscaler/core/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,15 @@ func TestGetNodeCoresAndMemory(t *testing.T) {
_, _, err = getNodeCoresAndMemory(node)
assert.Error(t, err)
}

func TestGetOldestPod(t *testing.T) {
p1 := BuildTestPod("p1", 500, 1000)
p1.CreationTimestamp = metav1.NewTime(time.Now().Add(-1 * time.Minute))
p2 := BuildTestPod("p2", 500, 1000)
p2.CreationTimestamp = metav1.NewTime(time.Now().Add(+1 * time.Minute))
p3 := BuildTestPod("p3", 500, 1000)
p3.CreationTimestamp = metav1.NewTime(time.Now())

assert.Equal(t, p1.CreationTimestamp.Time, getOldestCreateTime([]*apiv1.Pod{p1, p2, p3}))
assert.Equal(t, p1.CreationTimestamp.Time, getOldestCreateTime([]*apiv1.Pod{p3, p2, p1}))
}
22 changes: 22 additions & 0 deletions cluster-autoscaler/expander/price/price.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

"github.com/golang/glog"
Expand Down Expand Up @@ -56,6 +57,20 @@ var (
// TODO: make it a flag
// TODO: investigate what a proper value should be
notExistCoeficient = 2.0

// This value will be used as unfitness for node groups using GPU. This serves
// 2 purposes:
// - It makes nodes with GPU extremely unattractive to expander, so it will never
// use nodes with expensive GPUs for pods that don't require it.
// - By overriding unfitness for node groups with GPU we ignore preferred cluster
// shape when comparing such node groups. Node unfitness logic is meant to
// minimize per-node cost (resources consumed by kubelet, kube-proxy, etc) and
// resource fragmentation, while avoiding putting a significant fraction of all
// pods on a single node for availability reasons.
// Those goals don't apply well to nodes with GPUs that are generally dedicated
// for specific workload and need to be optimized for GPU utilization, not CPU
// utilization.
gpuUnfitnessOverride = 1000.0
)

// NewStrategy returns an expansion strategy that picks nodes based on price and preferred node type.
Expand Down Expand Up @@ -123,6 +138,13 @@ nextoption:
// TODO: normalize node count against preferred node.
supressedUnfitness := (nodeUnfitness-1.0)*(1.0-math.Tanh(float64(option.NodeCount-1)/15.0)) + 1.0

// Set constant, very high unfitness to make them unattractive for pods that doesn't need GPU and
// avoid optimizing them for CPU utilization.
if gpu.NodeHasGpu(nodeInfo.Node()) {
glog.V(4).Infof("Price expander overriding unfitness for node group with GPU %s", option.NodeGroup.Id())
supressedUnfitness = gpuUnfitnessOverride
}

optionScore := supressedUnfitness * priceSubScore

if !option.NodeGroup.Exist() {
Expand Down
22 changes: 22 additions & 0 deletions cluster-autoscaler/utils/gpu/gpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,25 @@ func getUnreadyNodeCopy(node *apiv1.Node) (*apiv1.Node, error) {
newNode.Status.Conditions = newNodeConditions
return newNode, nil
}

// NodeHasGpu returns true if a given node has GPU hardware.
// The result will be true if there is hardware capability. It doesn't matter
// if the drivers are installed and GPU is ready to use.
func NodeHasGpu(node *apiv1.Node) bool {
_, hasGpuLabel := node.Labels[GPULabel]
gpuAllocatable, hasGpuAllocatable := node.Status.Allocatable[ResourceNvidiaGPU]
return hasGpuLabel || (hasGpuAllocatable && !gpuAllocatable.IsZero())
}

// PodRequestsGpu returns true if a given pod has GPU request.
func PodRequestsGpu(pod *apiv1.Pod) bool {
for _, container := range pod.Spec.Containers {
if container.Resources.Requests != nil {
_, gpuFound := container.Resources.Requests[ResourceNvidiaGPU]
if gpuFound {
return true
}
}
}
return false
}
53 changes: 53 additions & 0 deletions cluster-autoscaler/utils/gpu/gpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -149,3 +150,55 @@ func TestFilterOutNodesWithUnreadyGpus(t *testing.T) {
}
}
}

func TestNodeHasGpu(t *testing.T) {
gpuLabels := map[string]string{
GPULabel: "nvidia-tesla-k80",
}
nodeGpuReady := &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "nodeGpuReady",
Labels: gpuLabels,
},
Status: apiv1.NodeStatus{
Capacity: apiv1.ResourceList{},
Allocatable: apiv1.ResourceList{},
},
}
nodeGpuReady.Status.Allocatable[ResourceNvidiaGPU] = *resource.NewQuantity(1, resource.DecimalSI)
nodeGpuReady.Status.Capacity[ResourceNvidiaGPU] = *resource.NewQuantity(1, resource.DecimalSI)
assert.True(t, NodeHasGpu(nodeGpuReady))

nodeGpuUnready := &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "nodeGpuUnready",
Labels: gpuLabels,
},
Status: apiv1.NodeStatus{
Capacity: apiv1.ResourceList{},
Allocatable: apiv1.ResourceList{},
},
}
assert.True(t, NodeHasGpu(nodeGpuUnready))

nodeNoGpu := &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "nodeNoGpu",
Labels: map[string]string{},
},
Status: apiv1.NodeStatus{
Capacity: apiv1.ResourceList{},
Allocatable: apiv1.ResourceList{},
},
}
assert.False(t, NodeHasGpu(nodeNoGpu))
}

func TestPodRequestsGpu(t *testing.T) {
podNoGpu := test.BuildTestPod("podNoGpu", 0, 1000)
podWithGpu := test.BuildTestPod("pod1AnyGpu", 0, 1000)
podWithGpu.Spec.Containers[0].Resources.Requests[ResourceNvidiaGPU] = *resource.NewQuantity(1, resource.DecimalSI)

assert.False(t, PodRequestsGpu(podNoGpu))
assert.True(t, PodRequestsGpu(podWithGpu))
}

0 comments on commit d5e094d

Please sign in to comment.