Skip to content

Commit

Permalink
Store the annotated apps in memory (#116)
Browse files Browse the repository at this point in the history
* Fix linting errors because of missing variables

* Change the progressive sync to be a multi-region object

* Sync first stage

* Set second stage apps to progressing

* Check the stages status while the second stage apps are progressing

* Add a test for adding the annotation

* Add a check to make sure the PS status is in progress after the first stage

* Add a check to make sure the PS status is in progress after the second stage

* Fix second stage cluster order

* Add setAppHealthStatusProgressing and setAppStatusCompleted helper functions

* Refactor second stage with helper functions

* Add third stage

* Fix failed stage test

* WIP: update multi-stage test with syncedAt annotation

* Eventual consistency for setting app status in tests

* Update IsStageComplete to take into account MaxTargets

* Add method to identify annotated apps

* WIP: Progress on resolving reconciliation loop bugs

* Use single namespace and different application names

* Better variable names

* Remove duplicate code

* Remove ginkgo random stuff

* Add more informative log messages

* Fix GetAppsName comment

* Fix IsStageInProgress by adding a condition on how many apps we still want to sync

* Try increasing the timeout

* Try increasing the timeout

* Always log the progressive sync object

* Restore --randomizeAllSpecs --randomizeSuites in ginkgo

* More key values in the logs and fix typo

* Remove ginkgo randomization

* Restore create/delete a namespace after each test

* Add namespace to the progressivesync log value

* Add a condition to avoid moving to the next stage and exiting the reconciliation loop

* Increment timeout to 60 seconds

* Always requeue in case of error but with a delay

* Increase the timeout to 120s

* Add more verbose logging

* Add annotations to log

* Increase to 5 minutes timeout

* Uncomment Remove Annotation

* Annotate before syncing

* Add more logging

* Add a test to make sure we set the app annotation

* Avoid deleting all the annotations if syncedAt key is missing

* Comment out remove annotation and one test

* Increase timeout

* Re-add should fail if unable to sync application test

* Re-comment out should fail if unable to sync application test

* Delete ps object at the end of test

* Semplify annotation logic

* Add logging to scheduler

* Add logic to protect againt more apps synced than maxTargets

* Add scheduler unit test

* Increase timeout for integration tests

* Log when tests pass

* Schedule progressing but out-of-sync apps as well

* Remove delete annotation logic

* Change GetSyncedAppsByStage to match only on sync status and not health status

* Switch to GetAppsByAnnotation as more generic function

* Fix IsStageFailed by passing a stage as well

* Prefer GetSyncedAppsByStage over getting the app by status and then annotation

* Prefer GetSyncedAppsByStage over getting the app by status and then annotation

* Rename variables to have a more meaningful name

* Explicity set Requeu to true and pass nil error

* Re-add TestIsStageInProgress

* Re-add TestStageComplete

* Add nil case for TestStageFailed

* Add Requeue: true along when setting RequeueAfter

* Build a docker image on every PR

* [skip ci] Minor tweaks in comments and remove the word rollout

* Store the annotated apps in memory

* Fix lint error

* Add log for the syncedAtStage map

* Restirct manager to a single namespace

* Add test for synced and healthy apps

* QUick fix tests

* Fix tests

* Fix the way we calculate the values for SyncedAppsPerStage

* Bump kind action chart

* Init syncedappsperstage map

* Add an empty stage test with percentages

* Fix merge conflict

* Introduce state manager

* Fix tests

* Fix tests

* Remove unnecessary test

* Cleanup unneeded code

Co-authored-by: Matteo Ruina <[email protected]>
Co-authored-by: Diogo Campos <[email protected]>
Co-authored-by: Nebojsa Prodana <[email protected]>
  • Loading branch information
4 people authored Jul 21, 2021
1 parent 644392f commit f647f1c
Show file tree
Hide file tree
Showing 12 changed files with 709 additions and 375 deletions.
14 changes: 7 additions & 7 deletions api/v1alpha1/progressivesync_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"
"time"

"github.com/Skyscanner/applicationset-progressive-sync/internal/utils"
"github.com/Skyscanner/applicationset-progressive-sync/internal/consts"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -22,8 +22,8 @@ func TestOwns(t *testing.T) {
Kind: "fakeKind",
Name: "fakeName",
}, {
APIVersion: utils.AppSetAPIGroup,
Kind: utils.AppSetKind,
APIVersion: consts.AppSetAPIGroup,
Kind: consts.AppSetKind,
Name: "owner-app-set",
}},
expected: true,
Expand All @@ -37,13 +37,13 @@ func TestOwns(t *testing.T) {
expected: false,
}}

ref := utils.AppSetAPIGroup
ref := consts.AppSetAPIGroup
pr := ProgressiveSync{
ObjectMeta: metav1.ObjectMeta{Name: "pr", Namespace: "namespace"},
Spec: ProgressiveSyncSpec{
SourceRef: corev1.TypedLocalObjectReference{
APIGroup: &ref,
Kind: utils.AppSetKind,
Kind: consts.AppSetKind,
Name: "owner-app-set",
},
Stages: nil,
Expand Down Expand Up @@ -142,13 +142,13 @@ func TestSetStageStatus(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
ref := utils.AppSetAPIGroup
ref := consts.AppSetAPIGroup
pr := ProgressiveSync{
ObjectMeta: metav1.ObjectMeta{Name: "pr", Namespace: "namespace"},
Spec: ProgressiveSyncSpec{
SourceRef: corev1.TypedLocalObjectReference{
APIGroup: &ref,
Kind: utils.AppSetKind,
Kind: consts.AppSetKind,
Name: "owner-app-set",
},
Stages: nil,
Expand Down
75 changes: 31 additions & 44 deletions controllers/progressivesync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

syncv1alpha1 "github.com/Skyscanner/applicationset-progressive-sync/api/v1alpha1"
"github.com/Skyscanner/applicationset-progressive-sync/internal/consts"
"github.com/Skyscanner/applicationset-progressive-sync/internal/scheduler"
"github.com/Skyscanner/applicationset-progressive-sync/internal/utils"
applicationpkg "github.com/argoproj/argo-cd/pkg/apiclient/application"
Expand Down Expand Up @@ -52,6 +53,7 @@ type ProgressiveSyncReconciler struct {
Log logr.Logger
Scheme *runtime.Scheme
ArgoCDAppClient utils.ArgoCDAppClient
StateManager utils.ProgressiveSyncStateManager
}

// +kubebuilder:rbac:groups=argoproj.skyscanner.net,resources=progressivesyncs,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -97,11 +99,16 @@ func (r *ProgressiveSyncReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{Requeue: true}, nil
}

latest := ps
var result reconcile.Result
var reconcileErr error

pss, _ := r.StateManager.Get(ps.Name)
for _, stage := range ps.Spec.Stages {
log = log.WithValues("stage", stage.Name)

ps, result, reconcileErr := r.reconcileStage(ctx, ps, stage)
if err := r.updateStatusWithRetry(ctx, &ps); err != nil {
latest, result, reconcileErr = r.reconcileStage(ctx, latest, stage, pss)
if err := r.updateStatusWithRetry(ctx, &latest); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -113,9 +120,9 @@ func (r *ProgressiveSyncReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

// Progressive sync completed
completed := ps.NewStatusCondition(syncv1alpha1.CompletedCondition, metav1.ConditionTrue, syncv1alpha1.StagesCompleteReason, "All stages completed")
apimeta.SetStatusCondition(ps.GetStatusConditions(), completed)
if err := r.updateStatusWithRetry(ctx, &ps); err != nil {
completed := latest.NewStatusCondition(syncv1alpha1.CompletedCondition, metav1.ConditionTrue, syncv1alpha1.StagesCompleteReason, "All stages completed")
apimeta.SetStatusCondition(latest.GetStatusConditions(), completed)
if err := r.updateStatusWithRetry(ctx, &latest); err != nil {
log.Error(err, "failed to update object status")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -241,7 +248,7 @@ func (r *ProgressiveSyncReconciler) requestsForSecretChange(o client.Object) []r
func (r *ProgressiveSyncReconciler) getClustersFromSelector(ctx context.Context, selector metav1.LabelSelector) (corev1.SecretList, error) {
secrets := corev1.SecretList{}

argoSelector := metav1.AddLabelToSelector(&selector, utils.ArgoCDSecretTypeLabel, utils.ArgoCDSecretTypeCluster)
argoSelector := metav1.AddLabelToSelector(&selector, consts.ArgoCDSecretTypeLabel, consts.ArgoCDSecretTypeCluster)
labels, err := metav1.LabelSelectorAsSelector(argoSelector)
if err != nil {
r.Log.Error(err, "unable to convert selector into labels")
Expand Down Expand Up @@ -322,42 +329,21 @@ func (r *ProgressiveSyncReconciler) updateStatusWithRetry(ctx context.Context, p
return retryErr
}

// setSyncedAtAnnotation sets the SyncedAt annotation for the currently progressing stage of the app
func (r *ProgressiveSyncReconciler) setSyncedAtAnnotation(ctx context.Context, app argov1alpha1.Application, stageName string) error {

log := r.Log.WithValues("stage", stageName)

retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
key := client.ObjectKeyFromObject(&app)
latest := argov1alpha1.Application{}
// markAppAsSynced marks the app as synced in the specified stage
func (r *ProgressiveSyncReconciler) markAppAsSynced(ctx context.Context, app argov1alpha1.Application, stage syncv1alpha1.ProgressiveSyncStage, pss utils.ProgressiveSyncState) error {

if err := r.Client.Get(ctx, key, &latest); err != nil {
log.Info("failed to get app when adding syncedAt annotation")
return err
}
log := r.Log.WithValues("stage", stage.Name, "app", fmt.Sprintf("%s/%s", app.Namespace, app.Name))

log = log.WithValues("app", fmt.Sprintf("%s/%s", latest.Namespace, latest.Name))
pss.MarkAppAsSynced(app, stage)

if latest.Annotations == nil {
latest.Annotations = map[string]string{
utils.ProgressiveSyncSyncedAtStageKey: stageName,
}
} else {
latest.Annotations[utils.ProgressiveSyncSyncedAtStageKey] = stageName
}
log.Info("app marked as synced")

if err := r.Client.Update(ctx, &latest); err != nil {
return err
}
log.Info("app annotated")
return nil
})
return retryErr
return nil
}

// reconcileStage reconcile a ProgressiveSyncStage
func (r *ProgressiveSyncReconciler) reconcileStage(ctx context.Context, ps syncv1alpha1.ProgressiveSync, stage syncv1alpha1.ProgressiveSyncStage) (syncv1alpha1.ProgressiveSync, reconcile.Result, error) {
log := r.Log.WithValues("progressivesync", fmt.Sprintf("%s/%s", ps.Namespace, ps.Name), "applicationset", ps.Spec.SourceRef.Name, "stage", stage.Name)
func (r *ProgressiveSyncReconciler) reconcileStage(ctx context.Context, ps syncv1alpha1.ProgressiveSync, stage syncv1alpha1.ProgressiveSyncStage, pss utils.ProgressiveSyncState) (syncv1alpha1.ProgressiveSync, reconcile.Result, error) {
log := r.Log.WithValues("progressivesync", fmt.Sprintf("%s/%s", ps.Namespace, ps.Name), "applicationset", ps.Spec.SourceRef.Name, "stage", stage.Name, "syncedAtStage", pss)

// Get the clusters to update
clusters, err := r.getClustersFromSelector(ctx, stage.Targets.Clusters.Selector)
Expand Down Expand Up @@ -388,13 +374,15 @@ func (r *ProgressiveSyncReconciler) reconcileStage(ctx context.Context, ps syncv
}
log.Info("fetched apps targeting selected clusters", "apps", utils.GetAppsName(apps))

pss.RefreshState(apps, stage)

// Get the Applications to update
scheduledApps := scheduler.Scheduler(log, apps, stage)
scheduledApps := scheduler.Scheduler(log, apps, stage, pss)

for _, s := range scheduledApps {
log.Info("syncing app", "app", fmt.Sprintf("%s/%s", s.Namespace, s.Name), "sync.status", s.Status.Sync.Status, "health.status", s.Status.Health.Status)

_, err := r.syncApp(ctx, s.Name)
_, err = r.syncApp(ctx, s.Name)

if err != nil {
if !strings.Contains(err.Error(), "another operation is already in progress") {
Expand All @@ -409,18 +397,17 @@ func (r *ProgressiveSyncReconciler) reconcileStage(ctx context.Context, ps syncv
}
log.Info("failed to sync app because it is already syncing")
}

err = r.setSyncedAtAnnotation(ctx, s, stage.Name)
err := r.markAppAsSynced(ctx, s, stage, pss)
if err != nil {
message := "failed at setSyncedAtAnnotation"
message := "failed at markAppAsSynced"
log.Error(err, message, "message", err.Error())

return ps, ctrl.Result{RequeueAfter: RequeueDelayOnError}, err
}

}

if scheduler.IsStageFailed(apps, stage) {
if scheduler.IsStageFailed(apps, stage, pss) {
message := fmt.Sprintf("%s stage failed", stage.Name)
log.Info(message)

Expand All @@ -433,12 +420,12 @@ func (r *ProgressiveSyncReconciler) reconcileStage(ctx context.Context, ps syncv
return ps, ctrl.Result{Requeue: true, RequeueAfter: RequeueDelayOnError}, nil
}

if scheduler.IsStageInProgress(apps, stage) {
if scheduler.IsStageInProgress(apps, stage, pss) {
message := fmt.Sprintf("%s stage in progress", stage.Name)
log.Info(message)

for _, app := range apps {
log.Info("application details", "app", fmt.Sprintf("%s/%s", app.Namespace, app.Name), "annotations", app.GetAnnotations(), "sync.status", app.Status.Sync.Status, "health.status", app.Status.Health.Status)
log.Info("application details", "app", fmt.Sprintf("%s/%s", app.Namespace, app.Name), "sync.status", app.Status.Sync.Status, "health.status", app.Status.Health.Status)
}

r.updateStageStatus(ctx, stage.Name, message, syncv1alpha1.PhaseProgressing, &ps)
Expand All @@ -450,7 +437,7 @@ func (r *ProgressiveSyncReconciler) reconcileStage(ctx context.Context, ps syncv
return ps, ctrl.Result{Requeue: true}, nil
}

if scheduler.IsStageComplete(apps, stage) {
if scheduler.IsStageComplete(apps, stage, pss) {
message := fmt.Sprintf("%s stage completed", stage.Name)
log.Info(message)

Expand Down
Loading

0 comments on commit f647f1c

Please sign in to comment.