diff --git a/pkg/fleetautoscalers/controller.go b/pkg/fleetautoscalers/controller.go index bfbfb5e4ed..8fe969b9b7 100644 --- a/pkg/fleetautoscalers/controller.go +++ b/pkg/fleetautoscalers/controller.go @@ -313,7 +313,7 @@ func (c *Controller) syncFleetAutoscaler(ctx context.Context, key string) error } currentReplicas := fleet.Status.Replicas - desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas, fleet, c.gameServerLister, c.counter.Counts()) + desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas.Spec.Policy, fleet, c.gameServerLister, c.counter.Counts()) if err != nil { c.recorder.Eventf(fas, corev1.EventTypeWarning, "FleetAutoscaler", "Error calculating desired fleet size on FleetAutoscaler %s. Error: %s", fas.ObjectMeta.Name, err.Error()) diff --git a/pkg/fleetautoscalers/fleetautoscalers.go b/pkg/fleetautoscalers/fleetautoscalers.go index 5ef77e0edf..2df38e552f 100644 --- a/pkg/fleetautoscalers/fleetautoscalers.go +++ b/pkg/fleetautoscalers/fleetautoscalers.go @@ -29,6 +29,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/robfig/cron/v3" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" @@ -50,17 +51,21 @@ var client = http.Client{ } // computeDesiredFleetSize computes the new desired size of the given fleet -func computeDesiredFleetSize(fas *autoscalingv1.FleetAutoscaler, f *agonesv1.Fleet, +func computeDesiredFleetSize(pol autoscalingv1.FleetAutoscalerPolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) { - switch fas.Spec.Policy.Type { + switch pol.Type { case autoscalingv1.BufferPolicyType: - return applyBufferPolicy(fas.Spec.Policy.Buffer, f) + return applyBufferPolicy(pol.Buffer, f) case autoscalingv1.WebhookPolicyType: - return applyWebhookPolicy(fas.Spec.Policy.Webhook, f) + return applyWebhookPolicy(pol.Webhook, f) case autoscalingv1.CounterPolicyType: - return applyCounterOrListPolicy(fas.Spec.Policy.Counter, nil, f, gameServerLister, nodeCounts) + return applyCounterOrListPolicy(pol.Counter, nil, f, gameServerLister, nodeCounts) case autoscalingv1.ListPolicyType: - return applyCounterOrListPolicy(nil, fas.Spec.Policy.List, f, gameServerLister, nodeCounts) + return applyCounterOrListPolicy(nil, pol.List, f, gameServerLister, nodeCounts) + case autoscalingv1.SchedulePolicyType: + return applySchedulePolicy(pol.Schedule, f, gameServerLister, nodeCounts) + case autoscalingv1.ChainPolicyType: + return applyChainPolicy(pol.Chain, f, gameServerLister, nodeCounts) } return 0, false, errors.New("wrong policy type, should be one of: Buffer, Webhook, Counter, List") @@ -362,6 +367,88 @@ func applyCounterOrListPolicy(c *autoscalingv1.CounterPolicy, l *autoscalingv1.L return 0, false, errors.Errorf("unable to apply ListPolicy %v", l) } +func applySchedulePolicy(s *autoscalingv1.SchedulePolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) { + // Ensure the scheduled autoscaler feature gate is enabled + if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) { + return 0, false, errors.Errorf("cannot apply SchedulePolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler) + } + + if isScheduleActive(s) { + return computeDesiredFleetSize(s.Policy, f, gameServerLister, nodeCounts) + } + + return f.Status.Replicas, false, nil +} + +func applyChainPolicy(c autoscalingv1.ChainPolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) { + // Ensure the scheduled autoscaler feature gate is enabled + if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) { + return 0, false, errors.Errorf("cannot apply ChainPolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler) + } + + // Loop over all entries in the chain + for _, entry := range c { + switch entry.Type { + case autoscalingv1.SchedulePolicyType: + schedRep, schedLim, schedErr := applySchedulePolicy(entry.Schedule, f, gameServerLister, nodeCounts) + // If the schedule is active and no error was returned from the policy, then return the replicas, limited and error + if isScheduleActive(entry.Schedule) && schedErr == nil { + return schedRep, schedLim, schedErr + } + case autoscalingv1.WebhookPolicyType: + webhookRep, webhookLim, webhookErr := applyWebhookPolicy(entry.Webhook, f) + if webhookErr == nil { + return webhookRep, webhookLim, webhookErr + } + default: + return computeDesiredFleetSize(entry.FleetAutoscalerPolicy, f, gameServerLister, nodeCounts) + } + } + + return f.Status.Replicas, false, nil +} + +// isScheduleActive checks if a chain entry's is active and returns a boolean, true if active, false otherwise +func isScheduleActive(s *autoscalingv1.SchedulePolicy) bool { + now := time.Now() + scheduleDelta := time.Minute * -1 + + // If a start time is present and the current time is before the start time, the schedule is inactive so return false + startTime := s.Between.Start.Time + if !startTime.IsZero() && now.Before(startTime) { + return false + } + + // If an end time is present and the current time is after the end time, the schedule is inactive so return false + endTime := s.Between.End.Time + if !endTime.IsZero() && now.After(endTime) { + return false + } + + // If no startCron field is specified, then it's automatically true (duration is no longer relevant since we're always running) + if s.ActivePeriod.StartCron == "" { + return true + } + + location, _ := time.LoadLocation(s.ActivePeriod.Timezone) + startCron, _ := cron.ParseStandard(s.ActivePeriod.StartCron) + nextStart := startCron.Next(now.In(location)).Add(scheduleDelta) + duration, err := time.ParseDuration(s.ActivePeriod.Duration) + + // If there's an err, then the duration field is empty, meaning duration is indefinite + if err != nil { + duration = 0 // Indefinite duration if not set + } + + // If the current time is after the next start time, and the duration is indefinite or the current time is before the next start time + duration, + // then return true + if now.After(nextStart) && (duration == 0 || now.Before(nextStart.Add(duration))) { + return true + } + + return false +} + // getSortedGameServers returns the list of Game Servers for the Fleet in the order in which the // Game Servers would be deleted. func getSortedGameServers(f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, diff --git a/pkg/fleetautoscalers/fleetautoscalers_test.go b/pkg/fleetautoscalers/fleetautoscalers_test.go index 0b1f9bee74..1aeffb6b48 100644 --- a/pkg/fleetautoscalers/fleetautoscalers_test.go +++ b/pkg/fleetautoscalers/fleetautoscalers_test.go @@ -187,7 +187,7 @@ func TestComputeDesiredFleetSize(t *testing.T) { _, cancel := agtesting.StartInformers(m, gameServers.Informer().HasSynced) defer cancel() - replicas, limited, err := computeDesiredFleetSize(fas, f, gameServers.Lister(), nc) + replicas, limited, err := computeDesiredFleetSize(fas.Spec.Policy, f, gameServers.Lister(), nc) if tc.expected.err != "" && assert.NotNil(t, err) { assert.Equal(t, tc.expected.err, err.Error())