From b4d2d9fc99cf95868a395cfd54d79dd024e4d2a1 Mon Sep 17 00:00:00 2001 From: duanmengkk Date: Sat, 25 Jan 2025 11:56:43 +0800 Subject: [PATCH] add etcd watcher for blockaffinity Signed-off-by: duanmengkk --- pkg/clusterlink/controllers/cluster/helper.go | 85 +++-- .../controllers/nodecidr/adaper/calico.go | 154 +++++++++ .../nodecidr/adaper/calico_etcd.go | 55 ++++ .../controllers/nodecidr/adaper/common.go | 99 ++++++ .../controllers/nodecidr/adaper/interface.go | 64 ++++ .../controllers/nodecidr/adapter.go | 292 ------------------ .../nodecidr/nodecidr_controller.go | 30 +- 7 files changed, 445 insertions(+), 334 deletions(-) create mode 100644 pkg/clusterlink/controllers/nodecidr/adaper/calico.go create mode 100644 pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go create mode 100644 pkg/clusterlink/controllers/nodecidr/adaper/common.go create mode 100644 pkg/clusterlink/controllers/nodecidr/adaper/interface.go delete mode 100644 pkg/clusterlink/controllers/nodecidr/adapter.go diff --git a/pkg/clusterlink/controllers/cluster/helper.go b/pkg/clusterlink/controllers/cluster/helper.go index fa188212c..9e4d4a2c6 100644 --- a/pkg/clusterlink/controllers/cluster/helper.go +++ b/pkg/clusterlink/controllers/cluster/helper.go @@ -8,6 +8,8 @@ import ( "strings" "github.com/projectcalico/calico/libcalico-go/lib/apiconfig" + "github.com/projectcalico/calico/libcalico-go/lib/backend/api" + "github.com/projectcalico/calico/libcalico-go/lib/backend/etcdv3" "github.com/projectcalico/calico/libcalico-go/lib/clientv3" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,68 +48,93 @@ func CheckIsEtcd(cluster *clusterlinkv1alpha1.Cluster) bool { } func GetCalicoClient(cluster *clusterlinkv1alpha1.Cluster) (clientv3.Interface, error) { - var calicoConfig CalicoConfig - config, err := ctrl.GetConfig() + calicoConfig, err := getCalicoAPIConfig(cluster) if err != nil { - klog.Errorf("failed to get k8s config: %v", err) + klog.Errorf("Error getting calicAPIConfig: %v", err) return nil, err } - clientSet, err := kubernetes.NewForConfig(config) + calicoClient, err := clientv3.New(*calicoConfig) if err != nil { - klog.Errorf("failed to build k8s kubeClient: %v", err) + klog.Errorf("failed to new kubeClient, cluster name is %s.", cluster.Name) return nil, err } - clusterConfigMap, err := clientSet.CoreV1().ConfigMaps(utils.DefaultNamespace).Get(context.Background(), cluster.Name, metav1.GetOptions{}) + return calicoClient, nil +} + +func GetETCDClient(cluster *clusterlinkv1alpha1.Cluster) (api.Client, error) { + calicoConfig, err := getCalicoAPIConfig(cluster) if err != nil { - klog.Errorf("failed to get cluster configmap, cluster name is %s.", cluster.Name) + klog.Errorf("Error getting calicAPIConfig: %v", err) return nil, err } - calicoAPIConfig := apiconfig.NewCalicoAPIConfig() - calicoData := clusterConfigMap.Data - calicoJSONStr, err := json.Marshal(calicoData) + etcdV3Client, err := etcdv3.NewEtcdV3Client(&calicoConfig.Spec.EtcdConfig) if err != nil { - klog.Errorf("failed to marshal cluster configmap %s to json string.", cluster.Name) + klog.Errorf("failed to new etcdClient, cluster name is %s.", cluster.Name) return nil, err } - err = json.Unmarshal(calicoJSONStr, &calicoConfig) + + return etcdV3Client, nil +} + +func getCalicoAPIConfig(cluster *clusterlinkv1alpha1.Cluster) (*apiconfig.CalicoAPIConfig, error) { + config, err := ctrl.GetConfig() if err != nil { - klog.Errorf("failed to unmarshal json string to calico config, cluster configmap is %s.", cluster.Name) - return nil, err + return nil, fmt.Errorf("failed to get k8s config: %v", err) } - // Decoding etcd config. - decodeEtcdKey, err := base64.StdEncoding.DecodeString(calicoConfig.EtcdKey) + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to build k8s kubeClient: %v", err) + } + + clusterConfigMap, err := clientSet.CoreV1().ConfigMaps(utils.DefaultNamespace).Get(context.Background(), cluster.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get cluster ConfigMap for cluster %s: %v", cluster.Name, err) + } + + var calicoConfig CalicoConfig + calicoData := clusterConfigMap.Data + calicoJSONStr, err := json.Marshal(calicoData) + if err != nil { + return nil, fmt.Errorf("failed to marshal ConfigMap data for cluster %s to JSON: %v", cluster.Name, err) + } + + if err := json.Unmarshal(calicoJSONStr, &calicoConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal JSON to CalicoConfig for cluster %s: %v", cluster.Name, err) + } + + etcdKey, err := decodeBase64(calicoConfig.EtcdKey, "etcd key", cluster.Name) if err != nil { - klog.Errorf("decoding exception, etcd key is invalid, cluster name is %s.", cluster.Name) return nil, err } - decodeEtcdCert, err := base64.StdEncoding.DecodeString(calicoConfig.EtcdCert) + etcdCert, err := decodeBase64(calicoConfig.EtcdCert, "etcd cert", cluster.Name) if err != nil { - klog.Errorf("decoding exception, etcd cert is invalid, cluster name is %s.", cluster.Name) return nil, err } - decodeEtcdCACert, err := base64.StdEncoding.DecodeString(calicoConfig.EtcdCACert) + etcdCACert, err := decodeBase64(calicoConfig.EtcdCACert, "etcd CA cert", cluster.Name) if err != nil { - klog.Errorf("decoding exception, etcd ca.cert is invalid, cluster name is %s.", cluster.Name) return nil, err } + calicoAPIConfig := apiconfig.NewCalicoAPIConfig() calicoAPIConfig.Spec.DatastoreType = apiconfig.DatastoreType(calicoConfig.DatastoreType) calicoAPIConfig.Spec.EtcdEndpoints = calicoConfig.EtcdEndpoints - calicoAPIConfig.Spec.EtcdKey = string(decodeEtcdKey) - calicoAPIConfig.Spec.EtcdCert = string(decodeEtcdCert) - calicoAPIConfig.Spec.EtcdCACert = string(decodeEtcdCACert) + calicoAPIConfig.Spec.EtcdKey = etcdKey + calicoAPIConfig.Spec.EtcdCert = etcdCert + calicoAPIConfig.Spec.EtcdCACert = etcdCACert + + return calicoAPIConfig, nil +} - calicoClient, err := clientv3.New(*calicoAPIConfig) +func decodeBase64(encoded, fieldName, clusterName string) (string, error) { + decoded, err := base64.StdEncoding.DecodeString(encoded) if err != nil { - klog.Errorf("failed to new kubeClient, cluster name is %s.", cluster.Name) - return nil, err + return "", fmt.Errorf("failed to decode %s for cluster %s: %v", fieldName, clusterName, err) } - - return calicoClient, nil + return string(decoded), nil } func ResolveServiceCIDRs(pod *corev1.Pod) ([]string, error) { diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/calico.go b/pkg/clusterlink/controllers/nodecidr/adaper/calico.go new file mode 100644 index 000000000..6dcfc61f6 --- /dev/null +++ b/pkg/clusterlink/controllers/nodecidr/adaper/calico.go @@ -0,0 +1,154 @@ +package adaper + +import ( + "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" +) + +type CalicoAdapter struct { + sync bool + config *rest.Config + blockLister cache.GenericLister + clusterNodeLister clusterlister.ClusterNodeLister + processor lifted.AsyncWorker +} + +// nolint:revive +func NewCalicoAdapter(config *rest.Config, + clusterNodeLister clusterlister.ClusterNodeLister, + processor lifted.AsyncWorker) *CalicoAdapter { + return &CalicoAdapter{ + config: config, + clusterNodeLister: clusterNodeLister, + processor: processor, + } +} + +func (c *CalicoAdapter) Start(stopCh <-chan struct{}) error { + client, err := dynamic.NewForConfig(c.config) + if err != nil { + klog.Errorf("init dynamic client err: %v", err) + return err + } + gvr := schema.GroupVersionResource{ + Group: "crd.projectcalico.org", + Version: "v1", + Resource: "blockaffinities", + } + informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0) + _, err = informerFactory.ForResource(gvr).Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.OnAdd, + UpdateFunc: c.OnUpdate, + DeleteFunc: c.OnDelete, + }) + if err != nil { + klog.Errorf("add event handler error: %v", err) + return err + } + + c.blockLister = informerFactory.ForResource(gvr).Lister() + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + + c.sync = true + klog.Info("calico blockaffinities informer started!") + return nil +} + +func (c *CalicoAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) { + blockAffinities, err := c.blockLister.List(labels.Everything()) + if err != nil { + klog.Errorf("list blockAffinities error: %v", err) + return nil, err + } + var podCIDRS []string + for _, ba := range blockAffinities { + uobj := ba.(*unstructured.Unstructured) + node, found, err := unstructured.NestedString(uobj.Object, "spec", "node") + if err != nil { + klog.Errorf("get spec.node from blockAffinity err: ", err) + } + if !found { + continue + } + cidr, found, err := unstructured.NestedString(uobj.Object, "spec", "cidr") + if err != nil { + klog.Errorf("get spec.cidr from blockAffinity err: ", err) + } + if !found { + continue + } + if strings.Compare(node, nodeName) == 0 { + podCIDRS = append(podCIDRS, cidr) + } + } + + return podCIDRS, nil +} + +func (c *CalicoAdapter) Synced() bool { + return c.sync +} + +func (c *CalicoAdapter) OnAdd(obj interface{}) { + klog.V(7).Info("add event") + runtimeObj, ok := obj.(*unstructured.Unstructured) + if !ok { + return + } + node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node") + if err != nil { + klog.Errorf("get spec.node from blockAffinity err: ", err) + } + if !found { + return + } + klog.V(7).Info("add event Enqueue") + requeue(node, c.clusterNodeLister, c.processor) +} + +// OnUpdate handles object update event and push the object to queue. +func (c *CalicoAdapter) OnUpdate(_, newObj interface{}) { + klog.V(7).Info("update event") + runtimeObj, ok := newObj.(*unstructured.Unstructured) + if !ok { + return + } + node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node") + if err != nil { + klog.Errorf("get spec.node from blockAffinity err: ", err) + } + if !found { + return + } + klog.V(7).Info("update event Enqueue") + requeue(node, c.clusterNodeLister, c.processor) +} + +// OnDelete handles object delete event and push the object to queue. +func (c *CalicoAdapter) OnDelete(obj interface{}) { + runtimeObj, ok := obj.(*unstructured.Unstructured) + if !ok { + return + } + node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node") + if err != nil { + klog.Errorf("get spec.node from blockAffinity err: ", err) + } + if !found { + return + } + requeue(node, c.clusterNodeLister, c.processor) +} diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go new file mode 100644 index 000000000..1ca57d3fc --- /dev/null +++ b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go @@ -0,0 +1,55 @@ +package adaper + +import ( + clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" + "github.com/projectcalico/calico/libcalico-go/lib/backend/api" +) + +type CalicoETCDAdapter struct { + sync bool + etcdClient api.Client + clusterNodeLister clusterlister.ClusterNodeLister + processor lifted.AsyncWorker +} + +// nolint:revive +func NewCalicoETCDAdapter(etcdClient api.Client, + clusterNodeLister clusterlister.ClusterNodeLister, + processor lifted.AsyncWorker) *CalicoETCDAdapter { + return &CalicoETCDAdapter{ + etcdClient: etcdClient, + clusterNodeLister: clusterNodeLister, + processor: processor, + } +} + +func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error { + // todo use c.etcdClient to list and watch blockaffinity in etcd + return nil +} + +func (c *CalicoETCDAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) { + // see calicoctl/calicoctl/commands/datastore/migrate/migrateipam.go + // and libcalico-go/lib/backend/model/block_affinity.go + // todo use c.etcdClient to get blockaffinity in etcd + return nil, nil +} + +func (c *CalicoETCDAdapter) Synced() bool { + return c.sync +} + +func (c *CalicoETCDAdapter) OnAdd(obj interface{}) { + // todo add event info to c.processor +} + +// OnUpdate handles object update event and push the object to queue. +func (c *CalicoETCDAdapter) OnUpdate(_, newObj interface{}) { + // todo add event info to c.processor +} + +// OnDelete handles object delete event and push the object to queue. +func (c *CalicoETCDAdapter) OnDelete(obj interface{}) { + // todo add event info to c.processor +} diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/common.go b/pkg/clusterlink/controllers/nodecidr/adaper/common.go new file mode 100644 index 000000000..e0f80d3e4 --- /dev/null +++ b/pkg/clusterlink/controllers/nodecidr/adaper/common.go @@ -0,0 +1,99 @@ +package adaper + +import ( + corev1 "k8s.io/api/core/v1" + informer "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + lister "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" +) + +type CommonAdapter struct { + sync bool + config *rest.Config + nodeLister lister.NodeLister + clusterNodeLister clusterlister.ClusterNodeLister + processor lifted.AsyncWorker +} + +// nolint:revive +func NewCommonAdapter(config *rest.Config, + clusterNodeLister clusterlister.ClusterNodeLister, + processor lifted.AsyncWorker) *CommonAdapter { + return &CommonAdapter{ + config: config, + clusterNodeLister: clusterNodeLister, + processor: processor, + } +} + +func (c *CommonAdapter) Start(stopCh <-chan struct{}) error { + client, err := kubernetes.NewForConfig(c.config) + if err != nil { + return err + } + + informerFactory := informer.NewSharedInformerFactory(client, 0) + c.nodeLister = informerFactory.Core().V1().Nodes().Lister() + _, err = informerFactory.Core().V1().Nodes().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.OnAdd, + UpdateFunc: c.OnUpdate, + DeleteFunc: c.OnDelete, + }) + if err != nil { + return err + } + + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + + c.sync = true + klog.Info("common informer started!") + return nil +} + +func (c *CommonAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) { + node, err := c.nodeLister.Get(nodeName) + if err != nil { + klog.Infof("get node %s error:%v", nodeName, err) + return nil, err + } + + return node.Spec.PodCIDRs, nil +} + +func (c *CommonAdapter) Synced() bool { + return c.sync +} + +func (c *CommonAdapter) OnAdd(obj interface{}) { + runtimeObj, ok := obj.(*corev1.Node) + if !ok { + return + } + requeue(runtimeObj.Name, c.clusterNodeLister, c.processor) +} + +// OnUpdate handles object update event and push the object to queue. +func (c *CommonAdapter) OnUpdate(_, newObj interface{}) { + runtimeObj, ok := newObj.(*corev1.Node) + if !ok { + return + } + requeue(runtimeObj.Name, c.clusterNodeLister, c.processor) +} + +// OnDelete handles object delete event and push the object to queue. +func (c *CommonAdapter) OnDelete(obj interface{}) { + runtimeObj, ok := obj.(*corev1.Node) + if !ok { + return + } + requeue(runtimeObj.Name, c.clusterNodeLister, c.processor) +} diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/interface.go b/pkg/clusterlink/controllers/nodecidr/adaper/interface.go new file mode 100644 index 000000000..decdb017a --- /dev/null +++ b/pkg/clusterlink/controllers/nodecidr/adaper/interface.go @@ -0,0 +1,64 @@ +package adaper + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog/v2" + + clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/utils/keys" + "github.com/kosmos.io/kosmos/pkg/utils/lifted" +) + +type CniAdapter interface { + GetCIDRByNodeName(nodeName string) ([]string, error) + + Start(stopCh <-chan struct{}) error + + Synced() bool +} + +func requeue(originNodeName string, clusterNodeLister clusterlister.ClusterNodeLister, processor lifted.AsyncWorker) { + clusterNodes, err := clusterNodeLister.List(labels.Everything()) + if err != nil { + klog.Errorf("list clusterNodes err: %v", err) + return + } + + flag := false + for _, clusterNode := range clusterNodes { + if clusterNode.Spec.NodeName == originNodeName { + key, err := ClusterWideKeyFunc(clusterNode) + if err != nil { + klog.Errorf("make clusterNode as a reconsile key err: %v", err) + return + } + + klog.V(7).Infof("key %s is enqueued!", originNodeName) + processor.Add(key) + flag = true + break + } + } + if !flag { + clusterNode := &clusterlinkv1alpha1.ClusterNode{ + ObjectMeta: metav1.ObjectMeta{ + Name: originNodeName, + }, + } + key, err := ClusterWideKeyFunc(clusterNode) + if err != nil { + klog.Errorf("make clusterNode as a reconsile key err: %v", err) + return + } + + klog.V(7).Infof("can't find match clusternode %s", originNodeName) + processor.Add(key) + } +} + +// ClusterWideKeyFunc generates a ClusterWideKey for object. +func ClusterWideKeyFunc(obj interface{}) (lifted.QueueKey, error) { + return keys.ClusterWideKeyFunc(obj) +} diff --git a/pkg/clusterlink/controllers/nodecidr/adapter.go b/pkg/clusterlink/controllers/nodecidr/adapter.go deleted file mode 100644 index 8f8105d70..000000000 --- a/pkg/clusterlink/controllers/nodecidr/adapter.go +++ /dev/null @@ -1,292 +0,0 @@ -package nodecidr - -import ( - "strings" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" - informer "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - lister "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - - clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/utils/lifted" -) - -type cniAdapter interface { - getCIDRByNodeName(nodeName string) ([]string, error) - - start(stopCh <-chan struct{}) error - - synced() bool -} - -type commonAdapter struct { - sync bool - config *rest.Config - nodeLister lister.NodeLister - clusterNodeLister clusterlister.ClusterNodeLister - processor lifted.AsyncWorker -} - -// nolint:revive -func NewCommonAdapter(config *rest.Config, - clusterNodeLister clusterlister.ClusterNodeLister, - processor lifted.AsyncWorker) *commonAdapter { - return &commonAdapter{ - config: config, - clusterNodeLister: clusterNodeLister, - processor: processor, - } -} - -func (c *commonAdapter) start(stopCh <-chan struct{}) error { - client, err := kubernetes.NewForConfig(c.config) - if err != nil { - return err - } - - informerFactory := informer.NewSharedInformerFactory(client, 0) - c.nodeLister = informerFactory.Core().V1().Nodes().Lister() - _, err = informerFactory.Core().V1().Nodes().Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.OnAdd, - UpdateFunc: c.OnUpdate, - DeleteFunc: c.OnDelete, - }) - if err != nil { - return err - } - - informerFactory.Start(stopCh) - informerFactory.WaitForCacheSync(stopCh) - - c.sync = true - klog.Info("common informer started!") - return nil -} - -func (c *commonAdapter) getCIDRByNodeName(nodeName string) ([]string, error) { - node, err := c.nodeLister.Get(nodeName) - if err != nil { - klog.Infof("get node %s error:%v", nodeName, err) - return nil, err - } - - return node.Spec.PodCIDRs, nil -} - -func (c *commonAdapter) synced() bool { - return c.sync -} - -func (c *commonAdapter) OnAdd(obj interface{}) { - runtimeObj, ok := obj.(*corev1.Node) - if !ok { - return - } - requeue(runtimeObj.Name, c.clusterNodeLister, c.processor) -} - -// OnUpdate handles object update event and push the object to queue. -func (c *commonAdapter) OnUpdate(_, newObj interface{}) { - runtimeObj, ok := newObj.(*corev1.Node) - if !ok { - return - } - requeue(runtimeObj.Name, c.clusterNodeLister, c.processor) -} - -// OnDelete handles object delete event and push the object to queue. -func (c *commonAdapter) OnDelete(obj interface{}) { - runtimeObj, ok := obj.(*corev1.Node) - if !ok { - return - } - requeue(runtimeObj.Name, c.clusterNodeLister, c.processor) -} - -type calicoAdapter struct { - sync bool - config *rest.Config - blockLister cache.GenericLister - clusterNodeLister clusterlister.ClusterNodeLister - processor lifted.AsyncWorker -} - -// nolint:revive -func NewCalicoAdapter(config *rest.Config, - clusterNodeLister clusterlister.ClusterNodeLister, - processor lifted.AsyncWorker) *calicoAdapter { - return &calicoAdapter{ - config: config, - clusterNodeLister: clusterNodeLister, - processor: processor, - } -} - -func (c *calicoAdapter) start(stopCh <-chan struct{}) error { - client, err := dynamic.NewForConfig(c.config) - if err != nil { - klog.Errorf("init dynamic client err: %v", err) - return err - } - gvr := schema.GroupVersionResource{ - Group: "crd.projectcalico.org", - Version: "v1", - Resource: "blockaffinities", - } - informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0) - _, err = informerFactory.ForResource(gvr).Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.OnAdd, - UpdateFunc: c.OnUpdate, - DeleteFunc: c.OnDelete, - }) - if err != nil { - klog.Errorf("add event handler error: %v", err) - return err - } - - c.blockLister = informerFactory.ForResource(gvr).Lister() - informerFactory.Start(stopCh) - informerFactory.WaitForCacheSync(stopCh) - - c.sync = true - klog.Info("calico blockaffinities informer started!") - return nil -} - -func (c *calicoAdapter) getCIDRByNodeName(nodeName string) ([]string, error) { - blockAffinities, err := c.blockLister.List(labels.Everything()) - if err != nil { - klog.Errorf("list blockAffinities error: %v", err) - return nil, err - } - var podCIDRS []string - for _, ba := range blockAffinities { - uobj := ba.(*unstructured.Unstructured) - node, found, err := unstructured.NestedString(uobj.Object, "spec", "node") - if err != nil { - klog.Errorf("get spec.node from blockAffinity err: ", err) - } - if !found { - continue - } - cidr, found, err := unstructured.NestedString(uobj.Object, "spec", "cidr") - if err != nil { - klog.Errorf("get spec.cidr from blockAffinity err: ", err) - } - if !found { - continue - } - if strings.Compare(node, nodeName) == 0 { - podCIDRS = append(podCIDRS, cidr) - } - } - - return podCIDRS, nil -} - -func (c *calicoAdapter) synced() bool { - return c.sync -} - -func (c *calicoAdapter) OnAdd(obj interface{}) { - klog.V(7).Info("add event") - runtimeObj, ok := obj.(*unstructured.Unstructured) - if !ok { - return - } - node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node") - if err != nil { - klog.Errorf("get spec.node from blockAffinity err: ", err) - } - if !found { - return - } - klog.V(7).Info("add event Enqueue") - requeue(node, c.clusterNodeLister, c.processor) -} - -// OnUpdate handles object update event and push the object to queue. -func (c *calicoAdapter) OnUpdate(_, newObj interface{}) { - klog.V(7).Info("update event") - runtimeObj, ok := newObj.(*unstructured.Unstructured) - if !ok { - return - } - node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node") - if err != nil { - klog.Errorf("get spec.node from blockAffinity err: ", err) - } - if !found { - return - } - klog.V(7).Info("update event Enqueue") - requeue(node, c.clusterNodeLister, c.processor) -} - -// OnDelete handles object delete event and push the object to queue. -func (c *calicoAdapter) OnDelete(obj interface{}) { - runtimeObj, ok := obj.(*unstructured.Unstructured) - if !ok { - return - } - node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node") - if err != nil { - klog.Errorf("get spec.node from blockAffinity err: ", err) - } - if !found { - return - } - requeue(node, c.clusterNodeLister, c.processor) -} - -func requeue(originNodeName string, clusterNodeLister clusterlister.ClusterNodeLister, processor lifted.AsyncWorker) { - clusterNodes, err := clusterNodeLister.List(labels.Everything()) - if err != nil { - klog.Errorf("list clusterNodes err: %v", err) - return - } - - flag := false - for _, clusterNode := range clusterNodes { - if clusterNode.Spec.NodeName == originNodeName { - key, err := ClusterWideKeyFunc(clusterNode) - if err != nil { - klog.Errorf("make clusterNode as a reconsile key err: %v", err) - return - } - - klog.V(7).Infof("key %s is enqueued!", originNodeName) - processor.Add(key) - flag = true - break - } - } - if !flag { - clusterNode := &clusterlinkv1alpha1.ClusterNode{ - ObjectMeta: metav1.ObjectMeta{ - Name: originNodeName, - }, - } - key, err := ClusterWideKeyFunc(clusterNode) - if err != nil { - klog.Errorf("make clusterNode as a reconsile key err: %v", err) - return - } - - klog.V(7).Infof("can't find match clusternode %s", originNodeName) - processor.Add(key) - } -} diff --git a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go index 4e42761e3..7cb5540c4 100644 --- a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go +++ b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go @@ -18,7 +18,9 @@ import ( "k8s.io/klog/v2" clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + clustercontroller "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/cluster" nodecontroller "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/node" + "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/nodecidr/adaper" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" clusterinformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/kosmos/v1alpha1" @@ -47,7 +49,7 @@ type Controller struct { clusterNodeInformer clusterinformer.ClusterNodeInformer clusterNodeLister clusterlister.ClusterNodeLister - cniAdapter cniAdapter + cniAdapter adaper.CniAdapter sync.RWMutex ctx context.Context } @@ -67,7 +69,7 @@ func (c *Controller) Start(ctx context.Context) error { opt := lifted.WorkerOptions{ Name: "node cidr Controller", - KeyFunc: ClusterWideKeyFunc, + KeyFunc: adaper.ClusterWideKeyFunc, ReconcileFunc: c.Reconcile, RateLimiterOptions: c.RateLimiterOptions, } @@ -117,12 +119,19 @@ func (c *Controller) Start(ctx context.Context) error { // third step: init CNI Adapter if cluster.Spec.ClusterLinkOptions.CNI == calicoCNI { klog.Infof("cluster %s's cni is %s", c.clusterName, calicoCNI) - c.cniAdapter = NewCalicoAdapter(c.config, c.clusterNodeLister, c.processor) + c.cniAdapter = adaper.NewCalicoAdapter(c.config, c.clusterNodeLister, c.processor) + if clustercontroller.CheckIsEtcd(cluster) { + etcdClient, err := clustercontroller.GetETCDClient(cluster) + if err != nil { + klog.Errorf("init etcd client err: %v", err) + } + c.cniAdapter = adaper.NewCalicoETCDAdapter(etcdClient, c.clusterNodeLister, c.processor) + } } else { klog.Infof("cluster %s's cni is %s", c.clusterName, cluster.Spec.ClusterLinkOptions.CNI) - c.cniAdapter = NewCommonAdapter(c.config, c.clusterNodeLister, c.processor) + c.cniAdapter = adaper.NewCommonAdapter(c.config, c.clusterNodeLister, c.processor) } - err = c.cniAdapter.start(stopCh) + err = c.cniAdapter.Start(stopCh) if err != nil { return err } @@ -145,7 +154,7 @@ func (c *Controller) Reconcile(key lifted.QueueKey) error { if err != nil { if apierrors.IsNotFound(err) { klog.Info("maybe clusterWideKey.Name is k8s node's name instead of clusternode's name,try to get node podCIDRs") - nodePodcidr, err := c.cniAdapter.getCIDRByNodeName(clusterWideKey.Name) + nodePodcidr, err := c.cniAdapter.GetCIDRByNodeName(clusterWideKey.Name) // get cluster node name by clustername and k8s node's name clusterNodeName := nodecontroller.ClusterNodeName(c.clusterName, clusterWideKey.Name) // if err is no nil, means get node error or list blockAffinities error @@ -192,12 +201,12 @@ func (c *Controller) Reconcile(key lifted.QueueKey) error { return nil } - if !c.cniAdapter.synced() { + if !c.cniAdapter.Synced() { c.processor.AddAfter(key, requeueTime) return nil } - podCIDRs, err := c.cniAdapter.getCIDRByNodeName(originNodeName) + podCIDRs, err := c.cniAdapter.GetCIDRByNodeName(originNodeName) if err != nil { klog.Errorf("get node %s's podCIDRs err: %v", originNodeName, err) return err @@ -217,11 +226,6 @@ func (c *Controller) Reconcile(key lifted.QueueKey) error { }) } -// ClusterWideKeyFunc generates a ClusterWideKey for object. -func ClusterWideKeyFunc(obj interface{}) (lifted.QueueKey, error) { - return keys.ClusterWideKeyFunc(obj) -} - // OnAdd handles object add event and push the object to queue. func (c *Controller) OnAdd(obj interface{}) { runtimeObj, ok := obj.(runtime.Object)