From de47048c48d4e1d10ff0bf48d16c1ed43cb327af Mon Sep 17 00:00:00 2001
From: parth-gr <partharora1010@gmail.com>
Date: Mon, 26 Feb 2024 20:39:55 +0530
Subject: [PATCH] external: add topologyConstrainedPools support for rbd
 storageclass

With topologyConstrainedPools we can enable the use of any
replica pool and also store data at any
topology.

Signed-off-by: parth-gr <partharora1010@gmail.com>
---
 .../CRDs/Cluster/external-cluster.md          |  14 ++
 .../create-external-cluster-resources.py      | 208 ++++++++++++++----
 deploy/examples/import-external-cluster.sh    |  63 +++++-
 3 files changed, 240 insertions(+), 45 deletions(-)

diff --git a/Documentation/CRDs/Cluster/external-cluster.md b/Documentation/CRDs/Cluster/external-cluster.md
index 8a803fe33cabe..af3a762f40eec 100644
--- a/Documentation/CRDs/Cluster/external-cluster.md
+++ b/Documentation/CRDs/Cluster/external-cluster.md
@@ -60,6 +60,9 @@ python3 create-external-cluster-resources.py --rbd-data-pool-name <pool_name> --
 * `--upgrade`: (optional) Upgrades the cephCSIKeyrings(For example: client.csi-cephfs-provisioner) and client.healthchecker ceph users with new permissions needed for the new cluster version and older permission will still be applied.
 * `--restricted-auth-permission`: (optional) Restrict cephCSIKeyrings auth permissions to specific pools, and cluster. Mandatory flags that need to be set are `--rbd-data-pool-name`, and `--k8s-cluster-name`. `--cephfs-filesystem-name` flag can also be passed in case of CephFS user restriction, so it can restrict users to particular CephFS filesystem.
 * `--v2-port-enable`: (optional) Enables the v2 mon port (3300) for mons.
+* `--topology-pools`: (optional) Comma-separated list of topology-constrained rbd pools.
+* `--topology-failure-domain-label`: (optional) K8s cluster failure domain label (for example: zone, rack, or host) that matches the ceph failure domain of the topology-pools.
+* `--topology-failure-domain-values`: (optional) Comma-separated list of the k8s cluster failure domain values corresponding to each of the pools in the topology-pools list.
 
 ### Multi-tenancy
 
@@ -84,6 +87,17 @@ See the [Multisite doc](https://docs.ceph.com/en/quincy/radosgw/multisite/#confi
 python3 create-external-cluster-resources.py --rbd-data-pool-name <pool_name> --format bash --rgw-endpoint <rgw_endpoint> --rgw-realm-name <rgw_realm_name>> --rgw-zonegroup-name <rgw_zonegroup_name> --rgw-zone-name <rgw_zone_name>>
 ```
 
+### Topology Based Provisioning
+
+Enable Topology Based Provisioning for RBD pools by passing the `--topology-pools`, `--topology-failure-domain-label`, and `--topology-failure-domain-values` flags.
+The import script will create a new storageclass named `ceph-rbd-topology` with `volumeBindingMode: WaitForFirstConsumer`
+and will configure its `topologyConstrainedPools` parameter according to the input provided.
+Then use this storageclass to create volumes in the pool matching the topology of the scheduled pod.
+
+```console
+python3 create-external-cluster-resources.py --rbd-data-pool-name <pool_name> --topology-pools p,q,r --topology-failure-domain-label labelName --topology-failure-domain-values x,y,z --format bash
+```
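+
+For example, with the flags above (pools `p,q,r`, failure domain label `labelName`, and values `x,y,z`), the import script is expected to render a `topologyConstrainedPools` parameter roughly like the following abbreviated sketch (other storageclass fields omitted):
+
+```yaml
+parameters:
+  topologyConstrainedPools: |
+    [
+     {"poolName":"p",
+      "domainSegments":[
+        {"domainLabel":"labelName","value":"x"}]},
+     {"poolName":"q",
+      "domainSegments":[
+        {"domainLabel":"labelName","value":"y"}]},
+     {"poolName":"r",
+      "domainSegments":[
+        {"domainLabel":"labelName","value":"z"}]},
+    ]
+volumeBindingMode: WaitForFirstConsumer
+```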
+
 ### Upgrade Example
 
 1) If consumer cluster doesn't have restricted caps, this will upgrade all the default csi-users (non-restricted):
diff --git a/deploy/examples/create-external-cluster-resources.py b/deploy/examples/create-external-cluster-resources.py
index 5ffef28d31838..bf5a3a46a4033 100644
--- a/deploy/examples/create-external-cluster-resources.py
+++ b/deploy/examples/create-external-cluster-resources.py
@@ -99,22 +99,22 @@ def _init_cmd_output_map(self):
         ) as json_file:
             ceph_status_str = json_file.read()
         self.cmd_names["fs ls"] = """{"format": "json", "prefix": "fs ls"}"""
-        self.cmd_names["quorum_status"] = (
-            """{"format": "json", "prefix": "quorum_status"}"""
-        )
-        self.cmd_names["mgr services"] = (
-            """{"format": "json", "prefix": "mgr services"}"""
-        )
+        self.cmd_names[
+            "quorum_status"
+        ] = """{"format": "json", "prefix": "quorum_status"}"""
+        self.cmd_names[
+            "mgr services"
+        ] = """{"format": "json", "prefix": "mgr services"}"""
         # all the commands and their output
-        self.cmd_output_map[self.cmd_names["fs ls"]] = (
-            """[{"name":"myfs","metadata_pool":"myfs-metadata","metadata_pool_id":2,"data_pool_ids":[3],"data_pools":["myfs-replicated"]}]"""
-        )
-        self.cmd_output_map[self.cmd_names["quorum_status"]] = (
-            """{"election_epoch":3,"quorum":[0],"quorum_names":["a"],"quorum_leader_name":"a","quorum_age":14385,"features":{"quorum_con":"4540138292836696063","quorum_mon":["kraken","luminous","mimic","osdmap-prune","nautilus","octopus"]},"monmap":{"epoch":1,"fsid":"af4e1673-0b72-402d-990a-22d2919d0f1c","modified":"2020-05-07T03:36:39.918035Z","created":"2020-05-07T03:36:39.918035Z","min_mon_release":15,"min_mon_release_name":"octopus","features":{"persistent":["kraken","luminous","mimic","osdmap-prune","nautilus","octopus"],"optional":[]},"mons":[{"rank":0,"name":"a","public_addrs":{"addrvec":[{"type":"v2","addr":"10.110.205.174:3300","nonce":0},{"type":"v1","addr":"10.110.205.174:6789","nonce":0}]},"addr":"10.110.205.174:6789/0","public_addr":"10.110.205.174:6789/0","priority":0,"weight":0}]}}"""
-        )
-        self.cmd_output_map[self.cmd_names["mgr services"]] = (
-            """{"dashboard":"https://ceph-dashboard:8443/","prometheus":"http://ceph-dashboard-db:9283/"}"""
-        )
+        self.cmd_output_map[
+            self.cmd_names["fs ls"]
+        ] = """[{"name":"myfs","metadata_pool":"myfs-metadata","metadata_pool_id":2,"data_pool_ids":[3],"data_pools":["myfs-replicated"]}]"""
+        self.cmd_output_map[
+            self.cmd_names["quorum_status"]
+        ] = """{"election_epoch":3,"quorum":[0],"quorum_names":["a"],"quorum_leader_name":"a","quorum_age":14385,"features":{"quorum_con":"4540138292836696063","quorum_mon":["kraken","luminous","mimic","osdmap-prune","nautilus","octopus"]},"monmap":{"epoch":1,"fsid":"af4e1673-0b72-402d-990a-22d2919d0f1c","modified":"2020-05-07T03:36:39.918035Z","created":"2020-05-07T03:36:39.918035Z","min_mon_release":15,"min_mon_release_name":"octopus","features":{"persistent":["kraken","luminous","mimic","osdmap-prune","nautilus","octopus"],"optional":[]},"mons":[{"rank":0,"name":"a","public_addrs":{"addrvec":[{"type":"v2","addr":"10.110.205.174:3300","nonce":0},{"type":"v1","addr":"10.110.205.174:6789","nonce":0}]},"addr":"10.110.205.174:6789/0","public_addr":"10.110.205.174:6789/0","priority":0,"weight":0}]}}"""
+        self.cmd_output_map[
+            self.cmd_names["mgr services"]
+        ] = """{"dashboard":"https://ceph-dashboard:8443/","prometheus":"http://ceph-dashboard-db:9283/"}"""
         self.cmd_output_map[
             """{"caps": ["mon", "allow r, allow command quorum_status", "osd", "profile rbd-read-only, allow rwx pool=default.rgw.meta, allow r pool=.rgw.root, allow rw pool=default.rgw.control, allow x pool=default.rgw.buckets.index"], "entity": "client.healthchecker", "format": "json", "prefix": "auth get-or-create"}"""
         ] = """[{"entity":"client.healthchecker","key":"AQDFkbNeft5bFRAATndLNUSEKruozxiZi3lrdA==","caps":{"mon":"allow r, allow command quorum_status","osd":"profile rbd-read-only, allow rwx pool=default.rgw.meta, allow r pool=.rgw.root, allow rw pool=default.rgw.control, allow x pool=default.rgw.buckets.index"}}]"""
@@ -142,9 +142,9 @@ def _init_cmd_output_map(self):
         self.cmd_output_map[
             """{"caps": ["mon", "allow r, allow command quorum_status, allow command version", "mgr", "allow command config", "osd", "profile rbd-read-only, allow rwx pool=default.rgw.meta, allow r pool=.rgw.root, allow rw pool=default.rgw.control, allow rx pool=default.rgw.log, allow x pool=default.rgw.buckets.index"], "entity": "client.healthchecker", "format": "json", "prefix": "auth caps"}"""
         ] = """[{"entity":"client.healthchecker","key":"AQDFkbNeft5bFRAATndLNUSRKruozxiZi3lrdA==","caps":{"mon": "allow r, allow command quorum_status, allow command version", "mgr": "allow command config", "osd": "profile rbd-read-only, allow rwx pool=default.rgw.meta, allow r pool=.rgw.root, allow rw pool=default.rgw.control, allow rx pool=default.rgw.log, allow x pool=default.rgw.buckets.index"}}]"""
-        self.cmd_output_map["""{"format": "json", "prefix": "mgr services"}"""] = (
-            """{"dashboard": "http://rook-ceph-mgr-a-57cf9f84bc-f4jnl:7000/", "prometheus": "http://rook-ceph-mgr-a-57cf9f84bc-f4jnl:9283/"}"""
-        )
+        self.cmd_output_map[
+            """{"format": "json", "prefix": "mgr services"}"""
+        ] = """{"dashboard": "http://rook-ceph-mgr-a-57cf9f84bc-f4jnl:7000/", "prometheus": "http://rook-ceph-mgr-a-57cf9f84bc-f4jnl:9283/"}"""
         self.cmd_output_map[
             """{"entity": "client.healthchecker", "format": "json", "prefix": "auth get"}"""
         ] = """{"dashboard": "http://rook-ceph-mgr-a-57cf9f84bc-f4jnl:7000/", "prometheus": "http://rook-ceph-mgr-a-57cf9f84bc-f4jnl:9283/"}"""
@@ -474,6 +474,24 @@ def gen_arg_parser(cls, args_to_parse=None):
             required=False,
             help="provides the name of the rgw-zonegroup",
         )
+        output_group.add_argument(
+            "--topology-pools",
+            default="",
+            required=False,
+            help="comma-separated list of topology-constrained rbd pools",
+        )
+        output_group.add_argument(
+            "--topology-failure-domain-label",
+            default="",
+            required=False,
+            help="k8s cluster failure domain label (example: zone,rack,host,etc) for the topology-pools that are matching the ceph domain",
+        )
+        output_group.add_argument(
+            "--topology-failure-domain-values",
+            default="",
+            required=False,
+            help="comma-separated list of the k8s cluster failure domain values corresponding to each of the pools in the topology-pools list",
+        )
 
         upgrade_group = argP.add_argument_group("upgrade")
         upgrade_group.add_argument(
@@ -578,7 +596,7 @@ def _invalid_endpoint(self, endpoint_str):
         if not port.isdigit():
             raise ExecutionFailureException(f"Port not valid: {port}")
         intPort = int(port)
-        if intPort < 1 or intPort > 2**16 - 1:
+        if intPort < 1 or intPort > 2 ** 16 - 1:
             raise ExecutionFailureException(f"Out of range port number: {port}")
 
         return ip_type
@@ -958,9 +976,9 @@ def get_rbd_provisioner_caps_and_entity(self):
                 rados_namespace,
             )
             if rados_namespace != "":
-                caps["osd"] = (
-                    f"profile rbd pool={rbd_pool_name} namespace={rados_namespace}"
-                )
+                caps[
+                    "osd"
+                ] = f"profile rbd pool={rbd_pool_name} namespace={rados_namespace}"
             else:
                 caps["osd"] = f"profile rbd pool={rbd_pool_name}"
 
@@ -993,9 +1011,9 @@ def get_rbd_node_caps_and_entity(self):
                 rados_namespace,
             )
             if rados_namespace != "":
-                caps["osd"] = (
-                    f"profile rbd pool={rbd_pool_name} namespace={rados_namespace}"
-                )
+                caps[
+                    "osd"
+                ] = f"profile rbd pool={rbd_pool_name} namespace={rados_namespace}"
             else:
                 caps["osd"] = f"profile rbd pool={rbd_pool_name}"
 
@@ -1329,16 +1347,15 @@ def create_rgw_admin_ops_user(self):
             "",
         )
 
-    def validate_rbd_pool(self):
-        if not self.cluster.pool_exists(self._arg_parser.rbd_data_pool_name):
+    def validate_rbd_pool(self, pool_name):
+        if not self.cluster.pool_exists(pool_name):
             raise ExecutionFailureException(
-                f"The provided pool, '{self._arg_parser.rbd_data_pool_name}', does not exist"
+                f"The provided pool, '{pool_name}', does not exist"
             )
 
-    def init_rbd_pool(self):
+    def init_rbd_pool(self, rbd_pool_name):
         if isinstance(self.cluster, DummyRados):
             return
-        rbd_pool_name = self._arg_parser.rbd_data_pool_name
         ioctx = self.cluster.open_ioctx(rbd_pool_name)
         rbd_inst = rbd.RBD()
         rbd_inst.pool_init(ioctx, True)
@@ -1509,6 +1526,54 @@ def validate_rgw_multisite(self, rgw_multisite_config_name, rgw_multisite_config
                 return "-1"
         return ""
 
+    def convert_comma_seprated_to_array(self, value):
+        return value.split(",")
+
+    def raise_exception_if_any_topology_flag_is_missing(self):
+        topology_flags = (
+            self._arg_parser.topology_pools,
+            self._arg_parser.topology_failure_domain_label,
+            self._arg_parser.topology_failure_domain_values,
+        )
+        # the topology flags are only valid as a group; raise if some, but not
+        # all, of them were provided
+        if any(flag != "" for flag in topology_flags) and not all(
+            flag != "" for flag in topology_flags
+        ):
+            raise ExecutionFailureException(
+                "provide all the topology flags --topology-pools, --topology-failure-domain-label, --topology-failure-domain-values"
+            )
+
+    def validate_topology_values(self, topology_pools, topology_fd):
+        if len(topology_pools) != len(topology_fd):
+            raise ExecutionFailureException(
+                f"The provided topology pools, '{topology_pools}', and "
+                f"topology failure domain, '{topology_fd}',"
+                f"are of different length, '{len(topology_pools)}' and '{len(topology_fd)}' respctively"
+            )
+        return
+
+    def validate_topology_rbd_pools(self, topology_rbd_pools):
+        for pool in topology_rbd_pools:
+            self.validate_rbd_pool(pool)
+
+    def init_topology_rbd_pools(self, topology_rbd_pools):
+        for pool in topology_rbd_pools:
+            self.init_rbd_pool(pool)
+
     def _gen_output_map(self):
         if self.out_map:
             return
@@ -1518,8 +1583,8 @@ def _gen_output_map(self):
         self._arg_parser.k8s_cluster_name = (
             self._arg_parser.k8s_cluster_name.lower()
         )  # always convert cluster name to lowercase characters
-        self.validate_rbd_pool()
-        self.init_rbd_pool()
+        self.validate_rbd_pool(self._arg_parser.rbd_data_pool_name)
+        self.init_rbd_pool(self._arg_parser.rbd_data_pool_name)
         self.validate_rados_namespace()
         self._excluded_keys.add("K8S_CLUSTER_NAME")
         self.get_cephfs_data_pool_details()
@@ -1541,13 +1606,13 @@ def _gen_output_map(self):
             self.out_map["CSI_RBD_PROVISIONER_SECRET_NAME"],
         ) = self.create_cephCSIKeyring_user("client.csi-rbd-provisioner")
         self.out_map["CEPHFS_POOL_NAME"] = self._arg_parser.cephfs_data_pool_name
-        self.out_map["CEPHFS_METADATA_POOL_NAME"] = (
-            self._arg_parser.cephfs_metadata_pool_name
-        )
+        self.out_map[
+            "CEPHFS_METADATA_POOL_NAME"
+        ] = self._arg_parser.cephfs_metadata_pool_name
         self.out_map["CEPHFS_FS_NAME"] = self._arg_parser.cephfs_filesystem_name
-        self.out_map["RESTRICTED_AUTH_PERMISSION"] = (
-            self._arg_parser.restricted_auth_permission
-        )
+        self.out_map[
+            "RESTRICTED_AUTH_PERMISSION"
+        ] = self._arg_parser.restricted_auth_permission
         self.out_map["RADOS_NAMESPACE"] = self._arg_parser.rados_namespace
         self.out_map["SUBVOLUME_GROUP"] = self._arg_parser.subvolume_group
         self.out_map["CSI_CEPHFS_NODE_SECRET"] = ""
@@ -1590,9 +1655,36 @@ def _gen_output_map(self):
                 self.out_map["MONITORING_ENDPOINT_PORT"],
             ) = self.get_active_and_standby_mgrs()
         self.out_map["RBD_POOL_NAME"] = self._arg_parser.rbd_data_pool_name
-        self.out_map["RBD_METADATA_EC_POOL_NAME"] = (
-            self.validate_rbd_metadata_ec_pool_name()
-        )
+        self.out_map[
+            "RBD_METADATA_EC_POOL_NAME"
+        ] = self.validate_rbd_metadata_ec_pool_name()
+        self.out_map["TOPOLOGY_POOLS"] = self._arg_parser.topology_pools
+        self.out_map[
+            "TOPOLOGY_FAILURE_DOMAIN_LABEL"
+        ] = self._arg_parser.topology_failure_domain_label
+        self.out_map[
+            "TOPOLOGY_FAILURE_DOMAIN_VALUES"
+        ] = self._arg_parser.topology_failure_domain_values
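+        # all three topology flags must be provided together: when they are,
+        # validate that the pool and failure domain value lists have the same
+        # length, then check and initialize each topology pool; otherwise fail
+        # if only a subset of the flags was passed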
+        if (
+            self._arg_parser.topology_pools != ""
+            and self._arg_parser.topology_failure_domain_label != ""
+            and self._arg_parser.topology_failure_domain_values != ""
+        ):
+            self.validate_topology_values(
+                self.convert_comma_seprated_to_array(self.out_map["TOPOLOGY_POOLS"]),
+                self.convert_comma_seprated_to_array(
+                    self.out_map["TOPOLOGY_FAILURE_DOMAIN_VALUES"]
+                ),
+            )
+            self.validate_topology_rbd_pools(
+                self.convert_comma_seprated_to_array(self.out_map["TOPOLOGY_POOLS"])
+            )
+            self.init_topology_rbd_pools(
+                self.convert_comma_seprated_to_array(self.out_map["TOPOLOGY_POOLS"])
+            )
+        else:
+            self.raise_exception_if_any_topology_flag_is_missing()
+
         self.out_map["RGW_POOL_PREFIX"] = self._arg_parser.rgw_pool_prefix
         self.out_map["RGW_ENDPOINT"] = ""
         if self._arg_parser.rgw_endpoint:
@@ -1631,9 +1723,9 @@ def _gen_output_map(self):
                     ) = self.create_rgw_admin_ops_user()
                     err = self.validate_rgw_endpoint(info_cap_supported)
                     if self._arg_parser.rgw_tls_cert_path:
-                        self.out_map["RGW_TLS_CERT"] = (
-                            self.validate_rgw_endpoint_tls_cert()
-                        )
+                        self.out_map[
+                            "RGW_TLS_CERT"
+                        ] = self.validate_rgw_endpoint_tls_cert()
                     # if there is no error, set the RGW_ENDPOINT
                     if err != "-1":
                         self.out_map["RGW_ENDPOINT"] = self._arg_parser.rgw_endpoint
@@ -1829,6 +1921,34 @@ def gen_json_out(self):
                     }
                 )
 
+        # if 'TOPOLOGY_POOLS', 'TOPOLOGY_FAILURE_DOMAIN_LABEL', and 'TOPOLOGY_FAILURE_DOMAIN_VALUES' exist,
+        # then only add 'topology' StorageClass
+        if (
+            self.out_map["TOPOLOGY_POOLS"]
+            and self.out_map["TOPOLOGY_FAILURE_DOMAIN_LABEL"]
+            and self.out_map["TOPOLOGY_FAILURE_DOMAIN_VALUES"]
+        ):
+            json_out.append(
+                {
+                    "name": "ceph-rbd-topology-storageclass",
+                    "kind": "StorageClass",
+                    "data": {
+                        "topologyFailureDomainLabel": self.out_map[
+                            "TOPOLOGY_FAILURE_DOMAIN_LABEL"
+                        ],
+                        "topologyFailureDomainValues": self.convert_comma_seprated_to_array(
+                            self.out_map["TOPOLOGY_FAILURE_DOMAIN_VALUES"]
+                        ),
+                        "topologyPools": self.convert_comma_seprated_to_array(
+                            self.out_map["TOPOLOGY_POOLS"]
+                        ),
+                        "csi.storage.k8s.io/provisioner-secret-name": f"rook-{self.out_map['CSI_RBD_PROVISIONER_SECRET_NAME']}",
+                        "csi.storage.k8s.io/controller-expand-secret-name": f"rook-{self.out_map['CSI_RBD_PROVISIONER_SECRET_NAME']}",
+                        "csi.storage.k8s.io/node-stage-secret-name": f"rook-{self.out_map['CSI_RBD_NODE_SECRET_NAME']}",
+                    },
+                }
+            )
+
         # if 'CEPHFS_FS_NAME' exists, then only add 'cephfs' StorageClass
         if self.out_map["CEPHFS_FS_NAME"]:
             json_out.append(
diff --git a/deploy/examples/import-external-cluster.sh b/deploy/examples/import-external-cluster.sh
index 77381e715a290..a956f75aedc1f 100644
--- a/deploy/examples/import-external-cluster.sh
+++ b/deploy/examples/import-external-cluster.sh
@@ -19,9 +19,10 @@ ROOK_RBD_FEATURES=${ROOK_RBD_FEATURES:-"layering"}
 ROOK_EXTERNAL_MAX_MON_ID=2
 ROOK_EXTERNAL_MAPPING={}
 RBD_STORAGE_CLASS_NAME=ceph-rbd
+RBD_TOPOLOGY_STORAGE_CLASS_NAME=ceph-rbd-topology
 CEPHFS_STORAGE_CLASS_NAME=cephfs
 ROOK_EXTERNAL_MONITOR_SECRET=mon-secret
-OPERATOR_NAMESPACE=rook-ceph                                 # default set to rook-ceph
+OPERATOR_NAMESPACE=rook-ceph # default set to rook-ceph
 CSI_DRIVER_NAME_PREFIX=${CSI_DRIVER_NAME_PREFIX:-$OPERATOR_NAMESPACE}
 RBD_PROVISIONER=$CSI_DRIVER_NAME_PREFIX".rbd.csi.ceph.com"       # csi-provisioner-name
 CEPHFS_PROVISIONER=$CSI_DRIVER_NAME_PREFIX".cephfs.csi.ceph.com" # csi-provisioner-name
@@ -298,6 +299,62 @@ eof
   fi
 }
 
+function getTopologyTemplate() {
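+  # render one topologyConstrainedPools entry for the given pool ($1),
+  # failure domain label ($2), and failure domain value ($3), for example:
+  #   {"poolName":"p","domainSegments":[{"domainLabel":"zone","value":"x"}]},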
+  topology=$(
+    cat <<-END
+     {"poolName":"$1",
+      "domainSegments":[
+        {"domainLabel":"$2","value":"$3"}]},
+END
+  )
+}
+
+function createTopology() {
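+  # assemble the full topologyConstrainedPools list by pairing each pool in
+  # TOPOLOGY_POOLS with the matching entry in TOPOLOGY_FAILURE_DOMAIN_VALUES
+  # under the TOPOLOGY_FAILURE_DOMAIN_LABEL key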
+  TOPOLOGY=""
+  declare -a topology_failure_domain_values_array=()
+  declare -a topology_pools_array=()
+  IFS="," read -r -a topology_pools_array <<<"$TOPOLOGY_POOLS"
+  IFS="," read -r -a topology_failure_domain_values_array <<<"$TOPOLOGY_FAILURE_DOMAIN_VALUES"
+  for ((i = 0; i < ${#topology_failure_domain_values_array[@]}; i++)); do
+    getTopologyTemplate "${topology_pools_array[$i]}" "$TOPOLOGY_FAILURE_DOMAIN_LABEL" "${topology_failure_domain_values_array[$i]}"
+    TOPOLOGY="$TOPOLOGY"$'\n'"$topology"
+    topology=""
+  done
+}
+
+function createRBDTopologyStorageClass() {
+  if ! $KUBECTL -n "$NAMESPACE" get storageclass $RBD_TOPOLOGY_STORAGE_CLASS_NAME &>/dev/null; then
+    cat <<eof | $KUBECTL create -f -
+apiVersion: storage.k8s.io/v1
+kind: StorageClass
+metadata:
+  name: $RBD_TOPOLOGY_STORAGE_CLASS_NAME
+provisioner: $RBD_PROVISIONER
+parameters:
+  clusterID: $CLUSTER_ID_RBD
+  imageFormat: "2"
+  imageFeatures: $ROOK_RBD_FEATURES
+  topologyConstrainedPools: |
+    [$TOPOLOGY
+    ]
+  csi.storage.k8s.io/provisioner-secret-name: "rook-$CSI_RBD_PROVISIONER_SECRET_NAME"
+  csi.storage.k8s.io/provisioner-secret-namespace: $NAMESPACE
+  csi.storage.k8s.io/controller-expand-secret-name: "rook-$CSI_RBD_PROVISIONER_SECRET_NAME"
+  csi.storage.k8s.io/controller-expand-secret-namespace: $NAMESPACE
+  csi.storage.k8s.io/node-stage-secret-name: "rook-$CSI_RBD_NODE_SECRET_NAME"
+  csi.storage.k8s.io/node-stage-secret-namespace: $NAMESPACE
+  csi.storage.k8s.io/fstype: ext4
+allowVolumeExpansion: true
+reclaimPolicy: Delete
+volumeBindingMode: WaitForFirstConsumer
+eof
+  else
+    echo "storageclass $RBD_TOPOLOGY_STORAGE_CLASS_NAME already exists"
+  fi
+}
+
 function createCephFSStorageClass() {
   if ! $KUBECTL -n "$NAMESPACE" get storageclass $CEPHFS_STORAGE_CLASS_NAME &>/dev/null; then
     cat <<eof | $KUBECTL create -f -
@@ -357,3 +414,7 @@ fi
 if [ -n "$CEPHFS_FS_NAME" ] && [ -n "$CEPHFS_POOL_NAME" ]; then
   createCephFSStorageClass
 fi
+if [ -n "$TOPOLOGY_POOLS" ] && [ -n "$TOPOLOGY_FAILURE_DOMAIN_LABEL" ] && [ -n "$TOPOLOGY_FAILURE_DOMAIN_VALUES" ]; then
+  createTopology
+  createRBDTopologyStorageClass
+fi