Skip to content

Commit

Permalink
[WIP] storageclassrequest: fulfill requests with RADOS namespaces
Browse files Browse the repository at this point in the history
This set of changes achieves two things:

* Reconciles a given CephBlockPool to serve a single storage profile,
  regardless of consumer. As part of this, also changes the
  CephBlockPool name generation to remove the UUID portion.
* Reconciles a CephBlockPoolRadosNamespace to provide isolation of data
  for consumers on shared block pools.
  • Loading branch information
jarrpa committed Feb 26, 2024
1 parent a9d5bb5 commit 33f5168
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 30 deletions.
116 changes: 102 additions & 14 deletions controllers/storageclassrequest/storageclassrequest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"strings"

"github.com/go-logr/logr"
"github.com/google/uuid"
snapapi "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
v1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"github.com/red-hat-storage/ocs-operator/api/v4/v1alpha1"
Expand Down Expand Up @@ -60,6 +59,7 @@ type StorageClassRequestReconciler struct {
storageCluster *v1.StorageCluster
StorageClassRequest *v1alpha1.StorageClassRequest
cephBlockPool *rookCephv1.CephBlockPool
cephRadosNamespace *rookCephv1.CephBlockPoolRadosNamespace
cephFilesystemSubVolumeGroup *rookCephv1.CephFilesystemSubVolumeGroup
cephClientProvisioner *rookCephv1.CephClient
cephClientNode *rookCephv1.CephClient
Expand Down Expand Up @@ -139,6 +139,15 @@ func (r *StorageClassRequestReconciler) Reconcile(ctx context.Context, request r

func (r *StorageClassRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := mgr.GetCache().IndexField(
context.TODO(),
&rookCephv1.CephBlockPoolRadosNamespace{},
util.OwnerUIDIndexName,
util.OwnersIndexFieldFunc,
); err != nil {
return fmt.Errorf("unable to set up FieldIndexer on CephBlockPoolRadosNamespaces for owner reference UIDs: %v", err)
}

if err := mgr.GetCache().IndexField(
context.TODO(),
&rookCephv1.CephFilesystemSubVolumeGroup{},
Expand Down Expand Up @@ -168,6 +177,7 @@ func (r *StorageClassRequestReconciler) SetupWithManager(mgr ctrl.Manager) error
predicate.GenerationChangedPredicate{},
)).
Watches(&rookCephv1.CephBlockPool{}, enqueueForOwner).
Watches(&rookCephv1.CephBlockPoolRadosNamespace{}, enqueueForOwner).
Watches(&rookCephv1.CephFilesystemSubVolumeGroup{}, enqueueForOwner).
Watches(&rookCephv1.CephClient{}, enqueueForOwner).
Watches(&storagev1.StorageClass{}, enqueueStorageConsumerRequest).
Expand Down Expand Up @@ -222,14 +232,25 @@ func (r *StorageClassRequestReconciler) initPhase(storageProfile *v1.StorageProf
}
}

// check if a cephblockpool resource exists for the desired storageconsumer and storageprofile.
r.cephRadosNamespace = &rookCephv1.CephBlockPoolRadosNamespace{}
r.cephRadosNamespace.Namespace = r.OperatorNamespace
for _, res := range r.StorageClassRequest.Status.CephResources {
if res.Kind == "CephBlockPoolRadosNamespace" {
r.cephRadosNamespace.Name = res.Name
break
}
}

// check if a cephblockpool resource exists for the desired storageprofile.
if r.cephBlockPool.Name == "" {
cephBlockPoolList := &rookCephv1.CephBlockPoolList{}
listOptions := &client.MatchingLabels{
controllers.StorageConsumerNameLabel: r.storageConsumer.Name,
controllers.StorageProfileSpecLabel: storageProfile.GetSpecHash(),
listOptions := []client.ListOption{
client.InNamespace(r.OperatorNamespace),
&client.MatchingLabels{
controllers.StorageProfileSpecLabel: storageProfile.GetSpecHash(),
},
}
if err := r.list(cephBlockPoolList, client.InNamespace(r.OperatorNamespace), listOptions); err != nil {
if err := r.list(cephBlockPoolList, listOptions...); err != nil {
return err
}

Expand All @@ -238,17 +259,51 @@ func (r *StorageClassRequestReconciler) initPhase(storageProfile *v1.StorageProf
// if we found more than one CephBlockPool, we can't determine which one to select, so error out
cbpItemsLen := len(cephBlockPoolList.Items)
if cbpItemsLen == 0 {
cbpNewName := fmt.Sprintf("cephblockpool-%s-%s", r.storageConsumer.Name, generateUUID())
cbpNewName := fmt.Sprintf("cephblockpool-profile-%s", storageProfile.Name)
r.log.V(1).Info("no valid CephBlockPool found, creating new one", "CephBlockPool", cbpNewName)
r.cephBlockPool.Name = cbpNewName
} else if cbpItemsLen == 1 {
r.cephBlockPool.Name = cephBlockPoolList.Items[0].GetName()
r.log.V(1).Info("valid CephBlockPool found", "CephBlockPool", r.cephBlockPool.Name)
} else {
return fmt.Errorf("invalid number of CephBlockPools for storage consumer %q and storage profile %q: found %d, expecting 0 or 1", r.storageConsumer.Name, storageProfile.Name, cbpItemsLen)
return fmt.Errorf("invalid number of CephBlockPools for storage profile %q: found %d, expecting 0 or 1", storageProfile.Name, cbpItemsLen)
}
}

// check if a CephBlockPoolRadosNamespace resource exists for the desired storageconsumer and storageprofile.
if r.cephRadosNamespace.Name == "" {
cephRadosNamespaceList := &rookCephv1.CephBlockPoolRadosNamespaceList{}
err := r.Client.List(
r.ctx,
cephRadosNamespaceList,
client.InNamespace(r.OperatorNamespace),
client.MatchingFields{util.OwnerUIDIndexName: string(r.StorageClassRequest.UID)})
if err != nil {
return err
}

// if we found no CephBlockPoolRadosNamespaces, generate a new name
// if we found only one CephBlockPoolRadosNamespace with our query, we're good
// if we found more than one CephBlockPoolRadosNamespace, we can't determine which one to select, so error out
rnsItemsLen := len(cephRadosNamespaceList.Items)
if rnsItemsLen == 0 {
rnsNewName := fmt.Sprintf("cephradosnamespace-%s-%s", r.cephBlockPool.Name, r.storageConsumer.Name)
r.log.V(1).Info("no valid CephBlockPoolRadosNamespace found, creating new one", "CephBlockPoolRadosNamespace", rnsNewName)
r.cephRadosNamespace.Name = rnsNewName
} else if rnsItemsLen == 1 {
cephRns := cephRadosNamespaceList.Items[0]
if r.cephBlockPool.Name != "" && cephRns.Spec.BlockPoolName != r.cephBlockPool.Name {
return fmt.Errorf("found CephBlockPoolRadosNamespace %q with BlockPoolName %q, but CephResources lists CephBlockPool %q", cephRns.Name, cephRns.Spec.BlockPoolName, r.cephBlockPool.Name)
}
r.cephRadosNamespace.Name = cephRns.GetName()
r.log.V(1).Info("valid CephBlockPoolRadosNamespace found", "CephBlockPoolRadosNamespace", r.cephRadosNamespace.Name)
} else {
return fmt.Errorf("invalid number of CephBlockPoolRadosNamespaces for storage consumer %q and storage profile %q: found %d, expecting 0 or 1", r.storageConsumer.Name, storageProfile.Name, rnsItemsLen)
}
}

r.cephRadosNamespace.Spec.BlockPoolName = r.cephBlockPool.Name

} else if r.StorageClassRequest.Spec.Type == "sharedfilesystem" {
r.cephFilesystemSubVolumeGroup = &rookCephv1.CephFilesystemSubVolumeGroup{}
r.cephFilesystemSubVolumeGroup.Namespace = r.OperatorNamespace
Expand Down Expand Up @@ -321,6 +376,10 @@ func (r *StorageClassRequestReconciler) reconcilePhases() (reconcile.Result, err
return reconcile.Result{}, err
}

if err := r.reconcileRadosNamespace(&storageProfile); err != nil {
return reconcile.Result{}, err
}

} else if r.StorageClassRequest.Spec.Type == "sharedfilesystem" {
if err := r.reconcileCephClientCephFSProvisioner(); err != nil {
return reconcile.Result{}, err
Expand Down Expand Up @@ -417,6 +476,41 @@ func (r *StorageClassRequestReconciler) reconcileCephBlockPool(storageProfile *v
return nil
}

func (r *StorageClassRequestReconciler) reconcileRadosNamespace(storageProfile *v1.StorageProfile) error {
_, err := ctrl.CreateOrUpdate(r.ctx, r.Client, r.cephRadosNamespace, func() error {
if err := r.own(r.cephRadosNamespace); err != nil {
return err
}

addLabel(r.cephRadosNamespace, controllers.StorageConsumerNameLabel, r.storageConsumer.Name)
addLabel(r.cephBlockPool, controllers.StorageProfileSpecLabel, storageProfile.GetSpecHash())

r.cephRadosNamespace.Spec = rookCephv1.CephBlockPoolRadosNamespaceSpec{
BlockPoolName: r.cephBlockPool.Name,
}
return nil
})

if err != nil {
r.log.Error(
err,
"Failed to update CephBlockPoolRadosNamespace.",
"CephBlockPoolRadosNamespace",
klog.KRef(r.cephRadosNamespace.Namespace, r.cephRadosNamespace.Name),
)
return err
}

phase := ""
if r.cephRadosNamespace.Status != nil {
phase = string(r.cephRadosNamespace.Status.Phase)
}

r.setCephResourceStatus(r.cephRadosNamespace.Name, "CephBlockPoolRadosNamespace", phase, nil)

return nil
}

func (r *StorageClassRequestReconciler) reconcileCephFilesystemSubVolumeGroup(storageProfile *v1.StorageProfile) error {

cephFilesystem := rookCephv1.CephFilesystem{
Expand Down Expand Up @@ -690,9 +784,3 @@ func addLabel(obj metav1.Object, key string, value string) {
}
labels[key] = value
}

// generateUUID generates a random UUID string and return first 8 characters.
func generateUUID() string {
newUUID := uuid.New().String()
return newUUID[:8]
}
Loading

0 comments on commit 33f5168

Please sign in to comment.