Skip to content

Commit

Permalink
webhook: support elasticquota enable update resource key
Browse files Browse the repository at this point in the history
Signed-off-by: lijunxin <[email protected]>
  • Loading branch information
lijunxin559 committed Feb 6, 2025
1 parent 6f6ef82 commit 83ca151
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 119 deletions.
6 changes: 6 additions & 0 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ const (
// to belong to the users and will not be preempted back.
ElasticQuotaGuaranteeUsage featuregate.Feature = "ElasticQuotaGuaranteeUsage"

// ElasticQuotaEnableUpdateResourceKey allows to update resource key in standard operation
// when delete resource type: from child to parent
// when add resource type: from parent to child
ElasticQuotaEnableUpdateResourceKey featuregate.Feature = "ElasticQuotaEnableUpdateResourceKey"

// DisableDefaultQuota disable default quota.
DisableDefaultQuota featuregate.Feature = "DisableDefaultQuota"

Expand All @@ -94,6 +99,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
MultiQuotaTree: {Default: false, PreRelease: featuregate.Alpha},
ElasticQuotaIgnorePodOverhead: {Default: false, PreRelease: featuregate.Alpha},
ElasticQuotaGuaranteeUsage: {Default: false, PreRelease: featuregate.Alpha},
ElasticQuotaEnableUpdateResourceKey: {Default: false, PreRelease: featuregate.Alpha},
DisableDefaultQuota: {Default: false, PreRelease: featuregate.Alpha},
SupportParentQuotaSubmitPod: {Default: false, PreRelease: featuregate.Alpha},
EnableQuotaAdmission: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
25 changes: 18 additions & 7 deletions pkg/scheduler/plugins/elasticquota/core/group_quota_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,22 +563,34 @@ func (gqm *GroupQuotaManager) updateOneGroupSharedWeightNoLock(quotaInfo *QuotaI
gqm.runtimeQuotaCalculatorMap[quotaInfo.ParentName].updateOneGroupSharedWeight(quotaInfo)
}

// updateResourceKeyNoLock based on quotaInfo.CalculateInfo.Max of self
// Note: RootQuotaName need to be updated as allResourceKeys
func (gqm *GroupQuotaManager) updateResourceKeyNoLock() {
// collect all dimensions
resourceKeys := make(map[v1.ResourceName]struct{})
allResourceKeys := make(map[v1.ResourceName]struct{})
for quotaName, quotaInfo := range gqm.quotaInfoMap {
if quotaName == extension.DefaultQuotaName || quotaName == extension.SystemQuotaName {
if quotaName == extension.RootQuotaName || quotaName == extension.DefaultQuotaName || quotaName == extension.SystemQuotaName {
continue
}
resourceKeys := make(map[v1.ResourceName]struct{})
for resName := range quotaInfo.CalculateInfo.Max {
allResourceKeys[resName] = struct{}{}
resourceKeys[resName] = struct{}{}
}
// update right now
if runtimeQuotaCalculator, ok := gqm.runtimeQuotaCalculatorMap[quotaName]; ok && runtimeQuotaCalculator != nil && !reflect.DeepEqual(resourceKeys, runtimeQuotaCalculator.resourceKeys) {
runtimeQuotaCalculator.updateResourceKeys(resourceKeys)
}
}

if !reflect.DeepEqual(resourceKeys, gqm.resourceKeys) {
gqm.resourceKeys = resourceKeys
for _, runtimeQuotaCalculator := range gqm.runtimeQuotaCalculatorMap {
runtimeQuotaCalculator.updateResourceKeys(resourceKeys)
// keep special ones same
if !reflect.DeepEqual(allResourceKeys, gqm.resourceKeys) {
gqm.resourceKeys = allResourceKeys
quotas := []string{extension.RootQuotaName, extension.DefaultQuotaName, extension.SystemQuotaName}
for _, quotaName := range quotas {
if runtimeQuotaCalculator, ok := gqm.runtimeQuotaCalculatorMap[quotaName]; ok && runtimeQuotaCalculator != nil {
runtimeQuotaCalculator.updateResourceKeys(allResourceKeys)
}
}
}
}
Expand Down Expand Up @@ -1019,7 +1031,6 @@ func (gqm *GroupQuotaManager) updateQuotaInternalNoLock(newQuotaInfo, oldQuotaIn

// update resource keys
gqm.updateResourceKeyNoLock()

oldMin := v1.ResourceList{}
if oldQuotaInfo != nil {
oldMin = oldQuotaInfo.CalculateInfo.Min
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import (
)

const (
GigaByte = 1024 * 1048576
GigaByte = 1024 * 1048576
ExtendedResourceKeyXCPU = "x-cpu"
)

func TestGroupQuotaManager_QuotaAdd(t *testing.T) {
Expand Down Expand Up @@ -151,6 +152,7 @@ func TestGroupQuotaManager_UpdateQuota(t *testing.T) {
}

func TestGroupQuotaManager_UpdateQuotaInternalAndRequest(t *testing.T) {
// add resource to node
gqm := NewGroupQuotaManagerForTest()
deltaRes := createResourceList(96, 160*GigaByte)
gqm.UpdateClusterTotalResource(deltaRes)
Expand All @@ -160,23 +162,61 @@ func TestGroupQuotaManager_UpdateQuotaInternalAndRequest(t *testing.T) {

AddQuotaToManager(t, gqm, "test1", extension.RootQuotaName, 96, 160*GigaByte, 50, 80*GigaByte, true, false)

// test1 request[120, 290] runtime == maxQuota
// request[120, 290] > maxQuota, runtime == maxQuota
request := createResourceList(120, 290*GigaByte)
gqm.updateGroupDeltaRequestNoLock("test1", request, request, 0)
runtime := gqm.RefreshRuntime("test1")
assert.Equal(t, deltaRes, runtime)
expectCurrentRuntime := deltaRes
assert.Equal(t, expectCurrentRuntime, runtime)

// update resourceKey
quota1 := CreateQuota("test1", extension.RootQuotaName, 64, 100*GigaByte, 60, 90*GigaByte, true, false)
quota1.Labels[extension.LabelQuotaIsParent] = "false"
err := gqm.UpdateQuota(quota1, false)
assert.Nil(t, err)
quotaInfo := gqm.GetQuotaInfoByName("test1")
assert.Equal(t, createResourceList(64, 100*GigaByte), quotaInfo.CalculateInfo.Max)
runtime = gqm.RefreshRuntime("test1")
expectCurrentRuntime = createResourceList(64, 100*GigaByte)
assert.Equal(t, expectCurrentRuntime, runtime)

// added max ExtendedResourceKeyXCPU without node resource added
// runtime.ExtendedResourceKeyXCPU = 0
request[ExtendedResourceKeyXCPU] = *resource.NewQuantity(80, resource.DecimalSI)
gqm.updateGroupDeltaRequestNoLock("test1", request, request, 0)
xCPUQuantity := resource.NewQuantity(100, resource.DecimalSI)
quota1.Spec.Max[ExtendedResourceKeyXCPU] = *xCPUQuantity
maxJson, err := json.Marshal(quota1.Spec.Max)
assert.Nil(t, err)
quota1.Annotations[extension.AnnotationSharedWeight] = string(maxJson)
gqm.UpdateQuota(quota1, false)
quotaInfo = gqm.quotaInfoMap["test1"]
assert.True(t, quotaInfo != nil)
assert.Equal(t, *xCPUQuantity, quotaInfo.CalculateInfo.Max[ExtendedResourceKeyXCPU])
runtime = gqm.RefreshRuntime("test1")
assert.Equal(t, createResourceList(64, 100*GigaByte), runtime)
}
expectCurrentRuntime[ExtendedResourceKeyXCPU] = resource.Quantity{Format: resource.DecimalSI}
assert.Equal(t, expectCurrentRuntime, runtime)

// add ExtendedResourceKeyXCPU to node resource
deltaRes[ExtendedResourceKeyXCPU] = *xCPUQuantity
gqm.UpdateClusterTotalResource(deltaRes)
runtime = gqm.RefreshRuntime("test1")
expectCurrentRuntime[ExtendedResourceKeyXCPU] = *resource.NewQuantity(80, resource.DecimalSI)
assert.Equal(t, expectCurrentRuntime, runtime)

// delete max ExtendedResourceKeyXCPU
delete(quota1.Spec.Max, ExtendedResourceKeyXCPU)
maxJson, err = json.Marshal(quota1.Spec.Max)
assert.Nil(t, err)
quota1.Annotations[extension.AnnotationSharedWeight] = string(maxJson)
gqm.UpdateQuota(quota1, false)
quotaInfo = gqm.quotaInfoMap["test1"]
assert.True(t, quotaInfo != nil)
assert.Equal(t, resource.Quantity{}, quotaInfo.CalculateInfo.Max[ExtendedResourceKeyXCPU])
runtime = gqm.RefreshRuntime("test1")
delete(expectCurrentRuntime, ExtendedResourceKeyXCPU)
assert.Equal(t, expectCurrentRuntime, runtime)
}
func TestGroupQuotaManager_DeleteOneGroup(t *testing.T) {
gqm := NewGroupQuotaManagerForTest()
gqm.UpdateClusterTotalResource(createResourceList(1000, 1000*GigaByte))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ func (qt *quotaTree) iterationForRedistribution(totalRes, totalSharedWeight int6
// if totalSharedWeight is not larger than 0, no need to iterate anymore.
return
}

needAdjustQuotaNodes := make([]*quotaNode, 0)
toPartitionResource, needAdjustTotalSharedWeight := int64(0), int64(0)
for _, node := range nodes {
Expand Down
152 changes: 132 additions & 20 deletions pkg/scheduler/plugins/elasticquota/core/runtime_quota_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ import (
"github.com/koordinator-sh/koordinator/apis/thirdparty/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
)

const (
TestNode1 = "node1"
TestNode2 = "node2"
TestNode3 = "node3"
TestNode4 = "node4"
)

func TestQuotaInfo_GetLimitRequest(t *testing.T) {
max := createResourceList(100, 10000)
req := createResourceList(1000, 1000)
Expand Down Expand Up @@ -129,29 +136,134 @@ func createElasticQuota() *v1alpha1.ElasticQuota {
return eQ
}

func TestRuntimeQuotaCalculator_Iteration4AdjustQuota(t *testing.T) {
qtw := NewRuntimeQuotaCalculator("testTreeName")
resourceKey := make(map[corev1.ResourceName]struct{})
cpu := corev1.ResourceCPU
resourceKey[cpu] = struct{}{}
qtw.updateResourceKeys(resourceKey)
qtw.quotaTree[cpu].insert("node1", 40, 5, 10, 0, true)
qtw.quotaTree[cpu].insert("node2", 60, 20, 15, 0, true)
qtw.quotaTree[cpu].insert("node3", 50, 40, 20, 0, true)
qtw.quotaTree[cpu].insert("node4", 80, 70, 15, 0, true)
qtw.totalResource = corev1.ResourceList{}
qtw.totalResource[corev1.ResourceCPU] = *resource.NewMilliQuantity(100, resource.DecimalSI)
qtw.calculateRuntimeNoLock()
if qtw.globalRuntimeVersion == 0 {
t.Error("error")
func TestRuntimeQuotaCalculator_IterationAdjustQuota(t *testing.T) {
type quotaNodeInfo = struct {
groupName string
sharedWeight int64
request int64
min int64
guarantee int64
allowLentResource bool
}
if qtw.quotaTree[cpu].quotaNodes["node1"].runtimeQuota != 5 ||
qtw.quotaTree[cpu].quotaNodes["node2"].runtimeQuota != 20 ||
qtw.quotaTree[cpu].quotaNodes["node3"].runtimeQuota != 35 ||
qtw.quotaTree[cpu].quotaNodes["node4"].runtimeQuota != 40 {
t.Error("error")
node1 := &quotaNodeInfo{
groupName: TestNode1,
sharedWeight: 40,
request: 5,
min: 10,
guarantee: 0,
allowLentResource: true,
}
node2 := &quotaNodeInfo{
groupName: TestNode2,
sharedWeight: 60,
request: 20,
min: 15,
guarantee: 0,
allowLentResource: true,
}
node3 := &quotaNodeInfo{
groupName: TestNode3,
sharedWeight: 50,
request: 40,
min: 20,
guarantee: 0,
allowLentResource: true,
}
node4 := &quotaNodeInfo{
groupName: TestNode4,
sharedWeight: 80,
request: 70,
min: 15,
guarantee: 0,
allowLentResource: true,
}
node4_1 := &quotaNodeInfo{
groupName: TestNode4,
sharedWeight: 0,
request: 70,
min: 15,
guarantee: 0,
allowLentResource: true,
}
node4_2 := &quotaNodeInfo{
groupName: TestNode4,
sharedWeight: 0,
request: 70,
min: 15,
guarantee: 45,
allowLentResource: true,
}

testCases := []struct {
name string
totalResource corev1.ResourceList
nodes []*quotaNodeInfo
expectedRuntimeMp map[string]map[corev1.ResourceName]int64
}{
{
name: "case1-no-guarantee",
totalResource: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
},
nodes: []*quotaNodeInfo{node1, node2, node3, node4},
expectedRuntimeMp: map[string]map[corev1.ResourceName]int64{
TestNode1: {corev1.ResourceCPU: 5},
TestNode2: {corev1.ResourceCPU: 20},
TestNode3: {corev1.ResourceCPU: 35},
TestNode4: {corev1.ResourceCPU: 40},
},
},
{
name: "case2-node4.sharedWeight=0",
totalResource: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
},
nodes: []*quotaNodeInfo{node1, node2, node3, node4_1},
expectedRuntimeMp: map[string]map[corev1.ResourceName]int64{
TestNode1: {corev1.ResourceCPU: 5},
TestNode2: {corev1.ResourceCPU: 20},
TestNode3: {corev1.ResourceCPU: 40},
TestNode4: {corev1.ResourceCPU: 15},
},
},
{
name: "case3-node4.guarantee>min",
totalResource: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
},
nodes: []*quotaNodeInfo{node1, node2, node3, node4_2},
expectedRuntimeMp: map[string]map[corev1.ResourceName]int64{
TestNode1: {corev1.ResourceCPU: 5},
TestNode2: {corev1.ResourceCPU: 20},
TestNode3: {corev1.ResourceCPU: 30},
TestNode4: {corev1.ResourceCPU: 45},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
qtw := NewRuntimeQuotaCalculator("testTreeName")
resourceKey := make(map[corev1.ResourceName]struct{})
for key, value := range tc.totalResource {
if !value.IsZero() {
resourceKey[key] = struct{}{}
}
}
qtw.updateResourceKeys(resourceKey)
qtw.totalResource = tc.totalResource
for _, node := range tc.nodes {
for resKey := range resourceKey {
qtw.quotaTree[resKey].insert(node.groupName, node.sharedWeight, node.request, node.min, node.guarantee, node.allowLentResource)
}
}
qtw.calculateRuntimeNoLock()
for node, rq := range tc.expectedRuntimeMp {
for resKey, q := range rq {
assert.Equal(t, q, qtw.quotaTree[resKey].quotaNodes[node].runtimeQuota)
}
}
})
}
}

func createQuotaInfoWithRes(name string, max, min corev1.ResourceList) *QuotaInfo {
Expand Down
41 changes: 39 additions & 2 deletions pkg/webhook/elasticquota/quota_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (qt *quotaTopology) ValidDeleteQuota(quota *v1alpha1.ElasticQuota) error {
return nil
}

// fillQuotaDefaultInformation fills quota with default information if not configure
// fillQuotaDefaultInformation fills quota with default information if not be configured
func (qt *quotaTopology) fillQuotaDefaultInformation(quota *v1alpha1.ElasticQuota) error {
if quota.Name == extension.RootQuotaName {
return nil
Expand Down Expand Up @@ -235,8 +235,20 @@ func (qt *quotaTopology) fillQuotaDefaultInformation(quota *v1alpha1.ElasticQuot
if sharedWeight, exist := quota.Annotations[extension.AnnotationSharedWeight]; !exist || len(sharedWeight) == 0 {
quota.Annotations[extension.AnnotationSharedWeight] = string(maxQuota)
klog.V(5).Infof("fill quota %v sharedWeight as max", quota.Name)
} else {
sharedWeightRL := make(corev1.ResourceList)
err = json.Unmarshal([]byte(sharedWeight), &sharedWeightRL)
if err != nil {
return fmt.Errorf("fillDefaultQuotaInfo unmarshal sharedWeight failed:%v", err)
}
if fixedSharedWeight(sharedWeightRL, quota.Spec.Max) {
fixedSharedWeightRL, err := json.Marshal(&sharedWeightRL)
if err != nil {
return fmt.Errorf("fillDefaultQuotaInfo marshal fixedSharedWeight max failed:%v", err)
}
quota.Annotations[extension.AnnotationSharedWeight] = string(fixedSharedWeightRL)
}
}

return nil
}

Expand Down Expand Up @@ -286,3 +298,28 @@ func (qt *quotaTopology) getQuotaInfo(name, namespace string) *QuotaInfo {
}
return nil
}

// fixedSharedWeight keep keys in sharedWeight and maxQuota same
// if key in maxQuota not included in sharedWeight, add key/value in sharedWeight
// if key in sharedWeight not included in maxQuota, delete key/value in sharedWeight
// if fixed, return true
func fixedSharedWeight(sharedWeight, maxQuota corev1.ResourceList) bool {
fixed := false
for key, value := range maxQuota {
if _, ok := sharedWeight[key]; !ok {
sharedWeight[key] = value
fixed = true
}
}
toDeleted := make([]corev1.ResourceName, 0)
for key := range sharedWeight {
if _, ok := maxQuota[key]; !ok {
toDeleted = append(toDeleted, key)
}
}
for _, key := range toDeleted {
fixed = true
delete(sharedWeight, key)
}
return fixed
}
Loading

0 comments on commit 83ca151

Please sign in to comment.