Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add etcd watcher for blockaffinity #824

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 0 additions & 88 deletions go.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package blockwatchsyncer

import (
apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
"github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer"
)

// NewBlockWatchSyncer creates a new BlockAffinity v1 Syncer.
func NewBlockWatchSyncer(client api.Client, callbacks api.SyncerCallbacks) api.Syncer {
resourceTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindBlockAffinity},
},
}

return watchersyncer.New(client, resourceTypes, callbacks)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package blockwatchsyncer

import (
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"time"
)

// syncedPollPeriod controls how often you look at the status of your sync funcs
var syncedPollPeriod = 100 * time.Millisecond

type BlockEventHandler struct {
// Channel for getting updates and status updates from syncer.
syncerC chan interface{}

processor lifted.AsyncWorker
// Channel to indicate node status reporter routine is not needed anymore.
done chan struct{}

// Flag to show we are in-sync.
inSync bool
}

func NewBlockEventHandler(processor lifted.AsyncWorker) *BlockEventHandler {
return &BlockEventHandler{
processor: processor,
}
}

func (b *BlockEventHandler) Run(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
return
case <-b.done:
return
case event := <-b.syncerC:
switch event := event.(type) {
case []api.Update:
b.onupdate(event)
case api.SyncStatus:
b.inSync = true
}
}
}
}

func (b *BlockEventHandler) Stop() {
b.done <- struct{}{}
}

func (b *BlockEventHandler) Done() <-chan struct{} {
return b.done
}

func (b *BlockEventHandler) InSync() bool {
return b.inSync
}

func (b *BlockEventHandler) OnStatusUpdated(status api.SyncStatus) {
if status == api.InSync {
b.syncerC <- status
}
}

func (b *BlockEventHandler) OnUpdates(updates []api.Update) {
b.syncerC <- updates
}

// todo put etcd's event info AsyncWorker's queue
func (b *BlockEventHandler) onupdate(event []api.Update) {

}

func (b *BlockEventHandler) WaitForCacheSync(stopCh <-chan struct{}) bool {
err := wait.PollImmediateUntil(syncedPollPeriod, func() (done bool, err error) {
if b.inSync {
return true, nil
}
return false, nil
}, stopCh)

if err != nil {
klog.V(2).Infof("stop requested")
return false
}

klog.V(4).Infof("caches populated")
return true
}
26 changes: 11 additions & 15 deletions pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package adaper

import (
"github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer"
clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"k8s.io/klog/v2"
)

type CalicoETCDAdapter struct {
sync bool
watchSyncer api.Syncer
etcdClient api.Client
clusterNodeLister clusterlister.ClusterNodeLister
processor lifted.AsyncWorker
Expand All @@ -25,7 +28,14 @@ func NewCalicoETCDAdapter(etcdClient api.Client,
}

func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error {
// todo use c.etcdClient to list and watch blockaffinity in etcd
blockEventHandler := blockwatchsyncer.NewBlockEventHandler(c.processor)
c.watchSyncer = blockwatchsyncer.NewBlockWatchSyncer(c.etcdClient, blockEventHandler)
c.watchSyncer.Start()
blockEventHandler.Run(stopCh)

blockEventHandler.WaitForCacheSync(stopCh)
c.sync = true
klog.Info("calico blockaffinities etcd watchsyncer started!")
return nil
}

Expand All @@ -39,17 +49,3 @@ func (c *CalicoETCDAdapter) GetCIDRByNodeName(nodeName string) ([]string, error)
func (c *CalicoETCDAdapter) Synced() bool {
return c.sync
}

func (c *CalicoETCDAdapter) OnAdd(obj interface{}) {
// todo add event info to c.processor
}

// OnUpdate handles object update event and push the object to queue.
func (c *CalicoETCDAdapter) OnUpdate(_, newObj interface{}) {
// todo add event info to c.processor
}

// OnDelete handles object delete event and push the object to queue.
func (c *CalicoETCDAdapter) OnDelete(obj interface{}) {
// todo add event info to c.processor
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading