Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] main from kosmos-io:main #27

Merged
merged 4 commits into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 0 additions & 88 deletions go.sum

Large diffs are not rendered by default.

85 changes: 56 additions & 29 deletions pkg/clusterlink/controllers/cluster/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"

"github.com/projectcalico/calico/libcalico-go/lib/apiconfig"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/etcdv3"
"github.com/projectcalico/calico/libcalico-go/lib/clientv3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -46,68 +48,93 @@ func CheckIsEtcd(cluster *clusterlinkv1alpha1.Cluster) bool {
}

func GetCalicoClient(cluster *clusterlinkv1alpha1.Cluster) (clientv3.Interface, error) {
var calicoConfig CalicoConfig
config, err := ctrl.GetConfig()
calicoConfig, err := getCalicoAPIConfig(cluster)
if err != nil {
klog.Errorf("failed to get k8s config: %v", err)
klog.Errorf("Error getting calicAPIConfig: %v", err)
return nil, err
}

clientSet, err := kubernetes.NewForConfig(config)
calicoClient, err := clientv3.New(*calicoConfig)
if err != nil {
klog.Errorf("failed to build k8s kubeClient: %v", err)
klog.Errorf("failed to new kubeClient, cluster name is %s.", cluster.Name)
return nil, err
}

clusterConfigMap, err := clientSet.CoreV1().ConfigMaps(utils.DefaultNamespace).Get(context.Background(), cluster.Name, metav1.GetOptions{})
return calicoClient, nil
}

func GetETCDClient(cluster *clusterlinkv1alpha1.Cluster) (api.Client, error) {
calicoConfig, err := getCalicoAPIConfig(cluster)
if err != nil {
klog.Errorf("failed to get cluster configmap, cluster name is %s.", cluster.Name)
klog.Errorf("Error getting calicAPIConfig: %v", err)
return nil, err
}

calicoAPIConfig := apiconfig.NewCalicoAPIConfig()
calicoData := clusterConfigMap.Data
calicoJSONStr, err := json.Marshal(calicoData)
etcdV3Client, err := etcdv3.NewEtcdV3Client(&calicoConfig.Spec.EtcdConfig)
if err != nil {
klog.Errorf("failed to marshal cluster configmap %s to json string.", cluster.Name)
klog.Errorf("failed to new etcdClient, cluster name is %s.", cluster.Name)
return nil, err
}
err = json.Unmarshal(calicoJSONStr, &calicoConfig)

return etcdV3Client, nil
}

func getCalicoAPIConfig(cluster *clusterlinkv1alpha1.Cluster) (*apiconfig.CalicoAPIConfig, error) {
config, err := ctrl.GetConfig()
if err != nil {
klog.Errorf("failed to unmarshal json string to calico config, cluster configmap is %s.", cluster.Name)
return nil, err
return nil, fmt.Errorf("failed to get k8s config: %v", err)
}

// Decoding etcd config.
decodeEtcdKey, err := base64.StdEncoding.DecodeString(calicoConfig.EtcdKey)
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to build k8s kubeClient: %v", err)
}

clusterConfigMap, err := clientSet.CoreV1().ConfigMaps(utils.DefaultNamespace).Get(context.Background(), cluster.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get cluster ConfigMap for cluster %s: %v", cluster.Name, err)
}

var calicoConfig CalicoConfig
calicoData := clusterConfigMap.Data
calicoJSONStr, err := json.Marshal(calicoData)
if err != nil {
return nil, fmt.Errorf("failed to marshal ConfigMap data for cluster %s to JSON: %v", cluster.Name, err)
}

if err := json.Unmarshal(calicoJSONStr, &calicoConfig); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON to CalicoConfig for cluster %s: %v", cluster.Name, err)
}

etcdKey, err := decodeBase64(calicoConfig.EtcdKey, "etcd key", cluster.Name)
if err != nil {
klog.Errorf("decoding exception, etcd key is invalid, cluster name is %s.", cluster.Name)
return nil, err
}
decodeEtcdCert, err := base64.StdEncoding.DecodeString(calicoConfig.EtcdCert)
etcdCert, err := decodeBase64(calicoConfig.EtcdCert, "etcd cert", cluster.Name)
if err != nil {
klog.Errorf("decoding exception, etcd cert is invalid, cluster name is %s.", cluster.Name)
return nil, err
}
decodeEtcdCACert, err := base64.StdEncoding.DecodeString(calicoConfig.EtcdCACert)
etcdCACert, err := decodeBase64(calicoConfig.EtcdCACert, "etcd CA cert", cluster.Name)
if err != nil {
klog.Errorf("decoding exception, etcd ca.cert is invalid, cluster name is %s.", cluster.Name)
return nil, err
}

calicoAPIConfig := apiconfig.NewCalicoAPIConfig()
calicoAPIConfig.Spec.DatastoreType = apiconfig.DatastoreType(calicoConfig.DatastoreType)
calicoAPIConfig.Spec.EtcdEndpoints = calicoConfig.EtcdEndpoints
calicoAPIConfig.Spec.EtcdKey = string(decodeEtcdKey)
calicoAPIConfig.Spec.EtcdCert = string(decodeEtcdCert)
calicoAPIConfig.Spec.EtcdCACert = string(decodeEtcdCACert)
calicoAPIConfig.Spec.EtcdKey = etcdKey
calicoAPIConfig.Spec.EtcdCert = etcdCert
calicoAPIConfig.Spec.EtcdCACert = etcdCACert

return calicoAPIConfig, nil
}

calicoClient, err := clientv3.New(*calicoAPIConfig)
func decodeBase64(encoded, fieldName, clusterName string) (string, error) {
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
klog.Errorf("failed to new kubeClient, cluster name is %s.", cluster.Name)
return nil, err
return "", fmt.Errorf("failed to decode %s for cluster %s: %v", fieldName, clusterName, err)
}

return calicoClient, nil
return string(decoded), nil
}

func ResolveServiceCIDRs(pod *corev1.Pod) ([]string, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package blockwatchsyncer

import (
apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
"github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer"
)

// NewBlockWatchSyncer creates a new BlockAffinity v1 Syncer.
func NewBlockWatchSyncer(client api.Client, callbacks api.SyncerCallbacks) api.Syncer {
resourceTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindBlockAffinity},
},
}

return watchersyncer.New(client, resourceTypes, callbacks)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package blockwatchsyncer

import (
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"time"
)

// syncedPollPeriod controls how often you look at the status of your sync funcs
var syncedPollPeriod = 100 * time.Millisecond

type BlockEventHandler struct {
// Channel for getting updates and status updates from syncer.
syncerC chan interface{}

processor lifted.AsyncWorker
// Channel to indicate node status reporter routine is not needed anymore.
done chan struct{}

// Flag to show we are in-sync.
inSync bool
}

func NewBlockEventHandler(processor lifted.AsyncWorker) *BlockEventHandler {
return &BlockEventHandler{
processor: processor,
}
}

func (b *BlockEventHandler) Run(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
return
case <-b.done:
return
case event := <-b.syncerC:
switch event := event.(type) {
case []api.Update:
b.onupdate(event)
case api.SyncStatus:
b.inSync = true
}
}
}
}

func (b *BlockEventHandler) Stop() {
b.done <- struct{}{}
}

func (b *BlockEventHandler) Done() <-chan struct{} {
return b.done
}

func (b *BlockEventHandler) InSync() bool {
return b.inSync
}

func (b *BlockEventHandler) OnStatusUpdated(status api.SyncStatus) {
if status == api.InSync {
b.syncerC <- status
}
}

func (b *BlockEventHandler) OnUpdates(updates []api.Update) {
b.syncerC <- updates
}

// todo put etcd's event info AsyncWorker's queue
func (b *BlockEventHandler) onupdate(event []api.Update) {

}

func (b *BlockEventHandler) WaitForCacheSync(stopCh <-chan struct{}) bool {
err := wait.PollImmediateUntil(syncedPollPeriod, func() (done bool, err error) {
if b.inSync {
return true, nil
}
return false, nil
}, stopCh)

if err != nil {
klog.V(2).Infof("stop requested")
return false
}

klog.V(4).Infof("caches populated")
return true
}
154 changes: 154 additions & 0 deletions pkg/clusterlink/controllers/nodecidr/adaper/calico.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package adaper

import (
"strings"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
)

type CalicoAdapter struct {
sync bool
config *rest.Config
blockLister cache.GenericLister
clusterNodeLister clusterlister.ClusterNodeLister
processor lifted.AsyncWorker
}

// nolint:revive
func NewCalicoAdapter(config *rest.Config,
clusterNodeLister clusterlister.ClusterNodeLister,
processor lifted.AsyncWorker) *CalicoAdapter {
return &CalicoAdapter{
config: config,
clusterNodeLister: clusterNodeLister,
processor: processor,
}
}

func (c *CalicoAdapter) Start(stopCh <-chan struct{}) error {
client, err := dynamic.NewForConfig(c.config)
if err != nil {
klog.Errorf("init dynamic client err: %v", err)
return err
}
gvr := schema.GroupVersionResource{
Group: "crd.projectcalico.org",
Version: "v1",
Resource: "blockaffinities",
}
informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)
_, err = informerFactory.ForResource(gvr).Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.OnAdd,
UpdateFunc: c.OnUpdate,
DeleteFunc: c.OnDelete,
})
if err != nil {
klog.Errorf("add event handler error: %v", err)
return err
}

c.blockLister = informerFactory.ForResource(gvr).Lister()
informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh)

c.sync = true
klog.Info("calico blockaffinities informer started!")
return nil
}

func (c *CalicoAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) {
blockAffinities, err := c.blockLister.List(labels.Everything())
if err != nil {
klog.Errorf("list blockAffinities error: %v", err)
return nil, err
}
var podCIDRS []string
for _, ba := range blockAffinities {
uobj := ba.(*unstructured.Unstructured)
node, found, err := unstructured.NestedString(uobj.Object, "spec", "node")
if err != nil {
klog.Errorf("get spec.node from blockAffinity err: ", err)
}
if !found {
continue
}
cidr, found, err := unstructured.NestedString(uobj.Object, "spec", "cidr")
if err != nil {
klog.Errorf("get spec.cidr from blockAffinity err: ", err)
}
if !found {
continue
}
if strings.Compare(node, nodeName) == 0 {
podCIDRS = append(podCIDRS, cidr)
}
}

return podCIDRS, nil
}

func (c *CalicoAdapter) Synced() bool {
return c.sync
}

func (c *CalicoAdapter) OnAdd(obj interface{}) {
klog.V(7).Info("add event")
runtimeObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return
}
node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node")
if err != nil {
klog.Errorf("get spec.node from blockAffinity err: ", err)
}
if !found {
return
}
klog.V(7).Info("add event Enqueue")
requeue(node, c.clusterNodeLister, c.processor)
}

// OnUpdate handles object update event and push the object to queue.
func (c *CalicoAdapter) OnUpdate(_, newObj interface{}) {
klog.V(7).Info("update event")
runtimeObj, ok := newObj.(*unstructured.Unstructured)
if !ok {
return
}
node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node")
if err != nil {
klog.Errorf("get spec.node from blockAffinity err: ", err)
}
if !found {
return
}
klog.V(7).Info("update event Enqueue")
requeue(node, c.clusterNodeLister, c.processor)
}

// OnDelete handles object delete event and push the object to queue.
func (c *CalicoAdapter) OnDelete(obj interface{}) {
runtimeObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return
}
node, found, err := unstructured.NestedString(runtimeObj.Object, "spec", "node")
if err != nil {
klog.Errorf("get spec.node from blockAffinity err: ", err)
}
if !found {
return
}
requeue(node, c.clusterNodeLister, c.processor)
}
Loading
Loading