diff --git a/.gitignore b/.gitignore index d8ccf433..8bdefb31 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ cover.out /vendor /.vendor-new +__debug* .vscode .idea .DS_Store diff --git a/cmd/provider/main.go b/cmd/provider/main.go index c73217ba..566a56ce 100644 --- a/cmd/provider/main.go +++ b/cmd/provider/main.go @@ -97,7 +97,8 @@ func main() { enableExternalSecretStores = app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool() enableManagementPolicies = app.Flag("enable-management-policies", "Enable support for Management Policies.").Default("false").Envar("ENABLE_MANAGEMENT_POLICIES").Bool() - autoPauseBucket = app.Flag("auto-pause-bucket", "Enable auto pause of reconciliation of ready buckets").Default("false").Envar("AUTO_PAUSE_BUCKET").Bool() + autoPauseBucket = app.Flag("auto-pause-bucket", "Enable auto pause of reconciliation of ready buckets").Default("false").Envar("AUTO_PAUSE_BUCKET").Bool() + recreateMissingBucket = app.Flag("recreate-missing-bucket", "Recreates existing bucket if missing").Default("false").Envar("RECREATE_MISSING_BUCKET").Bool() assumeRoleArn = app.Flag("assume-role-arn", "Assume role ARN to be used for STS authentication").Default("").Envar("ASSUME_ROLE_ARN").String() @@ -272,6 +273,11 @@ func main() { backendStore := backendstore.NewBackendStore() + kubeClientUncached, err := client.New(cfg, client.Options{ + Scheme: providerSCheme, + }) + kingpin.FatalIfError(err, "Cannot create Kube client") + kingpin.FatalIfError(ctrl.NewWebhookManagedBy(mgr). For(&providercephv1alpha1.Bucket{}). WithValidator(bucket.NewBucketValidator(backendStore)). @@ -286,7 +292,8 @@ func main() { healthcheck.NewController( healthcheck.WithAutoPause(autoPauseBucket), healthcheck.WithBackendStore(backendStore), - healthcheck.WithKubeClient(mgr.GetClient()), + healthcheck.WithKubeClientUncached(kubeClientUncached), + healthcheck.WithKubeClientCached(mgr.GetClient()), healthcheck.WithLogger(o.Logger))), "Cannot setup ProviderConfig controllers") @@ -299,6 +306,7 @@ func main() { kingpin.FatalIfError(bucket.Setup(mgr, o, bucket.NewConnector( bucket.WithAutoPause(autoPauseBucket), + bucket.WithRecreateMissingBucket(recreateMissingBucket), bucket.WithBackendStore(backendStore), bucket.WithKubeClient(mgr.GetClient()), bucket.WithOperationTimeout(*reconcileTimeout), diff --git a/internal/controller/bucket/connector.go b/internal/controller/bucket/connector.go index f19c4987..d7056084 100644 --- a/internal/controller/bucket/connector.go +++ b/internal/controller/bucket/connector.go @@ -16,17 +16,18 @@ import ( // A Connector is expected to produce an ExternalClient when its Connect method // is called. type Connector struct { - kube client.Client - autoPauseBucket bool - backendStore *backendstore.BackendStore - subresourceClients []SubresourceClient - s3ClientHandler *s3clienthandler.Handler - log logging.Logger - operationTimeout time.Duration - creationGracePeriod time.Duration - pollInterval time.Duration - usage resource.Tracker - newServiceFn func(creds []byte) (interface{}, error) + kube client.Client + autoPauseBucket bool + recreateMissingBucket bool + backendStore *backendstore.BackendStore + subresourceClients []SubresourceClient + s3ClientHandler *s3clienthandler.Handler + log logging.Logger + operationTimeout time.Duration + creationGracePeriod time.Duration + pollInterval time.Duration + usage resource.Tracker + newServiceFn func(creds []byte) (interface{}, error) } func NewConnector(options ...func(*Connector)) *Connector { @@ -50,6 +51,12 @@ func WithAutoPause(a *bool) func(*Connector) { } } +func WithRecreateMissingBucket(a *bool) func(*Connector) { + return func(c *Connector) { + c.recreateMissingBucket = *a + } +} + func WithOperationTimeout(t time.Duration) func(*Connector) { return func(c *Connector) { c.operationTimeout = t @@ -110,24 +117,26 @@ func (c *Connector) Connect(ctx context.Context, mg resource.Managed) (managed.E } return &external{ - kubeClient: c.kube, - autoPauseBucket: c.autoPauseBucket, - operationTimeout: c.operationTimeout, - backendStore: c.backendStore, - subresourceClients: c.subresourceClients, - s3ClientHandler: c.s3ClientHandler, - log: c.log}, + kubeClient: c.kube, + autoPauseBucket: c.autoPauseBucket, + recreateMissingBucket: c.recreateMissingBucket, + operationTimeout: c.operationTimeout, + backendStore: c.backendStore, + subresourceClients: c.subresourceClients, + s3ClientHandler: c.s3ClientHandler, + log: c.log}, nil } // external observes, then either creates, updates, or deletes an external // resource to ensure it reflects the managed resource's desired state. type external struct { - kubeClient client.Client - autoPauseBucket bool - operationTimeout time.Duration - backendStore *backendstore.BackendStore - subresourceClients []SubresourceClient - s3ClientHandler *s3clienthandler.Handler - log logging.Logger + kubeClient client.Client + autoPauseBucket bool + recreateMissingBucket bool + operationTimeout time.Duration + backendStore *backendstore.BackendStore + subresourceClients []SubresourceClient + s3ClientHandler *s3clienthandler.Handler + log logging.Logger } diff --git a/internal/controller/bucket/consts.go b/internal/controller/bucket/consts.go index 58c86bd5..19b49544 100644 --- a/internal/controller/bucket/consts.go +++ b/internal/controller/bucket/consts.go @@ -19,4 +19,6 @@ const ( // Lifecycle configuration error messages. errObserveLifecycleConfig = "failed to observe bucket lifecycle configuration" errHandleLifecycleConfig = "failed to handle bucket lifecycle configuration" + + True = "true" ) diff --git a/internal/controller/bucket/create.go b/internal/controller/bucket/create.go index 9003218f..9636dde2 100644 --- a/internal/controller/bucket/create.go +++ b/internal/controller/bucket/create.go @@ -18,6 +18,7 @@ import ( "github.com/linode/provider-ceph/internal/consts" "github.com/linode/provider-ceph/internal/otel/traces" "github.com/linode/provider-ceph/internal/rgw" + "github.com/linode/provider-ceph/internal/utils" ) //nolint:maintidx,gocognit,gocyclo,cyclop,nolintlint // Function requires numerous checks. @@ -49,22 +50,18 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext return managed.ExternalCreation{}, err } - if len(bucket.Spec.Providers) == 0 { - bucket.Spec.Providers = c.backendStore.GetAllActiveBackendNames() - } + providerNames := utils.GetBucketProvidersFilterDisabledLabel(bucket, c.backendStore.GetAllActiveBackendNames()) // Create the bucket on each backend in a separate go routine - activeBackends := c.backendStore.GetActiveBackends(bucket.Spec.Providers) + activeBackends := c.backendStore.GetActiveBackends(providerNames) if len(activeBackends) == 0 { err := errors.New(errNoActiveS3Backends) traces.SetAndRecordError(span, err) return managed.ExternalCreation{}, err - } else if len(activeBackends) != len(bucket.Spec.Providers) { - err := errors.New(errMissingS3Backend) - traces.SetAndRecordError(span, err) - - return managed.ExternalCreation{}, err + } else if len(activeBackends) != len(providerNames) { + c.log.Info("Missing S3 backends", consts.KeyBucketName, bucket.Name, "providers", providerNames, "activeBackends", activeBackends) + traces.SetAndRecordError(span, errors.New(errMissingS3Backend)) } // This value shows a bucket on one backend is already created. @@ -165,7 +162,7 @@ func (c *external) waitForCreationAndUpdateBucketCR(ctx context.Context, bucket if bucketLatest.ObjectMeta.Labels == nil { bucketLatest.ObjectMeta.Labels = map[string]string{} } - bucketLatest.ObjectMeta.Labels[v1alpha1.BackendLabelPrefix+beName] = "" + bucketLatest.ObjectMeta.Labels[v1alpha1.BackendLabelPrefix+beName] = True return NeedsObjectUpdate }, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired { diff --git a/internal/controller/bucket/create_test.go b/internal/controller/bucket/create_test.go index ebfeabbb..4442d344 100644 --- a/internal/controller/bucket/create_test.go +++ b/internal/controller/bucket/create_test.go @@ -76,47 +76,6 @@ func TestCreateBasicErrors(t *testing.T) { err: errors.New(errNoActiveS3Backends), }, }, - "S3 backend reference inactive": { - fields: fields{ - backendStore: func() *backendstore.BackendStore { - bs := backendstore.NewBackendStore() - bs.AddOrUpdateBackend("s3-backend-0", nil, nil, true, apisv1alpha1.HealthStatusUnknown) - bs.AddOrUpdateBackend("s3-backend-1", nil, nil, false, apisv1alpha1.HealthStatusUnknown) - - return bs - }(), - }, - args: args{ - mg: &v1alpha1.Bucket{ - Spec: v1alpha1.BucketSpec{ - Providers: []string{"s3-backend-0", "s3-backend-1"}, - }, - }, - }, - want: want{ - err: errors.New(errMissingS3Backend), - }, - }, - "S3 backend reference missing": { - fields: fields{ - backendStore: func() *backendstore.BackendStore { - bs := backendstore.NewBackendStore() - bs.AddOrUpdateBackend("s3-backend-0", nil, nil, true, apisv1alpha1.HealthStatusUnknown) - - return bs - }(), - }, - args: args{ - mg: &v1alpha1.Bucket{ - Spec: v1alpha1.BucketSpec{ - Providers: []string{"s3-backend-0", "s3-backend-1"}, - }, - }, - }, - want: want{ - err: errors.New(errMissingS3Backend), - }, - }, "S3 backend not referenced and none exist": { fields: fields{ backendStore: backendstore.NewBackendStore(), diff --git a/internal/controller/bucket/delete.go b/internal/controller/bucket/delete.go index e8e31140..4e5649da 100644 --- a/internal/controller/bucket/delete.go +++ b/internal/controller/bucket/delete.go @@ -45,12 +45,17 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error { g := new(errgroup.Group) - activeBackends := bucket.Spec.Providers - if len(activeBackends) == 0 { - activeBackends = c.backendStore.GetAllActiveBackendNames() - } + providerNames := []string{} + for backendName, backend := range bucket.Status.AtProvider.Backends { + providerNames = append(providerNames, backendName) + + reason := backend.BucketCondition.Reason + if reason != xpv1.ReasonAvailable { + c.log.Info("Skipping deletion of bucket on backend, not available", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName, "status", reason) + + continue + } - for _, backendName := range activeBackends { bucketBackends.setBucketCondition(bucket.Name, backendName, xpv1.Deleting()) c.log.Info("Deleting bucket on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName) @@ -89,8 +94,7 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error { // CR spec. If the deletion is successful or unsuccessful, the bucket CR status must be // updated. if err := c.updateBucketCR(ctx, bucket, func(bucketDeepCopy, bucketLatest *v1alpha1.Bucket) UpdateRequired { - bucketLatest.Spec.Providers = activeBackends - setBucketStatus(bucketLatest, bucketBackends) + setBucketStatus(bucketLatest, bucketBackends, providerNames) return NeedsStatusUpdate }); err != nil { diff --git a/internal/controller/bucket/helpers.go b/internal/controller/bucket/helpers.go index 81df4355..7e2bfdcf 100644 --- a/internal/controller/bucket/helpers.go +++ b/internal/controller/bucket/helpers.go @@ -2,6 +2,7 @@ package bucket import ( "context" + "strings" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" "github.com/crossplane/crossplane-runtime/pkg/errors" @@ -10,6 +11,7 @@ import ( "github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1" "github.com/linode/provider-ceph/internal/backendstore" "github.com/linode/provider-ceph/internal/consts" + "github.com/linode/provider-ceph/internal/utils" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -18,7 +20,7 @@ import ( // isBucketPaused returns true if the bucket has the paused label set. func isBucketPaused(bucket *v1alpha1.Bucket) bool { - if val, ok := bucket.Labels[meta.AnnotationKeyReconciliationPaused]; ok && val == "true" { + if val, ok := bucket.Labels[meta.AnnotationKeyReconciliationPaused]; ok && val == True { return true } @@ -30,7 +32,7 @@ func pauseBucket(bucket *v1alpha1.Bucket) { if bucket.ObjectMeta.Labels == nil { bucket.ObjectMeta.Labels = map[string]string{} } - bucket.Labels[meta.AnnotationKeyReconciliationPaused] = "true" + bucket.Labels[meta.AnnotationKeyReconciliationPaused] = True } // isPauseRequired determines if the Bucket should be paused. @@ -59,10 +61,10 @@ func isPauseRequired(b *v1alpha1.Bucket, c map[string]backendstore.S3Client, bb b.Labels[meta.AnnotationKeyReconciliationPaused] == "" } -// isBucketAvailableFromStatus checks the backends listed in Spec.Providers against the +// isBucketAvailableFromStatus checks the backends listed in providerNames against the // backends in Status to ensure buckets are considered Available on all desired backends. -func isBucketAvailableFromStatus(bucket *v1alpha1.Bucket, backendClients map[string]backendstore.S3Client) bool { - for _, backendName := range bucket.Spec.Providers { +func isBucketAvailableFromStatus(bucket *v1alpha1.Bucket, providerNames []string, backendClients map[string]backendstore.S3Client) bool { + for _, backendName := range providerNames { if _, ok := backendClients[backendName]; !ok { // This backend does not exist in the list of available backends. // The backend may be offline, so it is skipped. @@ -82,24 +84,35 @@ func isBucketAvailableFromStatus(bucket *v1alpha1.Bucket, backendClients map[str } // setBackendLabels adds label "provider-ceph.backends." to the Bucket for each backend. -func setBackendLabels(bucket *v1alpha1.Bucket) { - for _, beName := range bucket.Spec.Providers { - beLabel := v1alpha1.BackendLabelPrefix + beName +func setBackendLabels(bucket *v1alpha1.Bucket, providerNames []string) { + if bucket.ObjectMeta.Labels == nil { + bucket.ObjectMeta.Labels = map[string]string{} + } + + labelsToDelete := []string{} + for k := range bucket.ObjectMeta.Labels { + if strings.HasPrefix(k, v1alpha1.BackendLabelPrefix) && bucket.ObjectMeta.Labels[k] == True { + labelsToDelete = append(labelsToDelete, k) + } + } + for _, k := range labelsToDelete { + delete(bucket.ObjectMeta.Labels, k) + } + + for _, beName := range providerNames { + beLabel := utils.GetBackendLabel(beName) if _, ok := bucket.ObjectMeta.Labels[beLabel]; ok { continue } - if bucket.ObjectMeta.Labels == nil { - bucket.ObjectMeta.Labels = map[string]string{} - } - bucket.ObjectMeta.Labels[beLabel] = "" + bucket.ObjectMeta.Labels[beLabel] = True } } -func setBucketStatus(bucket *v1alpha1.Bucket, bucketBackends *bucketBackends) { +func setBucketStatus(bucket *v1alpha1.Bucket, bucketBackends *bucketBackends, providerNames []string) { bucket.Status.SetConditions(xpv1.Unavailable()) - backends := bucketBackends.getBackends(bucket.Name, bucket.Spec.Providers) + backends := bucketBackends.getBackends(bucket.Name, providerNames) bucket.Status.AtProvider.Backends = backends for _, backend := range backends { diff --git a/internal/controller/bucket/observe.go b/internal/controller/bucket/observe.go index 7ef8c5ad..98f96b37 100644 --- a/internal/controller/bucket/observe.go +++ b/internal/controller/bucket/observe.go @@ -16,6 +16,7 @@ import ( "github.com/linode/provider-ceph/internal/consts" "github.com/linode/provider-ceph/internal/otel/traces" "github.com/linode/provider-ceph/internal/rgw" + "github.com/linode/provider-ceph/internal/utils" ) //nolint:gocyclo,cyclop // Function requires numerous checks. @@ -67,13 +68,17 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex } // If no Providers are specified in the Bucket Spec, the bucket is to be created on all backends. - if len(bucket.Spec.Providers) == 0 { - bucket.Spec.Providers = c.backendStore.GetAllActiveBackendNames() + providerNames := utils.GetBucketProvidersFilterDisabledLabel(bucket, c.backendStore.GetAllActiveBackendNames()) + if len(providerNames) == 0 { + err := errors.New(errNoActiveS3Backends) + traces.SetAndRecordError(span, err) + + return managed.ExternalObservation{}, err } - backendClients := c.backendStore.GetBackendS3Clients(bucket.Spec.Providers) + backendClients := c.backendStore.GetBackendS3Clients(providerNames) // Check that the Bucket CR is Available according to its Status backends. - if !isBucketAvailableFromStatus(bucket, backendClients) { + if !isBucketAvailableFromStatus(bucket, providerNames, backendClients) { return managed.ExternalObservation{ ResourceExists: true, ResourceUpToDate: false, @@ -82,7 +87,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex // Observe sub-resources for the Bucket to check if they too are up to date. for _, subResourceClient := range c.subresourceClients { - obs, err := subResourceClient.Observe(ctx, bucket, bucket.Spec.Providers) + obs, err := subResourceClient.Observe(ctx, bucket, providerNames) if err != nil { err := errors.Wrap(err, errObserveSubresource) traces.SetAndRecordError(span, err) diff --git a/internal/controller/bucket/update.go b/internal/controller/bucket/update.go index 9bf296d6..b1457861 100644 --- a/internal/controller/bucket/update.go +++ b/internal/controller/bucket/update.go @@ -21,6 +21,7 @@ import ( "github.com/linode/provider-ceph/internal/consts" "github.com/linode/provider-ceph/internal/otel/traces" "github.com/linode/provider-ceph/internal/rgw" + "github.com/linode/provider-ceph/internal/utils" ) func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.ExternalUpdate, error) { @@ -44,33 +45,28 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext return managed.ExternalUpdate{}, c.Delete(ctx, mg) } - if len(bucket.Spec.Providers) == 0 { - bucket.Spec.Providers = c.backendStore.GetAllActiveBackendNames() - } + providerNames := utils.GetBucketProvidersFilterDisabledLabel(bucket, c.backendStore.GetAllActiveBackendNames()) - activeBackends := c.backendStore.GetActiveBackends(bucket.Spec.Providers) + activeBackends := c.backendStore.GetActiveBackends(providerNames) if len(activeBackends) == 0 { err := errors.New(errNoActiveS3Backends) traces.SetAndRecordError(span, err) return managed.ExternalUpdate{}, err - } else if len(activeBackends) != len(bucket.Spec.Providers) { - err := errors.New(errMissingS3Backend) - traces.SetAndRecordError(span, err) - - return managed.ExternalUpdate{}, err + } else if len(activeBackends) != len(providerNames) { + c.log.Info("Missing S3 backends", consts.KeyBucketName, bucket.Name, "providers", providerNames, "activeBackends", activeBackends) + traces.SetAndRecordError(span, errors.New(errMissingS3Backend)) } bucketBackends := newBucketBackends() - updateAllErr := c.updateOnAllBackends(ctx, bucket, bucketBackends) + updateAllErr := c.updateOnAllBackends(ctx, bucket, bucketBackends, providerNames) // Whether buckets are updated successfully or not on backends, we need to update the // Bucket CR Status in all cases to represent the conditions of each individual bucket. if err := c.updateBucketCR(ctx, bucket, func(bucketDeepCopy, bucketLatest *v1alpha1.Bucket) UpdateRequired { - bucketLatest.Spec.Providers = bucketDeepCopy.Spec.Providers - setBucketStatus(bucketLatest, bucketBackends) + setBucketStatus(bucketLatest, bucketBackends, providerNames) return NeedsStatusUpdate }); err != nil { @@ -90,17 +86,15 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext // Bucket CR Spec accordingly. err := c.updateBucketCR(ctx, bucket, func(bucketDeepCopy, bucketLatest *v1alpha1.Bucket) UpdateRequired { - bucketLatest.Spec.Providers = bucketDeepCopy.Spec.Providers - // Auto pause the Bucket CR if required. - cls := c.backendStore.GetBackendS3Clients(bucketLatest.Spec.Providers) + cls := c.backendStore.GetBackendS3Clients(providerNames) if isPauseRequired(bucketLatest, cls, bucketBackends, c.autoPauseBucket) { c.log.Info("Auto pausing bucket", consts.KeyBucketName, bucket.Name) pauseBucket(bucketLatest) } // Add labels for backends if they don't exist. - setBackendLabels(bucket) + setBackendLabels(bucket, providerNames) controllerutil.AddFinalizer(bucketLatest, v1alpha1.InUseFinalizer) @@ -116,15 +110,15 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext return managed.ExternalUpdate{}, nil } -func (c *external) updateOnAllBackends(ctx context.Context, bucket *v1alpha1.Bucket, bb *bucketBackends) error { +func (c *external) updateOnAllBackends(ctx context.Context, bucket *v1alpha1.Bucket, bb *bucketBackends, providerNames []string) error { ctx, span := otel.Tracer("").Start(ctx, "updateOnAllBackends") defer span.End() - defer setBucketStatus(bucket, bb) + defer setBucketStatus(bucket, bb, providerNames) g := new(errgroup.Group) - for backendName := range c.backendStore.GetActiveBackends(bucket.Spec.Providers) { + for backendName := range c.backendStore.GetActiveBackends(providerNames) { if !c.backendStore.IsBackendActive(backendName) { c.log.Info("Backend is marked inactive - bucket will not be updated on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName) @@ -139,55 +133,68 @@ func (c *external) updateOnAllBackends(ctx context.Context, bucket *v1alpha1.Buc continue } - beName := backendName - g.Go(func() error { - c.log.Info("Updating bucket on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, beName) - bucketExists, err := rgw.BucketExists(ctx, cl, bucket.Name) - if err != nil { - c.log.Info("Error occurred attempting HeadBucket", "err", err.Error(), consts.KeyBucketName, bucket.Name, consts.KeyBackendName, beName) - bb.setBucketCondition(bucket.Name, beName, xpv1.Unavailable().WithMessage(err.Error())) + g.Go(c.updateOnBackend(ctx, backendName, bucket, cl, bb)) + } - return err - } - if !bucketExists { + if err := g.Wait(); err != nil { + traces.SetAndRecordError(span, err) + + return err + } + + return nil +} + +func (c *external) updateOnBackend(ctx context.Context, beName string, bucket *v1alpha1.Bucket, cl backendstore.S3Client, bb *bucketBackends) func() error { + return func() error { + c.log.Info("Updating bucket on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, beName) + bucketExists, err := rgw.BucketExists(ctx, cl, bucket.Name) + if err != nil { + c.log.Info("Error occurred attempting HeadBucket", "err", err.Error(), consts.KeyBucketName, bucket.Name, consts.KeyBackendName, beName) + bb.setBucketCondition(bucket.Name, beName, xpv1.Unavailable().WithMessage(err.Error())) + + return err + } + if !bucketExists { + if !c.recreateMissingBucket { bb.deleteBackend(bucket.Name, beName) return nil } - err = c.updateOnBackend(ctx, cl, bucket, beName, bb) + _, err := rgw.CreateBucket(ctx, cl, rgw.BucketToCreateBucketInput(bucket)) if err != nil { - c.log.Info("Error occurred attempting to update bucket", "err", err.Error(), consts.KeyBucketName, bucket.Name, consts.KeyBackendName, beName) - bb.setBucketCondition(bucket.Name, beName, xpv1.Unavailable().WithMessage(err.Error())) + c.log.Info("Failed to recreate missing bucket on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, beName, "err", err.Error()) return err } - // Check if this backend has been marked as 'Unhealthy'. In which case the - // bucket condition must remain in 'Unavailable' for this backend. - if c.backendStore.GetBackendHealthStatus(beName) == apisv1alpha1.HealthStatusUnhealthy { - bb.setBucketCondition(bucket.Name, beName, xpv1.Unavailable().WithMessage("Backend is marked Unhealthy")) + c.log.Info("Recreated missing bucket on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, beName) + } - return nil - } - // Bucket has been successfully updated and the backend is either 'Healthy' or 'Unknown'. - // It may be 'Unknown' due to the healthcheck being disabled, in which case we can only assume - // the backend is healthy. Either way, set the bucket condition as 'Available' for this backend. - bb.setBucketCondition(bucket.Name, beName, xpv1.Available()) + err = c.doUpdateOnBackend(ctx, cl, bucket, beName, bb) + if err != nil { + c.log.Info("Error occurred attempting to update bucket", "err", err.Error(), consts.KeyBucketName, bucket.Name, consts.KeyBackendName, beName) + bb.setBucketCondition(bucket.Name, beName, xpv1.Unavailable().WithMessage(err.Error())) - return nil - }) - } + return err + } + // Check if this backend has been marked as 'Unhealthy'. In which case the + // bucket condition must remain in 'Unavailable' for this backend. + if c.backendStore.GetBackendHealthStatus(beName) == apisv1alpha1.HealthStatusUnhealthy { + bb.setBucketCondition(bucket.Name, beName, xpv1.Unavailable().WithMessage("Backend is marked Unhealthy")) - if err := g.Wait(); err != nil { - traces.SetAndRecordError(span, err) + return nil + } + // Bucket has been successfully updated and the backend is either 'Healthy' or 'Unknown'. + // It may be 'Unknown' due to the healthcheck being disabled, in which case we can only assume + // the backend is healthy. Either way, set the bucket condition as 'Available' for this backend. + bb.setBucketCondition(bucket.Name, beName, xpv1.Available()) - return err + return nil } - - return nil } -func (c *external) updateOnBackend(ctx context.Context, cl backendstore.S3Client, b *v1alpha1.Bucket, backendName string, bb *bucketBackends) error { +func (c *external) doUpdateOnBackend(ctx context.Context, cl backendstore.S3Client, b *v1alpha1.Bucket, backendName string, bb *bucketBackends) error { if s3types.ObjectOwnership(aws.ToString(b.Spec.ForProvider.ObjectOwnership)) == s3types.ObjectOwnershipBucketOwnerEnforced { _, err := cl.PutBucketAcl(ctx, rgw.BucketToPutBucketACLInput(b)) if err != nil { @@ -198,7 +205,6 @@ func (c *external) updateOnBackend(ctx context.Context, cl backendstore.S3Client //TODO: Add functionality for bucket ownership controls, using s3 apis: // - DeleteBucketOwnershipControls // - PutBucketOwnershipControls - for _, subResourceClient := range c.subresourceClients { err := subResourceClient.Handle(ctx, b, backendName, bb) if err != nil { diff --git a/internal/controller/bucket/update_test.go b/internal/controller/bucket/update_test.go index 4f2ea66a..15f197c8 100644 --- a/internal/controller/bucket/update_test.go +++ b/internal/controller/bucket/update_test.go @@ -92,36 +92,6 @@ func TestUpdateBasicErrors(t *testing.T) { err: errors.New(errNoActiveS3Backends), }, }, - "Missing backend": { - fields: fields{ - backendStore: func() *backendstore.BackendStore { - fake := backendstorefakes.FakeS3Client{ - HeadBucketStub: func(ctx context.Context, hbi *s3.HeadBucketInput, f ...func(*s3.Options)) (*s3.HeadBucketOutput, error) { - return &s3.HeadBucketOutput{}, nil - }, - } - - bs := backendstore.NewBackendStore() - bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusHealthy) - - return bs - }(), - }, - args: args{ - mg: &v1alpha1.Bucket{ - Spec: v1alpha1.BucketSpec{ - Providers: []string{ - "s3-backend-1", - "s3-backend-2", - }, - }, - }, - }, - want: want{ - o: managed.ExternalUpdate{}, - err: errors.New(errMissingS3Backend), - }, - }, } for name, tc := range cases { tc := tc @@ -412,8 +382,8 @@ func TestUpdate(t *testing.T) { assert.Equal(t, map[string]string{ - meta.AnnotationKeyReconciliationPaused: "true", - "provider-ceph.backends.s3-backend-1": "", + meta.AnnotationKeyReconciliationPaused: True, + "provider-ceph.backends.s3-backend-1": True, }, bucket.Labels, "unexpected bucket labels", diff --git a/internal/controller/providerconfig/healthcheck/healthcheck.go b/internal/controller/providerconfig/healthcheck/healthcheck.go index cd7e1b18..eb2a3c54 100644 --- a/internal/controller/providerconfig/healthcheck/healthcheck.go +++ b/internal/controller/providerconfig/healthcheck/healthcheck.go @@ -11,10 +11,11 @@ import ( ) type Controller struct { - kubeClient client.Client - backendStore *backendstore.BackendStore - log logging.Logger - autoPauseBucket bool + kubeClientUncached client.Client + kubeClientCached client.Client + backendStore *backendstore.BackendStore + log logging.Logger + autoPauseBucket bool } func NewController(options ...func(*Controller)) *Controller { @@ -26,9 +27,15 @@ func NewController(options ...func(*Controller)) *Controller { return r } -func WithKubeClient(k client.Client) func(*Controller) { +func WithKubeClientUncached(k client.Client) func(*Controller) { return func(r *Controller) { - r.kubeClient = k + r.kubeClientUncached = k + } +} + +func WithKubeClientCached(k client.Client) func(*Controller) { + return func(r *Controller) { + r.kubeClientCached = k } } diff --git a/internal/controller/providerconfig/healthcheck/healthcheck_controller.go b/internal/controller/providerconfig/healthcheck/healthcheck_controller.go index 127eae92..cac736de 100644 --- a/internal/controller/providerconfig/healthcheck/healthcheck_controller.go +++ b/internal/controller/providerconfig/healthcheck/healthcheck_controller.go @@ -57,6 +57,8 @@ const ( errBackendNotStored = "backend is not stored in backendstore" healthCheckSuffix = "-health-check" healthCheckFile = "health-check-file" + + True = "true" ) //nolint:gocyclo,cyclop // Function requires multiple checks. @@ -69,7 +71,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu bucketName := req.Name + healthCheckSuffix providerConfig := &apisv1alpha1.ProviderConfig{} - if err := c.kubeClient.Get(ctx, req.NamespacedName, providerConfig); err != nil { + if err := c.kubeClientCached.Get(ctx, req.NamespacedName, providerConfig); err != nil { if kerrors.IsNotFound(err) { // ProviderConfig has been deleted, perform cleanup. if err := c.cleanup(ctx, req, bucketName); err != nil { @@ -93,7 +95,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, nil } - if err := UpdateProviderConfigStatus(ctx, c.kubeClient, providerConfig, func(_, pcLatest *apisv1alpha1.ProviderConfig) { + if err := UpdateProviderConfigStatus(ctx, c.kubeClientCached, providerConfig, func(_, pcLatest *apisv1alpha1.ProviderConfig) { pcLatest.Status.SetConditions(v1alpha1.HealthCheckDisabled()) }); err != nil { err = errors.Wrap(err, errUpdateHealthStatus) @@ -120,7 +122,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return } - if err := UpdateProviderConfigStatus(ctx, c.kubeClient, providerConfig, func(pcDeepCopy, pcLatest *apisv1alpha1.ProviderConfig) { + if err := UpdateProviderConfigStatus(ctx, c.kubeClientCached, providerConfig, func(pcDeepCopy, pcLatest *apisv1alpha1.ProviderConfig) { pcLatest.Status.SetConditions(pcDeepCopy.Status.Conditions...) }); err != nil { err = errors.Wrap(err, errUpdateHealthStatus) @@ -277,8 +279,8 @@ func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) { // Only list Buckets that (a) were created on s3BackendName // and (b) are already paused. listLabels := labels.SelectorFromSet(labels.Set(map[string]string{ - v1alpha1.BackendLabelPrefix + s3BackendName: "true", - meta.AnnotationKeyReconciliationPaused: "true", + utils.GetBackendLabel(s3BackendName): True, + meta.AnnotationKeyReconciliationPaused: True, })) buckets := &v1alpha1.BucketList{} @@ -288,7 +290,9 @@ func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) { Factor: factor, Jitter: jitter, }, resource.IsAPIError, func() error { - return c.kubeClient.List(ctx, buckets, &client.ListOptions{LabelSelector: listLabels}) + return c.kubeClientUncached.List(ctx, buckets, &client.ListOptions{ + LabelSelector: listLabels, + }) }) if err != nil { c.log.Info("Error attempting to list Buckets on backend", "error", err.Error(), consts.KeyBackendName, s3BackendName) @@ -306,10 +310,10 @@ func (c *Controller) unpauseBuckets(ctx context.Context, s3BackendName string) { Jitter: jitter, }, resource.IsAPIError, func() error { if (c.autoPauseBucket || buckets.Items[i].Spec.AutoPause) && - buckets.Items[i].Labels[meta.AnnotationKeyReconciliationPaused] == "true" { + buckets.Items[i].Labels[meta.AnnotationKeyReconciliationPaused] == True { buckets.Items[i].Labels[meta.AnnotationKeyReconciliationPaused] = "" - return c.kubeClient.Update(ctx, &buckets.Items[i]) + return c.kubeClientCached.Update(ctx, &buckets.Items[i]) } return nil diff --git a/internal/controller/providerconfig/healthcheck/healthcheck_controller_test.go b/internal/controller/providerconfig/healthcheck/healthcheck_controller_test.go index 99054fb1..2e11159e 100644 --- a/internal/controller/providerconfig/healthcheck/healthcheck_controller_test.go +++ b/internal/controller/providerconfig/healthcheck/healthcheck_controller_test.go @@ -31,6 +31,7 @@ import ( apisv1alpha1 "github.com/linode/provider-ceph/apis/v1alpha1" "github.com/linode/provider-ceph/internal/backendstore" "github.com/linode/provider-ceph/internal/backendstore/backendstorefakes" + "github.com/linode/provider-ceph/internal/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -407,8 +408,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-1", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "true", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "true", }, }, }, @@ -416,8 +417,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-2", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "true", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "true", }, }, }, @@ -425,8 +426,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-3", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "", }, }, }, @@ -474,8 +475,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-1", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "", }, }, }, @@ -483,8 +484,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-2", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "", }, }, }, @@ -492,8 +493,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-3", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "", }, }, }, @@ -535,8 +536,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-1", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "true", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "true", }, }, }, @@ -544,8 +545,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-2", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "true", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "true", }, }, }, @@ -553,8 +554,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-3", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "", }, }, }, @@ -602,8 +603,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-1", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "", }, }, }, @@ -611,8 +612,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-2", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "", }, }, }, @@ -620,8 +621,8 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "bucket-3", Labels: map[string]string{ - v1alpha1.BackendLabelPrefix + backendName: "true", - meta.AnnotationKeyReconciliationPaused: "", + utils.GetBackendLabel(backendName): "true", + meta.AnnotationKeyReconciliationPaused: "", }, }, }, @@ -672,7 +673,8 @@ func TestReconcile(t *testing.T) { r := NewController( WithAutoPause(&tc.fields.autopause), WithBackendStore(bs), - WithKubeClient(c), + WithKubeClientUncached(c), + WithKubeClientCached(c), WithLogger(logging.NewNopLogger())) got, err := r.Reconcile(context.Background(), tc.args.req) diff --git a/internal/utils/utils.go b/internal/utils/utils.go index c554a8c6..b8aaee35 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -27,3 +27,28 @@ func MapConditionToHealthStatus(condition commonv1.Condition) apisv1alpha1.Healt return apisv1alpha1.HealthStatusUnknown } + +// GetBucketProvidersFilterDisabledLabel returns the specified providers or default providers, +// and filters out providers disabledby label. +func GetBucketProvidersFilterDisabledLabel(bucket *v1alpha1.Bucket, backends []string) []string { + providers := bucket.Spec.Providers + if len(providers) == 0 { + providers = backends + } + + okProviders := []string{} + for i := range providers { + if status, ok := bucket.Labels[GetBackendLabel(providers[i])]; ok && status != "true" { + continue + } + + okProviders = append(okProviders, providers[i]) + } + + return okProviders +} + +// GetBackendLabel renders label key for provider. +func GetBackendLabel(provider string) string { + return v1alpha1.BackendLabelPrefix + provider +}