Skip to content

Commit

Permalink
Fix unpause buckets and recreate if missing (#167)
Browse files Browse the repository at this point in the history
* Fix unpause buckets and recreate if missing

* Disable backend via label

* Delete buckets by status instead of default providers

* Typo fixes

---------

Co-authored-by: Richard Kovacs <[email protected]>
  • Loading branch information
mhmxs and Richard Kovacs authored Feb 27, 2024
1 parent 6acd1c2 commit bc319e2
Show file tree
Hide file tree
Showing 15 changed files with 239 additions and 227 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
cover.out
/vendor
/.vendor-new
__debug*
.vscode
.idea
.DS_Store
Expand Down
12 changes: 10 additions & 2 deletions cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func main() {
enableExternalSecretStores = app.Flag("enable-external-secret-stores", "Enable support for ExternalSecretStores.").Default("false").Envar("ENABLE_EXTERNAL_SECRET_STORES").Bool()
enableManagementPolicies = app.Flag("enable-management-policies", "Enable support for Management Policies.").Default("false").Envar("ENABLE_MANAGEMENT_POLICIES").Bool()

autoPauseBucket = app.Flag("auto-pause-bucket", "Enable auto pause of reconciliation of ready buckets").Default("false").Envar("AUTO_PAUSE_BUCKET").Bool()
autoPauseBucket = app.Flag("auto-pause-bucket", "Enable auto pause of reconciliation of ready buckets").Default("false").Envar("AUTO_PAUSE_BUCKET").Bool()
recreateMissingBucket = app.Flag("recreate-missing-bucket", "Recreates existing bucket if missing").Default("false").Envar("RECREATE_MISSING_BUCKET").Bool()

assumeRoleArn = app.Flag("assume-role-arn", "Assume role ARN to be used for STS authentication").Default("").Envar("ASSUME_ROLE_ARN").String()

Expand Down Expand Up @@ -272,6 +273,11 @@ func main() {

backendStore := backendstore.NewBackendStore()

kubeClientUncached, err := client.New(cfg, client.Options{
Scheme: providerSCheme,
})
kingpin.FatalIfError(err, "Cannot create Kube client")

kingpin.FatalIfError(ctrl.NewWebhookManagedBy(mgr).
For(&providercephv1alpha1.Bucket{}).
WithValidator(bucket.NewBucketValidator(backendStore)).
Expand All @@ -286,7 +292,8 @@ func main() {
healthcheck.NewController(
healthcheck.WithAutoPause(autoPauseBucket),
healthcheck.WithBackendStore(backendStore),
healthcheck.WithKubeClient(mgr.GetClient()),
healthcheck.WithKubeClientUncached(kubeClientUncached),
healthcheck.WithKubeClientCached(mgr.GetClient()),
healthcheck.WithLogger(o.Logger))),
"Cannot setup ProviderConfig controllers")

Expand All @@ -299,6 +306,7 @@ func main() {

kingpin.FatalIfError(bucket.Setup(mgr, o, bucket.NewConnector(
bucket.WithAutoPause(autoPauseBucket),
bucket.WithRecreateMissingBucket(recreateMissingBucket),
bucket.WithBackendStore(backendStore),
bucket.WithKubeClient(mgr.GetClient()),
bucket.WithOperationTimeout(*reconcileTimeout),
Expand Down
59 changes: 34 additions & 25 deletions internal/controller/bucket/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ import (
// A Connector is expected to produce an ExternalClient when its Connect method
// is called.
type Connector struct {
kube client.Client
autoPauseBucket bool
backendStore *backendstore.BackendStore
subresourceClients []SubresourceClient
s3ClientHandler *s3clienthandler.Handler
log logging.Logger
operationTimeout time.Duration
creationGracePeriod time.Duration
pollInterval time.Duration
usage resource.Tracker
newServiceFn func(creds []byte) (interface{}, error)
kube client.Client
autoPauseBucket bool
recreateMissingBucket bool
backendStore *backendstore.BackendStore
subresourceClients []SubresourceClient
s3ClientHandler *s3clienthandler.Handler
log logging.Logger
operationTimeout time.Duration
creationGracePeriod time.Duration
pollInterval time.Duration
usage resource.Tracker
newServiceFn func(creds []byte) (interface{}, error)
}

func NewConnector(options ...func(*Connector)) *Connector {
Expand All @@ -50,6 +51,12 @@ func WithAutoPause(a *bool) func(*Connector) {
}
}

// WithRecreateMissingBucket returns a Connector option that copies the value
// pointed to by a into the Connector's recreateMissingBucket field.
//
// The pointer is dereferenced when the returned option is applied to a
// Connector, not when WithRecreateMissingBucket itself is called, so a must
// be non-nil by the time the option runs.
func WithRecreateMissingBucket(a *bool) func(*Connector) {
	return func(conn *Connector) {
		conn.recreateMissingBucket = *a
	}
}

func WithOperationTimeout(t time.Duration) func(*Connector) {
return func(c *Connector) {
c.operationTimeout = t
Expand Down Expand Up @@ -110,24 +117,26 @@ func (c *Connector) Connect(ctx context.Context, mg resource.Managed) (managed.E
}

return &external{
kubeClient: c.kube,
autoPauseBucket: c.autoPauseBucket,
operationTimeout: c.operationTimeout,
backendStore: c.backendStore,
subresourceClients: c.subresourceClients,
s3ClientHandler: c.s3ClientHandler,
log: c.log},
kubeClient: c.kube,
autoPauseBucket: c.autoPauseBucket,
recreateMissingBucket: c.recreateMissingBucket,
operationTimeout: c.operationTimeout,
backendStore: c.backendStore,
subresourceClients: c.subresourceClients,
s3ClientHandler: c.s3ClientHandler,
log: c.log},
nil
}

// external observes, then either creates, updates, or deletes an external
// resource to ensure it reflects the managed resource's desired state.
type external struct {
kubeClient client.Client
autoPauseBucket bool
operationTimeout time.Duration
backendStore *backendstore.BackendStore
subresourceClients []SubresourceClient
s3ClientHandler *s3clienthandler.Handler
log logging.Logger
kubeClient client.Client
autoPauseBucket bool
recreateMissingBucket bool
operationTimeout time.Duration
backendStore *backendstore.BackendStore
subresourceClients []SubresourceClient
s3ClientHandler *s3clienthandler.Handler
log logging.Logger
}
2 changes: 2 additions & 0 deletions internal/controller/bucket/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ const (
// Lifecycle configuration error messages.
errObserveLifecycleConfig = "failed to observe bucket lifecycle configuration"
errHandleLifecycleConfig = "failed to handle bucket lifecycle configuration"

True = "true"
)
17 changes: 7 additions & 10 deletions internal/controller/bucket/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/linode/provider-ceph/internal/consts"
"github.com/linode/provider-ceph/internal/otel/traces"
"github.com/linode/provider-ceph/internal/rgw"
"github.com/linode/provider-ceph/internal/utils"
)

//nolint:maintidx,gocognit,gocyclo,cyclop,nolintlint // Function requires numerous checks.
Expand Down Expand Up @@ -49,22 +50,18 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
return managed.ExternalCreation{}, err
}

if len(bucket.Spec.Providers) == 0 {
bucket.Spec.Providers = c.backendStore.GetAllActiveBackendNames()
}
providerNames := utils.GetBucketProvidersFilterDisabledLabel(bucket, c.backendStore.GetAllActiveBackendNames())

// Create the bucket on each backend in a separate go routine
activeBackends := c.backendStore.GetActiveBackends(bucket.Spec.Providers)
activeBackends := c.backendStore.GetActiveBackends(providerNames)
if len(activeBackends) == 0 {
err := errors.New(errNoActiveS3Backends)
traces.SetAndRecordError(span, err)

return managed.ExternalCreation{}, err
} else if len(activeBackends) != len(bucket.Spec.Providers) {
err := errors.New(errMissingS3Backend)
traces.SetAndRecordError(span, err)

return managed.ExternalCreation{}, err
} else if len(activeBackends) != len(providerNames) {
c.log.Info("Missing S3 backends", consts.KeyBucketName, bucket.Name, "providers", providerNames, "activeBackends", activeBackends)
traces.SetAndRecordError(span, errors.New(errMissingS3Backend))
}

// This value shows a bucket on one backend is already created.
Expand Down Expand Up @@ -165,7 +162,7 @@ func (c *external) waitForCreationAndUpdateBucketCR(ctx context.Context, bucket
if bucketLatest.ObjectMeta.Labels == nil {
bucketLatest.ObjectMeta.Labels = map[string]string{}
}
bucketLatest.ObjectMeta.Labels[v1alpha1.BackendLabelPrefix+beName] = ""
bucketLatest.ObjectMeta.Labels[v1alpha1.BackendLabelPrefix+beName] = True

return NeedsObjectUpdate
}, func(_, bucketLatest *v1alpha1.Bucket) UpdateRequired {
Expand Down
41 changes: 0 additions & 41 deletions internal/controller/bucket/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,47 +76,6 @@ func TestCreateBasicErrors(t *testing.T) {
err: errors.New(errNoActiveS3Backends),
},
},
"S3 backend reference inactive": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend("s3-backend-0", nil, nil, true, apisv1alpha1.HealthStatusUnknown)
bs.AddOrUpdateBackend("s3-backend-1", nil, nil, false, apisv1alpha1.HealthStatusUnknown)

return bs
}(),
},
args: args{
mg: &v1alpha1.Bucket{
Spec: v1alpha1.BucketSpec{
Providers: []string{"s3-backend-0", "s3-backend-1"},
},
},
},
want: want{
err: errors.New(errMissingS3Backend),
},
},
"S3 backend reference missing": {
fields: fields{
backendStore: func() *backendstore.BackendStore {
bs := backendstore.NewBackendStore()
bs.AddOrUpdateBackend("s3-backend-0", nil, nil, true, apisv1alpha1.HealthStatusUnknown)

return bs
}(),
},
args: args{
mg: &v1alpha1.Bucket{
Spec: v1alpha1.BucketSpec{
Providers: []string{"s3-backend-0", "s3-backend-1"},
},
},
},
want: want{
err: errors.New(errMissingS3Backend),
},
},
"S3 backend not referenced and none exist": {
fields: fields{
backendStore: backendstore.NewBackendStore(),
Expand Down
18 changes: 11 additions & 7 deletions internal/controller/bucket/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,17 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {

g := new(errgroup.Group)

activeBackends := bucket.Spec.Providers
if len(activeBackends) == 0 {
activeBackends = c.backendStore.GetAllActiveBackendNames()
}
providerNames := []string{}
for backendName, backend := range bucket.Status.AtProvider.Backends {
providerNames = append(providerNames, backendName)

reason := backend.BucketCondition.Reason
if reason != xpv1.ReasonAvailable {
c.log.Info("Skipping deletion of bucket on backend, not available", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName, "status", reason)

continue
}

for _, backendName := range activeBackends {
bucketBackends.setBucketCondition(bucket.Name, backendName, xpv1.Deleting())

c.log.Info("Deleting bucket on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName)
Expand Down Expand Up @@ -89,8 +94,7 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
// CR spec. If the deletion is successful or unsuccessful, the bucket CR status must be
// updated.
if err := c.updateBucketCR(ctx, bucket, func(bucketDeepCopy, bucketLatest *v1alpha1.Bucket) UpdateRequired {
bucketLatest.Spec.Providers = activeBackends
setBucketStatus(bucketLatest, bucketBackends)
setBucketStatus(bucketLatest, bucketBackends, providerNames)

return NeedsStatusUpdate
}); err != nil {
Expand Down
41 changes: 27 additions & 14 deletions internal/controller/bucket/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bucket

import (
"context"
"strings"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/errors"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1"
"github.com/linode/provider-ceph/internal/backendstore"
"github.com/linode/provider-ceph/internal/consts"
"github.com/linode/provider-ceph/internal/utils"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -18,7 +20,7 @@ import (

// isBucketPaused returns true if the bucket has the paused label set.
func isBucketPaused(bucket *v1alpha1.Bucket) bool {
if val, ok := bucket.Labels[meta.AnnotationKeyReconciliationPaused]; ok && val == "true" {
if val, ok := bucket.Labels[meta.AnnotationKeyReconciliationPaused]; ok && val == True {
return true
}

Expand All @@ -30,7 +32,7 @@ func pauseBucket(bucket *v1alpha1.Bucket) {
if bucket.ObjectMeta.Labels == nil {
bucket.ObjectMeta.Labels = map[string]string{}
}
bucket.Labels[meta.AnnotationKeyReconciliationPaused] = "true"
bucket.Labels[meta.AnnotationKeyReconciliationPaused] = True
}

// isPauseRequired determines if the Bucket should be paused.
Expand Down Expand Up @@ -59,10 +61,10 @@ func isPauseRequired(b *v1alpha1.Bucket, c map[string]backendstore.S3Client, bb
b.Labels[meta.AnnotationKeyReconciliationPaused] == ""
}

// isBucketAvailableFromStatus checks the backends listed in Spec.Providers against the
// isBucketAvailableFromStatus checks the backends listed in providerNames against the
// backends in Status to ensure buckets are considered Available on all desired backends.
func isBucketAvailableFromStatus(bucket *v1alpha1.Bucket, backendClients map[string]backendstore.S3Client) bool {
for _, backendName := range bucket.Spec.Providers {
func isBucketAvailableFromStatus(bucket *v1alpha1.Bucket, providerNames []string, backendClients map[string]backendstore.S3Client) bool {
for _, backendName := range providerNames {
if _, ok := backendClients[backendName]; !ok {
// This backend does not exist in the list of available backends.
// The backend may be offline, so it is skipped.
Expand All @@ -82,24 +84,35 @@ func isBucketAvailableFromStatus(bucket *v1alpha1.Bucket, backendClients map[str
}

// setBackendLabels adds label "provider-ceph.backends.<backend-name>" to the Bucket for each backend.
func setBackendLabels(bucket *v1alpha1.Bucket) {
for _, beName := range bucket.Spec.Providers {
beLabel := v1alpha1.BackendLabelPrefix + beName
func setBackendLabels(bucket *v1alpha1.Bucket, providerNames []string) {
if bucket.ObjectMeta.Labels == nil {
bucket.ObjectMeta.Labels = map[string]string{}
}

labelsToDelete := []string{}
for k := range bucket.ObjectMeta.Labels {
if strings.HasPrefix(k, v1alpha1.BackendLabelPrefix) && bucket.ObjectMeta.Labels[k] == True {
labelsToDelete = append(labelsToDelete, k)
}
}
for _, k := range labelsToDelete {
delete(bucket.ObjectMeta.Labels, k)
}

for _, beName := range providerNames {
beLabel := utils.GetBackendLabel(beName)
if _, ok := bucket.ObjectMeta.Labels[beLabel]; ok {
continue
}

if bucket.ObjectMeta.Labels == nil {
bucket.ObjectMeta.Labels = map[string]string{}
}
bucket.ObjectMeta.Labels[beLabel] = ""
bucket.ObjectMeta.Labels[beLabel] = True
}
}

func setBucketStatus(bucket *v1alpha1.Bucket, bucketBackends *bucketBackends) {
func setBucketStatus(bucket *v1alpha1.Bucket, bucketBackends *bucketBackends, providerNames []string) {
bucket.Status.SetConditions(xpv1.Unavailable())

backends := bucketBackends.getBackends(bucket.Name, bucket.Spec.Providers)
backends := bucketBackends.getBackends(bucket.Name, providerNames)
bucket.Status.AtProvider.Backends = backends

for _, backend := range backends {
Expand Down
15 changes: 10 additions & 5 deletions internal/controller/bucket/observe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/linode/provider-ceph/internal/consts"
"github.com/linode/provider-ceph/internal/otel/traces"
"github.com/linode/provider-ceph/internal/rgw"
"github.com/linode/provider-ceph/internal/utils"
)

//nolint:gocyclo,cyclop // Function requires numerous checks.
Expand Down Expand Up @@ -67,13 +68,17 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
}

// If no Providers are specified in the Bucket Spec, the bucket is to be created on all backends.
if len(bucket.Spec.Providers) == 0 {
bucket.Spec.Providers = c.backendStore.GetAllActiveBackendNames()
providerNames := utils.GetBucketProvidersFilterDisabledLabel(bucket, c.backendStore.GetAllActiveBackendNames())
if len(providerNames) == 0 {
err := errors.New(errNoActiveS3Backends)
traces.SetAndRecordError(span, err)

return managed.ExternalObservation{}, err
}
backendClients := c.backendStore.GetBackendS3Clients(bucket.Spec.Providers)
backendClients := c.backendStore.GetBackendS3Clients(providerNames)

// Check that the Bucket CR is Available according to its Status backends.
if !isBucketAvailableFromStatus(bucket, backendClients) {
if !isBucketAvailableFromStatus(bucket, providerNames, backendClients) {
return managed.ExternalObservation{
ResourceExists: true,
ResourceUpToDate: false,
Expand All @@ -82,7 +87,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex

// Observe sub-resources for the Bucket to check if they too are up to date.
for _, subResourceClient := range c.subresourceClients {
obs, err := subResourceClient.Observe(ctx, bucket, bucket.Spec.Providers)
obs, err := subResourceClient.Observe(ctx, bucket, providerNames)
if err != nil {
err := errors.Wrap(err, errObserveSubresource)
traces.SetAndRecordError(span, err)
Expand Down
Loading

0 comments on commit bc319e2

Please sign in to comment.