Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds autoscaling logic for new Chain and Schedule policies #3929

Merged
merged 8 commits into from
Aug 7, 2024
10 changes: 3 additions & 7 deletions pkg/apis/autoscaling/v1/fleetautoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,13 +581,9 @@ func (c *ChainPolicy) ValidateChainPolicy(fldPath *field.Path) field.ErrorList {
seenIDs[entry.ID] = true
}
// Ensure that chain entry has a policy
hasValidPolicy := entry.Buffer == nil && entry.Webhook == nil && entry.Counter == nil && entry.List == nil && entry.Schedule == nil
if entry.Type == "" || hasValidPolicy {
allErrs = append(allErrs, field.Required(fldPath.Index(i), "policy is missing"))
}
// Ensure the chain entry's policy is not a chain policy (to avoid nested chain policies)
if entry.Chain != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Index(i), entry.FleetAutoscalerPolicy.Type, "chain policy cannot be used in chain policy"))
hasValidPolicy := entry.Buffer != nil || entry.Webhook != nil || entry.Counter != nil || entry.List != nil || entry.Schedule != nil
if entry.Type == "" || !hasValidPolicy {
allErrs = append(allErrs, field.Required(fldPath.Index(i), "valid policy is missing"))
}
// Validate the chain entry's policy
allErrs = append(allErrs, entry.FleetAutoscalerPolicy.ValidatePolicy(fldPath.Index(i).Child("policy"))...)
Expand Down
4 changes: 1 addition & 3 deletions pkg/apis/autoscaling/v1/fleetautoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,6 @@ func TestFleetAutoscalerScheduleValidateUpdate(t *testing.T) {
},
"end time before start time": {
fas: modifiedFAS(func(fap *FleetAutoscalerPolicy) {
mustParseDate("3999-06-15T15:59:59Z")
mustParseDate("3999-05-15T15:59:59Z")
fap.Schedule.Between.Start = mustParseDate("3999-06-15T15:59:59Z")
fap.Schedule.Between.End = mustParseDate("3999-05-15T15:59:59Z")
}),
Expand Down Expand Up @@ -588,7 +586,7 @@ func TestFleetAutoscalerChainValidateUpdate(t *testing.T) {
}
}),
featureFlags: string(runtime.FeatureScheduledAutoscaler) + "=true",
wantLength: 2,
wantLength: 1,
wantField: "spec.policy.chain[1]",
},
"invalid nested policy format": {
Expand Down
5 changes: 3 additions & 2 deletions pkg/fleetautoscalers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,9 @@ func (c *Controller) syncFleetAutoscaler(ctx context.Context, key string) error
}

currentReplicas := fleet.Status.Replicas
desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas, fleet, c.gameServerLister, c.counter.Counts())
if err != nil {
desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas.Spec.Policy, fleet, c.gameServerLister, c.counter.Counts())
// If there err is nil and not an inactive schedule error (ignorable in this case), then record the event
if err != nil && !errors.Is(err, InactiveScheduleError{}) {
c.recorder.Eventf(fas, corev1.EventTypeWarning, "FleetAutoscaler",
"Error calculating desired fleet size on FleetAutoscaler %s. Error: %s", fas.ObjectMeta.Name, err.Error())

Expand Down
130 changes: 124 additions & 6 deletions pkg/fleetautoscalers/fleetautoscalers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"

Expand All @@ -41,6 +42,8 @@ import (
"agones.dev/agones/pkg/util/runtime"
)

const maxDuration = "2540400h" // 290 Years

var tlsConfig = &tls.Config{}
var client = http.Client{
Timeout: 15 * time.Second,
Expand All @@ -49,18 +52,29 @@ var client = http.Client{
},
}

// InactiveScheduleError denotes an error for schedules that are not currently active.
type InactiveScheduleError struct{}

func (InactiveScheduleError) Error() string {
return "inactive schedule, policy not applicable"
}

// computeDesiredFleetSize computes the new desired size of the given fleet
func computeDesiredFleetSize(fas *autoscalingv1.FleetAutoscaler, f *agonesv1.Fleet,
func computeDesiredFleetSize(pol autoscalingv1.FleetAutoscalerPolicy, f *agonesv1.Fleet,
gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) {
switch fas.Spec.Policy.Type {
switch pol.Type {
case autoscalingv1.BufferPolicyType:
return applyBufferPolicy(fas.Spec.Policy.Buffer, f)
return applyBufferPolicy(pol.Buffer, f)
case autoscalingv1.WebhookPolicyType:
return applyWebhookPolicy(fas.Spec.Policy.Webhook, f)
return applyWebhookPolicy(pol.Webhook, f)
case autoscalingv1.CounterPolicyType:
return applyCounterOrListPolicy(fas.Spec.Policy.Counter, nil, f, gameServerLister, nodeCounts)
return applyCounterOrListPolicy(pol.Counter, nil, f, gameServerLister, nodeCounts)
case autoscalingv1.ListPolicyType:
return applyCounterOrListPolicy(nil, fas.Spec.Policy.List, f, gameServerLister, nodeCounts)
return applyCounterOrListPolicy(nil, pol.List, f, gameServerLister, nodeCounts)
case autoscalingv1.SchedulePolicyType:
return applySchedulePolicy(pol.Schedule, f, gameServerLister, nodeCounts, time.Now())
case autoscalingv1.ChainPolicyType:
return applyChainPolicy(pol.Chain, f, gameServerLister, nodeCounts, time.Now())
}

return 0, false, errors.New("wrong policy type, should be one of: Buffer, Webhook, Counter, List")
Expand Down Expand Up @@ -362,6 +376,110 @@ func applyCounterOrListPolicy(c *autoscalingv1.CounterPolicy, l *autoscalingv1.L
return 0, false, errors.Errorf("unable to apply ListPolicy %v", l)
}

func applySchedulePolicy(s *autoscalingv1.SchedulePolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount, currentTime time.Time) (int32, bool, error) {
// Ensure the scheduled autoscaler feature gate is enabled
if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) {
return 0, false, errors.Errorf("cannot apply SchedulePolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler)
}

if isScheduleActive(s, currentTime) {
return computeDesiredFleetSize(s.Policy, f, gameServerLister, nodeCounts)
}

// If the schedule wasn't active then return the current replica amount of the fleet
return f.Status.Replicas, false, &InactiveScheduleError{}
}

func applyChainPolicy(c autoscalingv1.ChainPolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount, currentTime time.Time) (int32, bool, error) {
// Ensure the scheduled autoscaler feature gate is enabled
if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) {
return 0, false, errors.Errorf("cannot apply ChainPolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler)
}

replicas := f.Status.Replicas
var limited bool
var err error

// Loop over all entries in the chain
for _, entry := range c {
switch entry.Type {
case autoscalingv1.SchedulePolicyType:
replicas, limited, err = applySchedulePolicy(entry.Schedule, f, gameServerLister, nodeCounts, currentTime)
// If no error was returned from the schedule policy (schedule is active and/or webhook policy within schedule was successful), then return the values given
if err == nil {
return replicas, limited, nil
}
zmerlynn marked this conversation as resolved.
Show resolved Hide resolved
case autoscalingv1.WebhookPolicyType:
replicas, limited, err = applyWebhookPolicy(entry.Webhook, f)
// If no error was returned from the webhook policy, then return the values given
if err == nil {
return replicas, limited, nil
}
zmerlynn marked this conversation as resolved.
Show resolved Hide resolved
default:
// Every other policy type we just want to compute the desired fleet and return it
return computeDesiredFleetSize(entry.FleetAutoscalerPolicy, f, gameServerLister, nodeCounts)
}

}

// Fall off the chain
return replicas, limited, err
}

// isScheduleActive checks if a chain entry's is active and returns a boolean, true if active, false otherwise
func isScheduleActive(s *autoscalingv1.SchedulePolicy, currentTime time.Time) bool {
// Used for checking ahead of the schedule for daylight savings purposes
cronDelta := (time.Minute * -1) + (time.Second * -30)
indexjoseph marked this conversation as resolved.
Show resolved Hide resolved

// If the current time is before the start time, the schedule is inactive so return false
startTime := s.Between.Start.Time
if currentTime.Before(startTime) {
return false
}

// If an end time is present and the current time is after the end time, the schedule is inactive so return false
endTime := s.Between.End.Time
if !endTime.IsZero() && currentTime.After(endTime) {
return false
}

// If no startCron field is specified, then it's automatically true (duration is no longer relevant since we're always running)
if s.ActivePeriod.StartCron == "" {
return true
}

// Ignore the error as validation is already done within the validateChainPolicy after being unmarshalled
location, _ := time.LoadLocation(s.ActivePeriod.Timezone)

// Ignore the error as validation is already done within the validateChainPolicy after being unmarshalled
startCron, _ := cron.ParseStandard(s.ActivePeriod.StartCron)

// Ignore the error as validation is already done within the validateChainPolicy after being unmarshalled.
// If the duration is empty set it to the largest duration possible (290 years)
duration, _ := time.ParseDuration(s.ActivePeriod.Duration)
if s.ActivePeriod.Duration == "" {
duration, _ = time.ParseDuration(maxDuration)
}

// Get the current time - duration
currentTimeMinusDuration := currentTime.Add(duration * -1)
// Take (current time - duration) to get the first available start time
cronStartTime := startCron.Next(currentTimeMinusDuration.In(location))
// Take the (cronStartTime + duration) to get the end time
cronEndTime := cronStartTime.Add(duration)

// If the current time is after the cronStartTime - 90 seconds (for daylight saving purposes) AND the current time before the cronEndTime
// then return true
// Example: startCron = 0 14 * * * // 2:00 PM Everyday | duration = 1 hr | cronDelta = 90 seconds | currentTime = 2024-08-01T14:30:00Z | currentTimeMinusDuration = 2024-08-01T13:30:00Z
// then cronStartTime = 2024-08-01T14:00:00Z and cronEndTime = 2024-08-01T15:00:00Z
// and since currentTime > cronStartTime + cronDelta AND currentTime < cronEndTime, we return true
if currentTime.After(cronStartTime.Add(cronDelta)) && currentTime.Before(cronEndTime) {
return true
}

return false
}

// getSortedGameServers returns the list of Game Servers for the Fleet in the order in which the
// Game Servers would be deleted.
func getSortedGameServers(f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister,
Expand Down
Loading
Loading