diff --git a/charts/ceph-csi-rbd/values.yaml b/charts/ceph-csi-rbd/values.yaml index 0d43b1671109..64b74e2abd2d 100644 --- a/charts/ceph-csi-rbd/values.yaml +++ b/charts/ceph-csi-rbd/values.yaml @@ -27,6 +27,11 @@ serviceAccounts: # - "" # rbd: # netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net" +# readAffinity: +# enabled: true +# crushLocationLabels: +# - topology.kubernetes.io/region +# - topology.kubernetes.io/zone csiConfig: [] # Configuration details of clusterID,PoolID and FscID mapping diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index dac23d21e44a..fa0baf029ff2 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -88,6 +88,8 @@ func init() { "", "list of Kubernetes node labels, that determines the"+ " CRUSH location the node belongs to, separated by ','") + flag.StringVar(&conf.ContainerOrchestrator, "container-orchestrator", "kubernetes", + "container orchestrator where cephcsi is running") // cephfs related flags flag.BoolVar( diff --git a/deploy/csi-config-map-sample.yaml b/deploy/csi-config-map-sample.yaml index b48e834a56f6..bc5aa796f852 100644 --- a/deploy/csi-config-map-sample.yaml +++ b/deploy/csi-config-map-sample.yaml @@ -32,12 +32,15 @@ kind: ConfigMap # path for the Ceph cluster identified by the , This will be used # by the RBD CSI plugin to execute the rbd map/unmap in the # network namespace specified by the "rbd.netNamespaceFilePath". +# The "readAffinity" fields are used to enable read affinity and pass the crush +# location map for the Ceph cluster identified by the cluster , +# enabling this will add +# "read_from_replica=localize,crush_location=" to the map option. # If a CSI plugin is using more than one Ceph cluster, repeat the section for # each such cluster in use. # NOTE: Changes to the configmap is automatically updated in the running pods, # thus restarting existing pods using the configmap is NOT required on edits # to the configmap. - # Lets see the different configuration under cluster-mapping.json key. # This configuration is needed when volumes are mirrored using the Ceph-CSI. # clusterIDMapping holds the mapping between two clusterId's of storage @@ -66,6 +69,15 @@ data: } "nfs": { "netNamespaceFilePath": "/plugins/nfs.csi.ceph.com/net", + }, + "readAffinity": { + "enabled": "false", + "crushLocationLabels": [ + "", + "" + ... + "" + ] } } ] diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index d542fe3035df..e485e5cb16c4 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -49,7 +49,7 @@ make image-cephcsi | `--maxsnapshotsonimage` | `450` | Maximum number of snapshots allowed on rbd image without flattening | | `--setmetadata` | `false` | Set metadata on volume | | `--enable-read-affinity` | `false` | enable read affinity | -| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ',' | +| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ','.
`Note: These labels will be replaced if crush location labels are defined in the ceph-csi-config ConfigMap for the specific cluster.` | **Available volume parameters:** @@ -224,6 +224,12 @@ If enabled, this option will be added to all RBD volumes mapped by Ceph CSI. Well known labels can be found [here](https://kubernetes.io/docs/reference/labels-annotations-taints/). +Read affinity can be configured for individual clusters within the +`ceph-csi-config` ConfigMap. This allows configuring the crush location labels +for each ceph cluster separately. The crush location labels specified in the +ConfigMap will supersede those provided via command line argument +`--crush-location-labels`. + >Note: Label values will have all its dots `"."` normalized with dashes `"-"` in order for it to work with ceph CRUSH map. diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 950a063afce4..809e031968d3 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -26,6 +26,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" "github.com/container-storage-interface/spec/lib/go/csi" @@ -68,14 +69,14 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer { func NewNodeServer( d *csicommon.CSIDriver, t string, - topology map[string]string, - crushLocationMap map[string]string, + nodeLabels, topology, crushLocationMap map[string]string, ) (*rbd.NodeServer, error) { ns := rbd.NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), - VolumeLocks: util.NewVolumeLocks(), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), + VolumeLocks: util.NewVolumeLocks(), + NodeLabels: nodeLabels, + CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap), } - ns.SetReadAffinityMapOptions(crushLocationMap) return &ns, nil } @@ -87,8 +88,8 @@ func NewNodeServer( // setupCSIAddonsServer(). func (r *Driver) Run(conf *util.Config) { var ( - err error - topology, crushLocationMap map[string]string + err error + nodeLabels, topology, crushLocationMap map[string]string ) // update clone soft and hard limit rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth) @@ -125,13 +126,17 @@ func (r *Driver) Run(conf *util.Config) { }) } - if conf.EnableReadAffinity { - crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID) + if conf.ContainerOrchestrator == "kubernetes" { + nodeLabels, err = k8s.GetNodeLabels(conf.NodeID) if err != nil { log.FatalLogMsg(err.Error()) } } + if conf.EnableReadAffinity { + crushLocationMap = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels) + } + // Create GRPC servers r.ids = NewIdentityServer(r.cd) @@ -140,7 +145,7 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap) + r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) if err != nil { log.FatalLogMsg("failed to start node server, err %v\n", err) } diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 25f51b35c125..96e5ed84aa4d 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -45,8 +45,10 @@ type NodeServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error VolumeLocks *util.VolumeLocks - // readAffinityMapOptions contains map options to enable read affinity. - readAffinityMapOptions string + // NodeLabels stores the node labels + NodeLabels map[string]string + // CLIReadAffinityMapOptions contains map options passed through command line to enable read affinity. + CLIReadAffinityMapOptions string } // stageTransaction struct represents the state a transaction was when it either completed @@ -258,11 +260,10 @@ func (ns *NodeServer) populateRbdVol( rv.Mounter = rbdNbdMounter } - err = getMapOptions(req, rv) + err = ns.getMapOptions(req, rv) if err != nil { return nil, err } - ns.appendReadAffinityMapOptions(rv) rv.VolID = volID @@ -280,14 +281,14 @@ func (ns *NodeServer) populateRbdVol( // appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions // if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty. -func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) { +func (rv *rbdVolume) appendReadAffinityMapOptions(readAffinityMapOptions string) { switch { - case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: + case readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: return case rv.MapOptions != "": - rv.MapOptions += "," + ns.readAffinityMapOptions + rv.MapOptions += "," + readAffinityMapOptions default: - rv.MapOptions = ns.readAffinityMapOptions + rv.MapOptions = readAffinityMapOptions } } @@ -1395,22 +1396,3 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) { return size, nil } - -func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) { - if len(crushLocationMap) == 0 { - return - } - - var b strings.Builder - b.WriteString("read_from_replica=localize,crush_location=") - first := true - for key, val := range crushLocationMap { - if first { - b.WriteString(fmt.Sprintf("%s:%s", key, val)) - first = false - } else { - b.WriteString(fmt.Sprintf("|%s:%s", key, val)) - } - } - ns.readAffinityMapOptions = b.String() -} diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index 822232c6fac9..bffc114dcd77 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -18,8 +18,12 @@ package rbd import ( "context" + "encoding/json" + "os" "testing" + "github.com/ceph/ceph-csi/internal/util" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/stretchr/testify/assert" ) @@ -107,53 +111,6 @@ func TestParseBoolOption(t *testing.T) { } } -func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { - t.Parallel() - tests := []struct { - name string - crushLocationmap map[string]string - wantAny []string - }{ - { - name: "nil crushLocationmap", - crushLocationmap: nil, - wantAny: []string{""}, - }, - { - name: "empty crushLocationmap", - crushLocationmap: map[string]string{}, - wantAny: []string{""}, - }, - { - name: "single entry in crushLocationmap", - crushLocationmap: map[string]string{ - "region": "east", - }, - wantAny: []string{"read_from_replica=localize,crush_location=region:east"}, - }, - { - name: "multiple entries in crushLocationmap", - crushLocationmap: map[string]string{ - "region": "east", - "zone": "east-1", - }, - wantAny: []string{ - "read_from_replica=localize,crush_location=region:east|zone:east-1", - "read_from_replica=localize,crush_location=zone:east-1|region:east", - }, - }, - } - for _, tt := range tests { - currentTT := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - ns := &NodeServer{} - ns.SetReadAffinityMapOptions(currentTT.crushLocationmap) - assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions) - }) - } -} - func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) { t.Parallel() type input struct { @@ -236,11 +193,128 @@ func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) { MapOptions: currentTT.args.mapOptions, Mounter: currentTT.args.mounter, } + rv.appendReadAffinityMapOptions(currentTT.args.readAffinityMapOptions) + assert.Equal(t, currentTT.want, rv.MapOptions) + }) + } +} + +func TestReadAffinity_GetReadAffinityMapOptions(t *testing.T) { + t.Parallel() + + nodeLabels := map[string]string{ + "topology.kubernetes.io/zone": "east-1", + "topology.kubernetes.io/region": "east", + } + + csiConfig := []util.ClusterInfo{ + { + ClusterID: "cluster-1", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-2", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: false, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-3", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{}, + }, + }, + { + ClusterID: "cluster-4", + }, + } + + csiConfigFileContent, err := json.Marshal(csiConfig) + if err != nil { + t.Errorf("failed to marshal csi config info %v", err) + } + tmpConfPath := util.CsiConfigFile + err = os.Mkdir("/etc/ceph-csi-config", 0o600) + if err != nil { + t.Errorf("failed to create directory %s: %v", "/etc/ceph-csi-config", err) + } + err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) + if err != nil { + t.Errorf("failed to write %s file content: %v", util.CsiConfigFile, err) + } + + tests := []struct { + name string + clusterID string + CLICrushLocationLabels string + want string + }{ + { + name: "Enabled in cluster-1 and Enabled in CLI", + clusterID: "cluster-1", + CLICrushLocationLabels: "topology.kubernetes.io/region", + want: "read_from_replica=localize,crush_location=region:east", + }, + { + name: "Disabled in cluster-2 and Enabled in CLI", + clusterID: "cluster-2", + CLICrushLocationLabels: "topology.kubernetes.io/zone", + want: "", + }, + { + name: "Enabled in cluster-3 with empty crush labels and Enabled in CLI", + clusterID: "cluster-3", + CLICrushLocationLabels: "topology.kubernetes.io/zone", + want: "read_from_replica=localize,crush_location=zone:east-1", + }, + { + name: "Enabled in cluster-3 with empty crush labels and Disabled in CLI", + clusterID: "cluster-3", + CLICrushLocationLabels: "", + want: "", + }, + { + name: "Absent in cluster-4 and Enabled in CLI", + clusterID: "cluster-4", + CLICrushLocationLabels: "topology.kubernetes.io/zone", + want: "", + }, + } + + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + crushLocationMap := util.GetCrushLocationMap(tc.CLICrushLocationLabels, nodeLabels) ns := &NodeServer{ - readAffinityMapOptions: currentTT.args.readAffinityMapOptions, + CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap), } - ns.appendReadAffinityMapOptions(rv) - assert.Equal(t, currentTT.want, rv.MapOptions) + readAffinityMapOptions, err := util.GetReadAffinityMapOptions( + tc.clusterID, ns.CLIReadAffinityMapOptions, nodeLabels, + ) + if err != nil { + assert.Fail(t, err.Error()) + } + + assert.Equal(t, tc.want, readAffinityMapOptions) }) } } diff --git a/internal/rbd/rbd_attach.go b/internal/rbd/rbd_attach.go index c8d4701e29f6..37d121d79b66 100644 --- a/internal/rbd/rbd_attach.go +++ b/internal/rbd/rbd_attach.go @@ -295,7 +295,7 @@ func parseMapOptions(mapOptions string) (string, string, error) { // getMapOptions is a wrapper func, calls parse map/unmap funcs and feeds the // rbdVolume object. -func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { +func (ns *NodeServer) getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { krbdMapOptions, nbdMapOptions, err := parseMapOptions(req.GetVolumeContext()["mapOptions"]) if err != nil { return err @@ -312,6 +312,14 @@ func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { rv.UnmapOptions = nbdUnmapOptions } + readAffinityMapOptions, err := util.GetReadAffinityMapOptions( + rv.ClusterID, ns.CLIReadAffinityMapOptions, ns.NodeLabels, + ) + if err != nil { + return err + } + rv.appendReadAffinityMapOptions(readAffinityMapOptions) + return nil } diff --git a/internal/util/crushlocation.go b/internal/util/crushlocation.go index 5f3751c339d5..bc68bf4e6720 100644 --- a/internal/util/crushlocation.go +++ b/internal/util/crushlocation.go @@ -23,20 +23,15 @@ import ( ) // GetCrushLocationMap returns the crush location map, determined from -// the crush location labels and their values from the CO system. +// the crush location labels and their values from the node labels passed in arg. // Expects crushLocationLabels in arg to be in the format "[prefix/],[prefix/],...",. // Returns map of crush location types with its array of associated values. -func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) { +func GetCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) map[string]string { if crushLocationLabels == "" { - return nil, nil - } - - nodeLabels, err := k8sGetNodeLabels(nodeName) - if err != nil { - return nil, err + return nil } - return getCrushLocationMap(crushLocationLabels, nodeLabels), nil + return getCrushLocationMap(crushLocationLabels, nodeLabels) } // getCrushLocationMap returns the crush location map, determined from diff --git a/internal/util/csiconfig.go b/internal/util/csiconfig.go index 48a2e09c1fe4..aad750db3fd3 100644 --- a/internal/util/csiconfig.go +++ b/internal/util/csiconfig.go @@ -64,6 +64,11 @@ type ClusterInfo struct { // symlink filepath for the network namespace where we need to execute commands. NetNamespaceFilePath string `json:"netNamespaceFilePath"` } `json:"nfs"` + // Read affinity map options + ReadAffinity struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + } `json:"readAffinity"` } // Expected JSON structure in the passed in config file is, @@ -209,3 +214,20 @@ func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error) return cluster.NFS.NetNamespaceFilePath, nil } + +// GetCrushLocationLabels returns the crushLocationLabels from csi config for the given clusterID +// If `readAffinity.enabled` is set to true, else returns an empty string. +func GetCrushLocationLabels(pathToConfig, clusterID string) (bool, string, error) { + cluster, err := readClusterInfo(pathToConfig, clusterID) + if err != nil { + return false, "", err + } + + if !cluster.ReadAffinity.Enabled { + return false, "", nil + } + + crushLocationLabels := strings.Join(cluster.ReadAffinity.CrushLocationLabels, ",") + + return true, crushLocationLabels, nil +} diff --git a/internal/util/csiconfig_test.go b/internal/util/csiconfig_test.go index 66b5c927d982..fb8cd8557c7d 100644 --- a/internal/util/csiconfig_test.go +++ b/internal/util/csiconfig_test.go @@ -365,3 +365,101 @@ func TestGetNFSNetNamespaceFilePath(t *testing.T) { }) } } + +func TestGetReadAffinityOptions(t *testing.T) { + t.Parallel() + tests := []struct { + name string + clusterID string + want string + }{ + { + name: "ReadAffinity enabled set to true for cluster-1", + clusterID: "cluster-1", + want: "topology.kubernetes.io/region,topology.kubernetes.io/zone,topology.io/rack", + }, + { + name: "ReadAffinity enabled set to true for cluster-2", + clusterID: "cluster-2", + want: "topology.kubernetes.io/region", + }, + { + name: "ReadAffinity enabled set to false for cluster-3", + clusterID: "cluster-3", + want: "", + }, + { + name: "ReadAffinity option not set in cluster-4", + clusterID: "cluster-4", + want: "", + }, + } + + csiConfig := []ClusterInfo{ + { + ClusterID: "cluster-1", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + "topology.kubernetes.io/zone", + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-2", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-3", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: false, + CrushLocationLabels: []string{ + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-4", + }, + } + csiConfigFileContent, err := json.Marshal(csiConfig) + if err != nil { + t.Errorf("failed to marshal csi config info %v", err) + } + tmpConfPath := t.TempDir() + "/ceph-csi.json" + err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) + if err != nil { + t.Errorf("failed to write %s file content: %v", CsiConfigFile, err) + } + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + _, got, err := GetCrushLocationLabels(tmpConfPath, tc.clusterID) + if err != nil { + t.Errorf("GetCrushLocationLabels() error = %v", err) + + return + } + if got != tc.want { + t.Errorf("GetCrushLocationLabels() = %v, want %v", got, tc.want) + } + }) + } +} diff --git a/internal/util/k8s/client.go b/internal/util/k8s/client.go index 684fd7090308..29db2ce3ab86 100644 --- a/internal/util/k8s/client.go +++ b/internal/util/k8s/client.go @@ -25,8 +25,14 @@ import ( "k8s.io/client-go/tools/clientcmd" ) +var kubeclient *kubernetes.Clientset + // NewK8sClient create kubernetes client. func NewK8sClient() (*kubernetes.Clientset, error) { + if kubeclient != nil { + return kubeclient, nil + } + var cfg *rest.Config var err error cPath := os.Getenv("KUBERNETES_CONFIG_PATH") @@ -46,5 +52,7 @@ func NewK8sClient() (*kubernetes.Clientset, error) { return nil, fmt.Errorf("failed to create client: %w", err) } + kubeclient = client + return client, nil } diff --git a/internal/util/k8s/node.go b/internal/util/k8s/node.go new file mode 100644 index 000000000000..ad7760cd8803 --- /dev/null +++ b/internal/util/k8s/node.go @@ -0,0 +1,39 @@ +/* +Copyright 2023 The CephCSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k8s + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func GetNodeLabels(nodeName string) (map[string]string, error) { + client, err := NewK8sClient() + if err != nil { + return nil, fmt.Errorf("can not get node %q information, failed "+ + "to connect to Kubernetes: %w", nodeName, err) + } + + node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) + } + + return node.GetLabels(), nil +} diff --git a/internal/util/read_affinity.go b/internal/util/read_affinity.go new file mode 100644 index 000000000000..c877aea7474a --- /dev/null +++ b/internal/util/read_affinity.go @@ -0,0 +1,76 @@ +// /* +// Copyright 2023 The Ceph-CSI Authors. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ + +package util + +import ( + "fmt" + "strings" +) + +// ConstructReadAffinityMapOption constructs a read affinity map option based on the provided crushLocationMap. +// It appends crush location labels in the format +// "read_from_replica=localize,crush_location=label1:value1|label2:value2|...". +func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string { + if len(crushLocationMap) == 0 { + return "" + } + + var b strings.Builder + b.WriteString("read_from_replica=localize,crush_location=") + first := true + for key, val := range crushLocationMap { + if first { + b.WriteString(fmt.Sprintf("%s:%s", key, val)) + first = false + } else { + b.WriteString(fmt.Sprintf("|%s:%s", key, val)) + } + } + + return b.String() +} + +// GetReadAffinityMapOptions retrieves the readAffinityMapOptions from the CSI config file if it exists. +// If not, it falls back to returning the `cliReadAffinityMapOptions` from the command line. +// If neither of these options is available, it returns an empty string. +func GetReadAffinityMapOptions( + clusterID, cliReadAffinityMapOptions string, nodeLabels map[string]string, +) (string, error) { + var ( + err error + configReadAffinityEnabled bool + configCrushLocationLabels string + ) + + configReadAffinityEnabled, configCrushLocationLabels, err = GetCrushLocationLabels(CsiConfigFile, clusterID) + if err != nil { + return "", err + } + + if !configReadAffinityEnabled { + return "", nil + } + + if configCrushLocationLabels == "" { + return cliReadAffinityMapOptions, nil + } + + crushLocationMap := GetCrushLocationMap(configCrushLocationLabels, nodeLabels) + readAffinityMapOptions := ConstructReadAffinityMapOption(crushLocationMap) + + return readAffinityMapOptions, nil +} diff --git a/internal/util/read_affinity_test.go b/internal/util/read_affinity_test.go new file mode 100644 index 000000000000..89da727412ff --- /dev/null +++ b/internal/util/read_affinity_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2023 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadAffinity_ConstructReadAffinityMapOption(t *testing.T) { + t.Parallel() + tests := []struct { + name string + crushLocationmap map[string]string + wantAny []string + }{ + { + name: "nil crushLocationmap", + crushLocationmap: nil, + wantAny: []string{""}, + }, + { + name: "empty crushLocationmap", + crushLocationmap: map[string]string{}, + wantAny: []string{""}, + }, + { + name: "single entry in crushLocationmap", + crushLocationmap: map[string]string{ + "region": "east", + }, + wantAny: []string{"read_from_replica=localize,crush_location=region:east"}, + }, + { + name: "multiple entries in crushLocationmap", + crushLocationmap: map[string]string{ + "region": "east", + "zone": "east-1", + }, + wantAny: []string{ + "read_from_replica=localize,crush_location=region:east|zone:east-1", + "read_from_replica=localize,crush_location=zone:east-1|region:east", + }, + }, + } + for _, tt := range tests { + currentTT := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Contains(t, currentTT.wantAny, ConstructReadAffinityMapOption(currentTT.crushLocationmap)) + }) + } +} diff --git a/internal/util/topology.go b/internal/util/topology.go index be99dbbe14a0..1f08ca6ff049 100644 --- a/internal/util/topology.go +++ b/internal/util/topology.go @@ -17,16 +17,14 @@ limitations under the License. package util import ( - "context" "encoding/json" "fmt" "strings" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" - - "github.com/container-storage-interface/spec/lib/go/csi" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -34,21 +32,6 @@ const ( labelSeparator string = "," ) -func k8sGetNodeLabels(nodeName string) (map[string]string, error) { - client, err := k8s.NewK8sClient() - if err != nil { - return nil, fmt.Errorf("can not get node %q information, failed "+ - "to connect to Kubernetes: %w", nodeName, err) - } - - node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) - } - - return node.GetLabels(), nil -} - // GetTopologyFromDomainLabels returns the CSI topology map, determined from // the domain labels and their values from the CO system // Expects domainLabels in arg to be in the format "[prefix/],[prefix/],...",. @@ -82,7 +65,7 @@ func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map labelCount++ } - nodeLabels, err := k8sGetNodeLabels(nodeName) + nodeLabels, err := k8s.GetNodeLabels(nodeName) if err != nil { return nil, err } diff --git a/internal/util/util.go b/internal/util/util.go index 659b4032080e..c5832ab09eb1 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -151,6 +151,9 @@ type Config struct { // Read affinity related options EnableReadAffinity bool // enable OSD read affinity. CrushLocationLabels string // list of CRUSH location labels to read from the node. + + // ContainerOrchestrator represents the choice of container orchestration system to be used. + ContainerOrchestrator string } // ValidateDriverName validates the driver name.