Skip to content

Commit

Permalink
add etcd watcher for blockaffinity
Browse files Browse the repository at this point in the history
Signed-off-by: duanmengkk <[email protected]>
  • Loading branch information
duanmengkk committed Jan 25, 2025
1 parent 9d41c32 commit b4d2d9f
Show file tree
Hide file tree
Showing 7 changed files with 445 additions and 334 deletions.
85 changes: 56 additions & 29 deletions pkg/clusterlink/controllers/cluster/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"

"github.com/projectcalico/calico/libcalico-go/lib/apiconfig"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/etcdv3"
"github.com/projectcalico/calico/libcalico-go/lib/clientv3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -46,68 +48,93 @@ func CheckIsEtcd(cluster *clusterlinkv1alpha1.Cluster) bool {
}

func GetCalicoClient(cluster *clusterlinkv1alpha1.Cluster) (clientv3.Interface, error) {
var calicoConfig CalicoConfig
config, err := ctrl.GetConfig()
calicoConfig, err := getCalicoAPIConfig(cluster)
if err != nil {
klog.Errorf("failed to get k8s config: %v", err)
klog.Errorf("Error getting calicAPIConfig: %v", err)
return nil, err
}

clientSet, err := kubernetes.NewForConfig(config)
calicoClient, err := clientv3.New(*calicoConfig)
if err != nil {
klog.Errorf("failed to build k8s kubeClient: %v", err)
klog.Errorf("failed to new kubeClient, cluster name is %s.", cluster.Name)
return nil, err
}

clusterConfigMap, err := clientSet.CoreV1().ConfigMaps(utils.DefaultNamespace).Get(context.Background(), cluster.Name, metav1.GetOptions{})
return calicoClient, nil
}

func GetETCDClient(cluster *clusterlinkv1alpha1.Cluster) (api.Client, error) {
calicoConfig, err := getCalicoAPIConfig(cluster)
if err != nil {
klog.Errorf("failed to get cluster configmap, cluster name is %s.", cluster.Name)
klog.Errorf("Error getting calicAPIConfig: %v", err)
return nil, err
}

calicoAPIConfig := apiconfig.NewCalicoAPIConfig()
calicoData := clusterConfigMap.Data
calicoJSONStr, err := json.Marshal(calicoData)
etcdV3Client, err := etcdv3.NewEtcdV3Client(&calicoConfig.Spec.EtcdConfig)
if err != nil {
klog.Errorf("failed to marshal cluster configmap %s to json string.", cluster.Name)
klog.Errorf("failed to new etcdClient, cluster name is %s.", cluster.Name)
return nil, err
}
err = json.Unmarshal(calicoJSONStr, &calicoConfig)

return etcdV3Client, nil
}

func getCalicoAPIConfig(cluster *clusterlinkv1alpha1.Cluster) (*apiconfig.CalicoAPIConfig, error) {
config, err := ctrl.GetConfig()
if err != nil {
klog.Errorf("failed to unmarshal json string to calico config, cluster configmap is %s.", cluster.Name)
return nil, err
return nil, fmt.Errorf("failed to get k8s config: %v", err)
}

// Decoding etcd config.
decodeEtcdKey, err := base64.StdEncoding.DecodeString(calicoConfig.EtcdKey)
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to build k8s kubeClient: %v", err)
}

clusterConfigMap, err := clientSet.CoreV1().ConfigMaps(utils.DefaultNamespace).Get(context.Background(), cluster.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get cluster ConfigMap for cluster %s: %v", cluster.Name, err)
}

var calicoConfig CalicoConfig
calicoData := clusterConfigMap.Data
calicoJSONStr, err := json.Marshal(calicoData)
if err != nil {
return nil, fmt.Errorf("failed to marshal ConfigMap data for cluster %s to JSON: %v", cluster.Name, err)
}

if err := json.Unmarshal(calicoJSONStr, &calicoConfig); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON to CalicoConfig for cluster %s: %v", cluster.Name, err)
}

etcdKey, err := decodeBase64(calicoConfig.EtcdKey, "etcd key", cluster.Name)
if err != nil {
klog.Errorf("decoding exception, etcd key is invalid, cluster name is %s.", cluster.Name)
return nil, err
}
decodeEtcdCert, err := base64.StdEncoding.DecodeString(calicoConfig.EtcdCert)
etcdCert, err := decodeBase64(calicoConfig.EtcdCert, "etcd cert", cluster.Name)
if err != nil {
klog.Errorf("decoding exception, etcd cert is invalid, cluster name is %s.", cluster.Name)
return nil, err
}
decodeEtcdCACert, err := base64.StdEncoding.DecodeString(calicoConfig.EtcdCACert)
etcdCACert, err := decodeBase64(calicoConfig.EtcdCACert, "etcd CA cert", cluster.Name)
if err != nil {
klog.Errorf("decoding exception, etcd ca.cert is invalid, cluster name is %s.", cluster.Name)
return nil, err
}

calicoAPIConfig := apiconfig.NewCalicoAPIConfig()
calicoAPIConfig.Spec.DatastoreType = apiconfig.DatastoreType(calicoConfig.DatastoreType)
calicoAPIConfig.Spec.EtcdEndpoints = calicoConfig.EtcdEndpoints
calicoAPIConfig.Spec.EtcdKey = string(decodeEtcdKey)
calicoAPIConfig.Spec.EtcdCert = string(decodeEtcdCert)
calicoAPIConfig.Spec.EtcdCACert = string(decodeEtcdCACert)
calicoAPIConfig.Spec.EtcdKey = etcdKey
calicoAPIConfig.Spec.EtcdCert = etcdCert
calicoAPIConfig.Spec.EtcdCACert = etcdCACert

return calicoAPIConfig, nil
}

calicoClient, err := clientv3.New(*calicoAPIConfig)
func decodeBase64(encoded, fieldName, clusterName string) (string, error) {
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
klog.Errorf("failed to new kubeClient, cluster name is %s.", cluster.Name)
return nil, err
return "", fmt.Errorf("failed to decode %s for cluster %s: %v", fieldName, clusterName, err)
}

return calicoClient, nil
return string(decoded), nil
}

func ResolveServiceCIDRs(pod *corev1.Pod) ([]string, error) {
Expand Down
154 changes: 154 additions & 0 deletions pkg/clusterlink/controllers/nodecidr/adaper/calico.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package adaper

import (
"strings"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
)

type CalicoAdapter struct {
sync bool
config *rest.Config
blockLister cache.GenericLister
clusterNodeLister clusterlister.ClusterNodeLister
processor lifted.AsyncWorker
}

// nolint:revive
func NewCalicoAdapter(config *rest.Config,
clusterNodeLister clusterlister.ClusterNodeLister,
processor lifted.AsyncWorker) *CalicoAdapter {
return &CalicoAdapter{
config: config,
clusterNodeLister: clusterNodeLister,
processor: processor,
}
}

func (c *CalicoAdapter) Start(stopCh <-chan struct{}) error {
client, err := dynamic.NewForConfig(c.config)
if err != nil {
klog.Errorf("init dynamic client err: %v", err)
return err
}
gvr := schema.GroupVersionResource{
Group: "crd.projectcalico.org",
Version: "v1",
Resource: "blockaffinities",
}
informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)
_, err = informerFactory.ForResource(gvr).Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.OnAdd,
UpdateFunc: c.OnUpdate,
DeleteFunc: c.OnDelete,
})
if err != nil {
klog.Errorf("add event handler error: %v", err)
return err
}

c.blockLister = informerFactory.ForResource(gvr).Lister()
informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh)

c.sync = true
klog.Info("calico blockaffinities informer started!")
return nil
}

func (c *CalicoAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) {
blockAffinities, err := c.blockLister.List(labels.Everything())
if err != nil {
klog.Errorf("list blockAffinities error: %v", err)
return nil, err
}
var podCIDRS []string
for _, ba := range blockAffinities {
uobj := ba.(*unstructured.Unstructured)
node, found, err := unstructured.NestedString(uobj.Object, "spec", "node")
if err != nil {
klog.Errorf("get spec.node from blockAffinity err: ", err)
}
if !found {
continue
}
cidr, found, err := unstructured.NestedString(uobj.Object, "spec", "cidr")
if err != nil {
klog.Errorf("get spec.cidr from blockAffinity err: ", err)
}
if !found {
continue
}
if strings.Compare(node, nodeName) == 0 {
podCIDRS = append(podCIDRS, cidr)
}
}

return podCIDRS, nil
}

func (c *CalicoAdapter) Synced() bool {
return c.sync
}

func (c *CalicoAdapter) OnAdd(obj interface{}) {
klog.V(7).Info("add event")
runtimeObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return
}
node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node")
if err != nil {
klog.Errorf("get spec.node from blockAffinity err: ", err)
}
if !found {
return
}
klog.V(7).Info("add event Enqueue")
requeue(node, c.clusterNodeLister, c.processor)
}

// OnUpdate handles object update event and push the object to queue.
func (c *CalicoAdapter) OnUpdate(_, newObj interface{}) {
klog.V(7).Info("update event")
runtimeObj, ok := newObj.(*unstructured.Unstructured)
if !ok {
return
}
node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node")
if err != nil {
klog.Errorf("get spec.node from blockAffinity err: ", err)
}
if !found {
return
}
klog.V(7).Info("update event Enqueue")
requeue(node, c.clusterNodeLister, c.processor)
}

// OnDelete handles object delete event and push the object to queue.
func (c *CalicoAdapter) OnDelete(obj interface{}) {
runtimeObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return
}
node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node")
if err != nil {
klog.Errorf("get spec.node from blockAffinity err: ", err)
}
if !found {
return
}
requeue(node, c.clusterNodeLister, c.processor)
}
55 changes: 55 additions & 0 deletions pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package adaper

import (
clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
)

type CalicoETCDAdapter struct {
sync bool
etcdClient api.Client
clusterNodeLister clusterlister.ClusterNodeLister
processor lifted.AsyncWorker
}

// nolint:revive
func NewCalicoETCDAdapter(etcdClient api.Client,
clusterNodeLister clusterlister.ClusterNodeLister,
processor lifted.AsyncWorker) *CalicoETCDAdapter {
return &CalicoETCDAdapter{
etcdClient: etcdClient,
clusterNodeLister: clusterNodeLister,
processor: processor,
}
}

func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error {
// todo use c.etcdClient to list and watch blockaffinity in etcd
return nil
}

func (c *CalicoETCDAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) {
// see calicoctl/calicoctl/commands/datastore/migrate/migrateipam.go
// and libcalico-go/lib/backend/model/block_affinity.go
// todo use c.etcdClient to get blockaffinity in etcd
return nil, nil
}

func (c *CalicoETCDAdapter) Synced() bool {
return c.sync
}

func (c *CalicoETCDAdapter) OnAdd(obj interface{}) {
// todo add event info to c.processor
}

// OnUpdate handles object update event and push the object to queue.
func (c *CalicoETCDAdapter) OnUpdate(_, newObj interface{}) {
// todo add event info to c.processor
}

// OnDelete handles object delete event and push the object to queue.
func (c *CalicoETCDAdapter) OnDelete(obj interface{}) {
// todo add event info to c.processor
}
Loading

0 comments on commit b4d2d9f

Please sign in to comment.