Skip to content

Commit

Permalink
Add application logic for Schedule and Chain Policy within the autosc…
Browse files Browse the repository at this point in the history
…aler
  • Loading branch information
indexjoseph committed Jul 30, 2024
1 parent 840fa8b commit f50716f
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/fleetautoscalers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (c *Controller) syncFleetAutoscaler(ctx context.Context, key string) error
}

currentReplicas := fleet.Status.Replicas
desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas, fleet, c.gameServerLister, c.counter.Counts())
desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas.Spec.Policy, fleet, c.gameServerLister, c.counter.Counts())
if err != nil {
c.recorder.Eventf(fas, corev1.EventTypeWarning, "FleetAutoscaler",
"Error calculating desired fleet size on FleetAutoscaler %s. Error: %s", fas.ObjectMeta.Name, err.Error())
Expand Down
99 changes: 93 additions & 6 deletions pkg/fleetautoscalers/fleetautoscalers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"

Expand All @@ -50,17 +51,21 @@ var client = http.Client{
}

// computeDesiredFleetSize computes the new desired size of the given fleet
func computeDesiredFleetSize(fas *autoscalingv1.FleetAutoscaler, f *agonesv1.Fleet,
func computeDesiredFleetSize(pol autoscalingv1.FleetAutoscalerPolicy, f *agonesv1.Fleet,
gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) {
switch fas.Spec.Policy.Type {
switch pol.Type {
case autoscalingv1.BufferPolicyType:
return applyBufferPolicy(fas.Spec.Policy.Buffer, f)
return applyBufferPolicy(pol.Buffer, f)
case autoscalingv1.WebhookPolicyType:
return applyWebhookPolicy(fas.Spec.Policy.Webhook, f)
return applyWebhookPolicy(pol.Webhook, f)
case autoscalingv1.CounterPolicyType:
return applyCounterOrListPolicy(fas.Spec.Policy.Counter, nil, f, gameServerLister, nodeCounts)
return applyCounterOrListPolicy(pol.Counter, nil, f, gameServerLister, nodeCounts)
case autoscalingv1.ListPolicyType:
return applyCounterOrListPolicy(nil, fas.Spec.Policy.List, f, gameServerLister, nodeCounts)
return applyCounterOrListPolicy(nil, pol.List, f, gameServerLister, nodeCounts)
case autoscalingv1.SchedulePolicyType:
return applySchedulePolicy(pol.Schedule, f, gameServerLister, nodeCounts)
case autoscalingv1.ChainPolicyType:
return applyChainPolicy(pol.Chain, f, gameServerLister, nodeCounts)
}

return 0, false, errors.New("wrong policy type, should be one of: Buffer, Webhook, Counter, List")
Expand Down Expand Up @@ -362,6 +367,88 @@ func applyCounterOrListPolicy(c *autoscalingv1.CounterPolicy, l *autoscalingv1.L
return 0, false, errors.Errorf("unable to apply ListPolicy %v", l)
}

func applySchedulePolicy(s *autoscalingv1.SchedulePolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) {
// Ensure the scheduled autoscaler feature gate is enabled
if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) {
return 0, false, errors.Errorf("cannot apply SchedulePolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler)
}

if isScheduleActive(s) {
return computeDesiredFleetSize(s.Policy, f, gameServerLister, nodeCounts)
}

return f.Status.Replicas, false, nil
}

func applyChainPolicy(c autoscalingv1.ChainPolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) {
// Ensure the scheduled autoscaler feature gate is enabled
if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) {
return 0, false, errors.Errorf("cannot apply ChainPolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler)
}

// Loop over all entries in the chain
for _, entry := range c {
switch entry.Type {
case autoscalingv1.SchedulePolicyType:
schedRep, schedLim, schedErr := applySchedulePolicy(entry.Schedule, f, gameServerLister, nodeCounts)
// If the schedule is active and no error was returned from the policy, then return the replicas, limited and error
if isScheduleActive(entry.Schedule) && schedErr == nil {
return schedRep, schedLim, schedErr
}
case autoscalingv1.WebhookPolicyType:
webhookRep, webhookLim, webhookErr := applyWebhookPolicy(entry.Webhook, f)
if webhookErr == nil {
return webhookRep, webhookLim, webhookErr
}
default:
return computeDesiredFleetSize(entry.FleetAutoscalerPolicy, f, gameServerLister, nodeCounts)
}
}

return f.Status.Replicas, false, nil
}

// isScheduleActive checks if a chain entry's is active and returns a boolean, true if active, false otherwise
func isScheduleActive(s *autoscalingv1.SchedulePolicy) bool {
now := time.Now()
scheduleDelta := time.Minute * -1

// If a start time is present and the current time is before the start time, the schedule is inactive so return false
startTime := s.Between.Start.Time
if !startTime.IsZero() && now.Before(startTime) {
return false
}

// If an end time is present and the current time is after the end time, the schedule is inactive so return false
endTime := s.Between.End.Time
if !endTime.IsZero() && now.After(endTime) {
return false
}

// If no startCron field is specified, then it's automatically true (duration is no longer relevant since we're always running)
if s.ActivePeriod.StartCron == "" {
return true
}

location, _ := time.LoadLocation(s.ActivePeriod.Timezone)
startCron, _ := cron.ParseStandard(s.ActivePeriod.StartCron)
nextStart := startCron.Next(now.In(location)).Add(scheduleDelta)
duration, err := time.ParseDuration(s.ActivePeriod.Duration)

// If there's an err, then the duration field is empty, meaning duration is indefinite
if err != nil {
duration = 0 // Indefinite duration if not set
}

// If the current time is after the next start time, and the duration is indefinite or the current time is before the next start time + duration,
// then return true
if now.After(nextStart) && (duration == 0 || now.Before(nextStart.Add(duration))) {
return true
}

return false
}

// getSortedGameServers returns the list of Game Servers for the Fleet in the order in which the
// Game Servers would be deleted.
func getSortedGameServers(f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister,
Expand Down
2 changes: 1 addition & 1 deletion pkg/fleetautoscalers/fleetautoscalers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestComputeDesiredFleetSize(t *testing.T) {
_, cancel := agtesting.StartInformers(m, gameServers.Informer().HasSynced)
defer cancel()

replicas, limited, err := computeDesiredFleetSize(fas, f, gameServers.Lister(), nc)
replicas, limited, err := computeDesiredFleetSize(fas.Spec.Policy, f, gameServers.Lister(), nc)

if tc.expected.err != "" && assert.NotNil(t, err) {
assert.Equal(t, tc.expected.err, err.Error())
Expand Down

0 comments on commit f50716f

Please sign in to comment.