storageclassrequest: fulfill requests with RADOS namespaces
This set of changes achieves two things:

* Reconciles a given CephBlockPool to serve a single storage profile,
  regardless of consumer. As part of this, also changes the
  CephBlockPool name generation to remove the UUID portion.
* Reconciles a CephBlockPoolRadosNamespace to provide isolation of data
  for consumers on shared block pools.

Signed-off-by: Jose A. Rivera <[email protected]>
jarrpa committed Mar 19, 2024
1 parent 1a3ca59 commit 38e8460
Showing 3 changed files with 205 additions and 225 deletions.
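For orientation before the diff: the commit makes the CephBlockPool a shared, profile-scoped resource and gives each storage consumer its own CephBlockPoolRadosNamespace inside that pool. The following is a minimal illustrative sketch, not part of this diff, of the per-consumer object this controller now reconciles; it assumes the rook-ceph v1 Go API and mirrors the naming format used in initPhase below.

package example

import (
	"fmt"

	rookCephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// desiredRadosNamespace is an illustrative helper (not part of this commit)
// showing the isolation model: one shared CephBlockPool per storage profile,
// one RADOS namespace per consumer inside that pool.
func desiredRadosNamespace(operatorNamespace, profileName, consumerName, blockPoolName string) *rookCephv1.CephBlockPoolRadosNamespace {
	return &rookCephv1.CephBlockPoolRadosNamespace{
		ObjectMeta: metav1.ObjectMeta{
			// Deterministic name; same format as initPhase in the diff below.
			Name:      fmt.Sprintf("cephradosnamespace-%s-%s", profileName, consumerName),
			Namespace: operatorNamespace,
		},
		Spec: rookCephv1.CephBlockPoolRadosNamespaceSpec{
			// Points at the shared, profile-scoped block pool.
			BlockPoolName: blockPoolName,
		},
	}
}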
142 changes: 71 additions & 71 deletions controllers/storageclassrequest/storageclassrequest_controller.go
@@ -21,7 +21,6 @@ import (
"strings"

"github.com/go-logr/logr"
"github.com/google/uuid"
snapapi "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
v1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"github.com/red-hat-storage/ocs-operator/api/v4/v1alpha1"
@@ -60,6 +59,7 @@ type StorageClassRequestReconciler struct {
storageCluster *v1.StorageCluster
StorageClassRequest *v1alpha1.StorageClassRequest
cephBlockPool *rookCephv1.CephBlockPool
cephRadosNamespace *rookCephv1.CephBlockPoolRadosNamespace
cephFilesystemSubVolumeGroup *rookCephv1.CephFilesystemSubVolumeGroup
cephClientProvisioner *rookCephv1.CephClient
cephClientNode *rookCephv1.CephClient
@@ -139,6 +139,15 @@ func (r *StorageClassRequestReconciler) Reconcile(ctx context.Context, request r

func (r *StorageClassRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {

if err := mgr.GetCache().IndexField(
context.TODO(),
&rookCephv1.CephBlockPoolRadosNamespace{},
util.OwnerUIDIndexName,
util.OwnersIndexFieldFunc,
); err != nil {
return fmt.Errorf("unable to set up FieldIndexer on CephBlockPoolRadosNamespaces for owner reference UIDs: %v", err)
}

if err := mgr.GetCache().IndexField(
context.TODO(),
&rookCephv1.CephFilesystemSubVolumeGroup{},
@@ -163,11 +172,13 @@ func (r *StorageClassRequestReconciler) SetupWithManager(mgr ctrl.Manager) error
return []reconcile.Request{}
})
enqueueForOwner := handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &v1alpha1.StorageClassRequest{})
generationChangedPredicate := builder.WithPredicates(predicate.GenerationChangedPredicate{})

return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.StorageClassRequest{}, builder.WithPredicates(
predicate.GenerationChangedPredicate{},
)).
Watches(&rookCephv1.CephBlockPool{}, enqueueForOwner).
Owns(&rookCephv1.CephBlockPoolRadosNamespace{}, generationChangedPredicate).
Watches(&rookCephv1.CephFilesystemSubVolumeGroup{}, enqueueForOwner).
Watches(&rookCephv1.CephClient{}, enqueueForOwner).
Watches(&storagev1.StorageClass{}, enqueueStorageConsumerRequest).
@@ -213,49 +224,71 @@ func (r *StorageClassRequestReconciler) initPhase(storageProfile *v1.StorageProf

// check if the request status already contains the name of the resource; if not, add it.
if r.StorageClassRequest.Spec.Type == "blockpool" {
// initialize in-memory structs
r.cephRadosNamespace = &rookCephv1.CephBlockPoolRadosNamespace{}
r.cephRadosNamespace.Namespace = r.OperatorNamespace
r.cephBlockPool = &rookCephv1.CephBlockPool{}
r.cephBlockPool.Namespace = r.OperatorNamespace
for _, res := range r.StorageClassRequest.Status.CephResources {
if res.Kind == "CephBlockPool" {
r.cephBlockPool.Name = res.Name
break
}

// check if a CephBlockPoolRadosNamespace resource exists for the desired storageconsumer and storageprofile.
cephRadosNamespaceList := &rookCephv1.CephBlockPoolRadosNamespaceList{}
err := r.list(
cephRadosNamespaceList,
client.InNamespace(r.OperatorNamespace),
client.MatchingFields{util.OwnerUIDIndexName: string(r.StorageClassRequest.UID)})
if err != nil {
return err
}

// if we found no CephBlockPoolRadosNamespaces, generate a new name
// if we found only one CephBlockPoolRadosNamespace with our query, we're good
// if we found more than one CephBlockPoolRadosNamespace, we can't determine which one to select, so error out
rnsItemsLen := len(cephRadosNamespaceList.Items)
if rnsItemsLen == 0 {
rnsNewName := fmt.Sprintf("cephradosnamespace-%s-%s", r.StorageClassRequest.Spec.StorageProfile, r.storageConsumer.Name)
r.log.V(1).Info("no valid CephBlockPoolRadosNamespace found, creating new one", "CephBlockPoolRadosNamespace", rnsNewName)
r.cephRadosNamespace.Name = rnsNewName
} else if rnsItemsLen == 1 {
cephRns := cephRadosNamespaceList.Items[0]
r.cephBlockPool.Name = cephRns.Spec.BlockPoolName
r.cephRadosNamespace = &cephRns
r.log.V(1).Info("valid CephBlockPoolRadosNamespace found", "CephBlockPoolRadosNamespace", r.cephRadosNamespace.Name)
} else {
return fmt.Errorf("invalid number of CephBlockPoolRadosNamespaces for storage consumer %q and storage profile %q: found %d, expecting 0 or 1", r.storageConsumer.Name, profileName, rnsItemsLen)
}

// check if a cephblockpool resource exists for the desired storageconsumer and storageprofile.
// check if a cephblockpool resource exists for the desired storageprofile.
if r.cephBlockPool.Name == "" {
cephBlockPoolList := &rookCephv1.CephBlockPoolList{}
listOptions := &client.MatchingLabels{
controllers.StorageConsumerNameLabel: r.storageConsumer.Name,
controllers.StorageProfileSpecLabel: storageProfile.GetSpecHash(),
listOptions := []client.ListOption{
client.InNamespace(r.OperatorNamespace),
&client.MatchingLabels{
controllers.StorageProfileSpecLabel: storageProfile.GetSpecHash(),
},
}
if err := r.list(cephBlockPoolList, client.InNamespace(r.OperatorNamespace), listOptions); err != nil {
if err := r.list(cephBlockPoolList, listOptions...); err != nil {
return err
}

// if we found no CephBlockPools, generate a new name
// if we found only one CephBlockPool with our query, we're good
// if we found more than one CephBlockPool, we can't determine which one to select, so error out
cbpItemsLen := len(cephBlockPoolList.Items)
if cbpItemsLen == 0 {
cbpNewName := fmt.Sprintf("cephblockpool-%s-%s", r.storageConsumer.Name, generateUUID())
r.log.V(1).Info("no valid CephBlockPool found, creating new one", "CephBlockPool", cbpNewName)
r.cephBlockPool.Name = cbpNewName
} else if cbpItemsLen == 1 {
if cbpItemsLen == 1 {
r.cephBlockPool.Name = cephBlockPoolList.Items[0].GetName()
r.log.V(1).Info("valid CephBlockPool found", "CephBlockPool", r.cephBlockPool.Name)
} else {
return fmt.Errorf("invalid number of CephBlockPools for storage consumer %q and storage profile %q: found %d, expecting 0 or 1", r.storageConsumer.Name, storageProfile.Name, cbpItemsLen)
return fmt.Errorf("invalid number of CephBlockPools for storage profile %q: found %d, expecting 1", storageProfile.Name, cbpItemsLen)
}
}

r.cephRadosNamespace.Spec.BlockPoolName = r.cephBlockPool.Name
}
} else if r.StorageClassRequest.Spec.Type == "sharedfilesystem" {
r.cephFilesystemSubVolumeGroup = &rookCephv1.CephFilesystemSubVolumeGroup{}
r.cephFilesystemSubVolumeGroup.Namespace = r.OperatorNamespace

cephFilesystemSubVolumeGroupList := &rookCephv1.CephFilesystemSubVolumeGroupList{}
err := r.Client.List(
r.ctx,
err := r.list(
cephFilesystemSubVolumeGroupList,
client.InNamespace(r.OperatorNamespace),
client.MatchingFields{util.OwnerUIDIndexName: string(r.StorageClassRequest.UID)})
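The owner-based lookups above (here and in SetupWithManager) rely on a cache field index that maps each object to the UIDs in its owner references, so the controller can list only the resources owned by a given StorageClassRequest. The repo's util.OwnerUIDIndexName and util.OwnersIndexFieldFunc are not shown in this diff; what follows is a minimal sketch of how such an index is typically built and queried with controller-runtime, with stand-in names.

package example

import (
	"context"

	rookCephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ownerUIDIndex stands in for util.OwnerUIDIndexName (assumed name).
const ownerUIDIndex = "ownerUID"

// ownersIndexFunc extracts the owner UIDs of a cached object, letting the
// cache answer "which objects are owned by UID X?" without client-side filtering.
func ownersIndexFunc(obj client.Object) []string {
	refs := obj.GetOwnerReferences()
	uids := make([]string, 0, len(refs))
	for i := range refs {
		uids = append(uids, string(refs[i].UID))
	}
	return uids
}

// registerIndex mirrors the IndexField calls in SetupWithManager above.
func registerIndex(mgr ctrl.Manager) error {
	return mgr.GetCache().IndexField(
		context.TODO(),
		&rookCephv1.CephBlockPoolRadosNamespace{},
		ownerUIDIndex,
		ownersIndexFunc,
	)
}

// ownedRadosNamespaces is the indexed lookup pattern used by initPhase above.
func ownedRadosNamespaces(ctx context.Context, c client.Client, namespace, ownerUID string) (*rookCephv1.CephBlockPoolRadosNamespaceList, error) {
	list := &rookCephv1.CephBlockPoolRadosNamespaceList{}
	err := c.List(ctx, list,
		client.InNamespace(namespace),
		client.MatchingFields{ownerUIDIndex: ownerUID},
	)
	return list, err
}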
@@ -317,7 +350,7 @@ func (r *StorageClassRequestReconciler) reconcilePhases() (reconcile.Result, err
return reconcile.Result{}, err
}

if err := r.reconcileCephBlockPool(&storageProfile); err != nil {
if err := r.reconcileRadosNamespace(); err != nil {
return reconcile.Result{}, err
}

@@ -352,53 +385,26 @@ func (r *StorageClassRequestReconciler) reconcilePhases() (reconcile.Result, err
return reconcile.Result{}, nil
}

func (r *StorageClassRequestReconciler) reconcileCephBlockPool(storageProfile *v1.StorageProfile) error {

failureDomain := r.storageCluster.Status.FailureDomain

_, err := ctrl.CreateOrUpdate(r.ctx, r.Client, r.cephBlockPool, func() error {
if err := r.own(r.cephBlockPool); err != nil {
func (r *StorageClassRequestReconciler) reconcileRadosNamespace() error {
_, err := ctrl.CreateOrUpdate(r.ctx, r.Client, r.cephRadosNamespace, func() error {
if err := r.own(r.cephRadosNamespace); err != nil {
return err
}
deviceClass := storageProfile.Spec.DeviceClass
deviceSetList := r.storageCluster.Spec.StorageDeviceSets
var deviceSet *v1.StorageDeviceSet
for i := range deviceSetList {
ds := &deviceSetList[i]
// get the required deviceSetName of the profile
if deviceClass == ds.DeviceClass {
deviceSet = ds
break
}
}

if deviceSet == nil {
return fmt.Errorf("could not find device set with device class %q in storagecluster", deviceClass)
}

addLabel(r.cephBlockPool, controllers.StorageConsumerNameLabel, r.storageConsumer.Name)
addLabel(r.cephBlockPool, controllers.StorageProfileSpecLabel, storageProfile.GetSpecHash())
addLabel(r.cephRadosNamespace, controllers.StorageConsumerNameLabel, r.storageConsumer.Name)

r.cephBlockPool.Spec = rookCephv1.NamedBlockPoolSpec{
PoolSpec: rookCephv1.PoolSpec{
FailureDomain: failureDomain,
DeviceClass: deviceClass,
Replicated: rookCephv1.ReplicatedSpec{
Size: 3,
ReplicasPerFailureDomain: 1,
},
Parameters: storageProfile.Spec.BlockPoolConfiguration.Parameters,
},
r.cephRadosNamespace.Spec = rookCephv1.CephBlockPoolRadosNamespaceSpec{
BlockPoolName: r.cephBlockPool.Name,
}
return nil
})

if err != nil {
r.log.Error(
err,
"Failed to update CephBlockPool.",
"CephBlockPool",
klog.KRef(r.cephBlockPool.Namespace, r.cephBlockPool.Name),
"Failed to update CephBlockPoolRadosNamespace.",
"CephBlockPoolRadosNamespace",
klog.KRef(r.cephRadosNamespace.Namespace, r.cephRadosNamespace.Name),
)
return err
}
@@ -408,11 +414,11 @@ func (r *StorageClassRequestReconciler) reconcileCephBlockPool(storageProfile *v
"node": r.cephClientNode.Name,
}
phase := ""
if r.cephBlockPool.Status != nil {
phase = string(r.cephBlockPool.Status.Phase)
if r.cephRadosNamespace.Status != nil {
phase = string(r.cephRadosNamespace.Status.Phase)
}

r.setCephResourceStatus(r.cephBlockPool.Name, "CephBlockPool", phase, cephClients)
r.setCephResourceStatus(r.cephRadosNamespace.Name, "CephBlockPoolRadosNamespace", phase, cephClients)

return nil
}
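reconcileRadosNamespace follows the standard controller-runtime create-or-update pattern: the mutate closure is re-run on every reconcile against either a fresh or an existing object, so it must compute the full desired state idempotently, and the owner reference is what drives the Owns() watch and cascading deletion. A standalone sketch of that pattern, assuming controllerutil; the controller's own own() and addLabel() helpers are not shown in this diff.

package example

import (
	"context"

	rookCephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// ensureRadosNamespace creates the CephBlockPoolRadosNamespace if it is
// missing, or patches it back to the desired state if it drifted. The mutate
// function runs on both paths, so it must be safe to apply repeatedly.
func ensureRadosNamespace(ctx context.Context, c client.Client, scheme *runtime.Scheme,
	owner client.Object, rns *rookCephv1.CephBlockPoolRadosNamespace, blockPoolName string) error {
	_, err := controllerutil.CreateOrUpdate(ctx, c, rns, func() error {
		// The controller reference enables the Owns() watch and garbage collection.
		if err := controllerutil.SetControllerReference(owner, rns, scheme); err != nil {
			return err
		}
		rns.Spec = rookCephv1.CephBlockPoolRadosNamespaceSpec{
			BlockPoolName: blockPoolName,
		}
		return nil
	})
	return err
}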
@@ -491,9 +497,9 @@ func (r *StorageClassRequestReconciler) reconcileCephClientRBDProvisioner() erro
addStorageRelatedAnnotations(r.cephClientProvisioner, r.getNamespacedName(), "rbd", "provisioner")
r.cephClientProvisioner.Spec = rookCephv1.ClientSpec{
Caps: map[string]string{
"mon": "profile rbd",
"mon": "profile rbd, allow command 'osd blocklist'",
"mgr": "allow rw",
"osd": fmt.Sprintf("profile rbd pool=%s", r.cephBlockPool.Name),
"osd": fmt.Sprintf("profile rbd pool=%s namespace=%s", r.cephBlockPool.Name, r.cephRadosNamespace.Name),
},
}
return nil
@@ -529,7 +535,7 @@ func (r *StorageClassRequestReconciler) reconcileCephClientRBDNode() error {
Caps: map[string]string{
"mon": "profile rbd",
"mgr": "allow rw",
"osd": fmt.Sprintf("profile rbd pool=%s", r.cephBlockPool.Name),
"osd": fmt.Sprintf("profile rbd pool=%s namespace=%s", r.cephBlockPool.Name, r.cephRadosNamespace.Name),
},
}
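With the namespace-scoped OSD caps above, both RBD clients (provisioner and node) are confined to their consumer's RADOS namespace inside the shared pool; the provisioner additionally gains the 'osd blocklist' mon command, typically used for fencing stale clients. A small illustration, not from this diff, of what the provisioner's caps expand to; the pool and namespace names are hypothetical.

package example

import "fmt"

// exampleProvisionerCaps shows the shape of the Ceph caps once the OSD cap is
// scoped to a RADOS namespace. The pool and namespace names are hypothetical.
func exampleProvisionerCaps() map[string]string {
	blockPool := "cephblockpool-blockpool"                      // shared, profile-scoped pool
	radosNamespace := "cephradosnamespace-blockpool-consumer-1" // per-consumer namespace
	return map[string]string{
		"mon": "profile rbd, allow command 'osd blocklist'",
		"mgr": "allow rw",
		// Confines RBD access to one namespace within the shared pool.
		"osd": fmt.Sprintf("profile rbd pool=%s namespace=%s", blockPool, radosNamespace),
	}
}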

@@ -566,7 +572,7 @@ func (r *StorageClassRequestReconciler) reconcileCephClientCephFSProvisioner() e
addStorageRelatedAnnotations(r.cephClientProvisioner, r.getNamespacedName(), "cephfs", "provisioner")
r.cephClientProvisioner.Spec = rookCephv1.ClientSpec{
Caps: map[string]string{
"mon": "allow r",
"mon": "allow r, allow command 'osd blocklist'",
"mgr": "allow rw",
"mds": fmt.Sprintf("allow rw path=/volumes/%s", r.cephFilesystemSubVolumeGroup.Name),
"osd": "allow rw tag cephfs metadata=*",
@@ -690,9 +696,3 @@ func addLabel(obj metav1.Object, key string, value string) {
}
labels[key] = value
}

// generateUUID generates a random UUID string and returns its first 8 characters.
func generateUUID() string {
newUUID := uuid.New().String()
return newUUID[:8]
}