diff --git a/pkg/kubenest/controller/global.node.controller/global_node_controller.go b/pkg/kubenest/controller/global.node.controller/global_node_controller.go index a9dbacdb7..8ce8a6976 100644 --- a/pkg/kubenest/controller/global.node.controller/global_node_controller.go +++ b/pkg/kubenest/controller/global.node.controller/global_node_controller.go @@ -2,6 +2,7 @@ package globalnodecontroller import ( "context" + "reflect" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -55,6 +56,31 @@ func compareMaps(map1, map2 map[string]string) bool { return true } +// CustomPredicateForGlobalNode is used for event filtering of the GlobalNode resource. +var CustomPredicateForGlobalNode = predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + return true + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + oldObj, okOld := updateEvent.ObjectOld.(*v1alpha1.GlobalNode) + newObj, okNew := updateEvent.ObjectNew.(*v1alpha1.GlobalNode) + + if !okOld || !okNew { + return true + } + + specChanged := !reflect.DeepEqual(oldObj.Spec, newObj.Spec) + + return specChanged + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return true + }, + GenericFunc: func(e event.GenericEvent) bool { + return true + }, +} + func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error { if r.Client == nil { r.Client = mgr.GetClient() @@ -70,6 +96,7 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error { UpdateFunc: func(updateEvent event.UpdateEvent) bool { oldObj := updateEvent.ObjectOld.(*v1.Node) newObj := updateEvent.ObjectNew.(*v1.Node) + return !compareMaps(oldObj.Labels, newObj.Labels) }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { @@ -81,12 +108,13 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error { })). Watches(&source.Kind{Type: &v1alpha1.GlobalNode{}}, handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request { gn := a.(*v1alpha1.GlobalNode) + return []reconcile.Request{ {NamespacedName: types.NamespacedName{ Name: gn.Name, }}, } - })). + }), builder.WithPredicates(CustomPredicateForGlobalNode)). // Watches(&source.Kind{Type: &v1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.newNodeMapFunc())). Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(r.newVirtualClusterMapFunc())). Complete(r) diff --git a/pkg/kubenest/controller/global.node.controller/global_node_lifecycle_controller.go b/pkg/kubenest/controller/global.node.controller/global_node_lifecycle_controller.go index 09851b33a..e2c842213 100644 --- a/pkg/kubenest/controller/global.node.controller/global_node_lifecycle_controller.go +++ b/pkg/kubenest/controller/global.node.controller/global_node_lifecycle_controller.go @@ -2,6 +2,7 @@ package globalnodecontroller import ( "context" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -9,6 +10,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -18,15 +20,24 @@ import ( const ( GlobalNodeStatusControllerName = "global-node-status-controller" + NodeNotReady = v1.NodeConditionType("NotReady") + NodeReady = v1.NodeReady DefaultStatusUpdateInterval = 15 * time.Second ClientHeartbeatThreshold = 10 * time.Second + nodeUpdateWorkerSize = 8 + RequiredNotReadyCount = 5 ) +type nodeHealthData struct { + notReadyCount int +} + type GlobalNodeStatusController struct { root client.Client statusInterval time.Duration - kosmosClient versioned.Interface + kosmosClient versioned.Interface + nodeHealthMap sync.Map // map[string]*nodeHealthData } func NewGlobalNodeStatusController( @@ -37,6 +48,7 @@ func NewGlobalNodeStatusController( root: root, statusInterval: DefaultStatusUpdateInterval, kosmosClient: kosmosClient, + nodeHealthMap: sync.Map{}, } } func (c *GlobalNodeStatusController) Start(ctx context.Context) error { @@ -47,8 +59,6 @@ func (c *GlobalNodeStatusController) Start(ctx context.Context) error { } func (c *GlobalNodeStatusController) syncGlobalNodeStatus(ctx context.Context) { globalNodes := make([]*v1alpha1.GlobalNode, 0) - //c.globalNodeLock.Lock() - //defer c.globalNodeLock.Unlock() nodeList, err := c.kosmosClient.KosmosV1alpha1().GlobalNodes().List(ctx, metav1.ListOptions{}) if err != nil { @@ -70,14 +80,23 @@ func (c *GlobalNodeStatusController) updateGlobalNodeStatus( ctx context.Context, globalNodes []*v1alpha1.GlobalNode, ) error { - for _, globalNode := range globalNodes { - err := c.updateStatusForGlobalNode(ctx, globalNode) - if err != nil { - klog.Errorf("Failed to update status for global node %s: %v", globalNode.Name, err) - return err + errChan := make(chan error, len(globalNodes)) + + workqueue.ParallelizeUntil(ctx, nodeUpdateWorkerSize, len(globalNodes), func(piece int) { + node := globalNodes[piece] + if err := c.updateStatusForGlobalNode(ctx, node); err != nil { + klog.Errorf("Failed to update status for global node %s: %v", node.Name, err) + errChan <- err } + }) + + close(errChan) + + var retErr error + for err := range errChan { + retErr = err } - return nil + return retErr } func (c *GlobalNodeStatusController) updateStatusForGlobalNode( @@ -100,13 +119,30 @@ func (c *GlobalNodeStatusController) updateStatusForGlobalNode( lastHeartbeatTime := condition.LastHeartbeatTime timeDiff := time.Since(lastHeartbeatTime.Time) - statusType := "Ready" + statusType := NodeReady if timeDiff > ClientHeartbeatThreshold { - statusType = "NotReady" + statusType = NodeNotReady } - if string(condition.Type) != statusType { - condition.Type = v1.NodeConditionType(statusType) + dataRaw, _ := c.nodeHealthMap.LoadOrStore(globalNode.Name, &nodeHealthData{}) + nh := dataRaw.(*nodeHealthData) + + if statusType == NodeNotReady { + nh.notReadyCount++ + if condition.Type == NodeReady { + klog.V(2).Infof("GlobalNode %s: notReadyCount=%d, newStatus=%s", globalNode.Name, nh.notReadyCount, statusType) + } + } else { + nh.notReadyCount = 0 + } + + if nh.notReadyCount > 0 && nh.notReadyCount < RequiredNotReadyCount { + c.nodeHealthMap.Store(globalNode.Name, nh) + return nil + } + + if condition.Type != statusType { + condition.Type = statusType condition.LastTransitionTime = metav1.NewTime(time.Now()) currentNode.Status.Conditions[0] = condition @@ -122,10 +158,9 @@ func (c *GlobalNodeStatusController) updateStatusForGlobalNode( } klog.Infof("Successfully updated status for GlobalNode %s to %s", globalNode.Name, statusType) - } else { - klog.Infof("No status update required for GlobalNode %s, current status: %s", globalNode.Name, condition.Type) + nh.notReadyCount = 0 + c.nodeHealthMap.Store(globalNode.Name, nh) } - return nil }) }