Skip to content

Commit

Permalink
Merge pull request kosmos-io#793 from lyzuiui/dev-globalnode-mod
Browse files Browse the repository at this point in the history
Fix for the issue where the GlobalNode is in a NotReady state at certain times.
  • Loading branch information
duanmengkk authored Dec 31, 2024
2 parents 97c791a + 0f790a8 commit fb886fb
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package globalnodecontroller

import (
"context"
"reflect"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -55,6 +56,31 @@ func compareMaps(map1, map2 map[string]string) bool {
return true
}

// CustomPredicateForGlobalNode is used for event filtering of the GlobalNode resource.
var CustomPredicateForGlobalNode = predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
return true
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
oldObj, okOld := updateEvent.ObjectOld.(*v1alpha1.GlobalNode)
newObj, okNew := updateEvent.ObjectNew.(*v1alpha1.GlobalNode)

if !okOld || !okNew {
return true
}

specChanged := !reflect.DeepEqual(oldObj.Spec, newObj.Spec)

return specChanged
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
GenericFunc: func(e event.GenericEvent) bool {
return true
},
}

func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error {
if r.Client == nil {
r.Client = mgr.GetClient()
Expand All @@ -70,6 +96,7 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error {
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
oldObj := updateEvent.ObjectOld.(*v1.Node)
newObj := updateEvent.ObjectNew.(*v1.Node)

return !compareMaps(oldObj.Labels, newObj.Labels)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
Expand All @@ -81,12 +108,13 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error {
})).
Watches(&source.Kind{Type: &v1alpha1.GlobalNode{}}, handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request {
gn := a.(*v1alpha1.GlobalNode)

return []reconcile.Request{
{NamespacedName: types.NamespacedName{
Name: gn.Name,
}},
}
})).
}), builder.WithPredicates(CustomPredicateForGlobalNode)).
// Watches(&source.Kind{Type: &v1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.newNodeMapFunc())).
Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(r.newVirtualClusterMapFunc())).
Complete(r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package globalnodecontroller

import (
"context"
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -18,15 +20,24 @@ import (

const (
GlobalNodeStatusControllerName = "global-node-status-controller"
NodeNotReady = v1.NodeConditionType("NotReady")
NodeReady = v1.NodeReady
DefaultStatusUpdateInterval = 15 * time.Second
ClientHeartbeatThreshold = 10 * time.Second
nodeUpdateWorkerSize = 8
RequiredNotReadyCount = 5
)

type nodeHealthData struct {
notReadyCount int
}

type GlobalNodeStatusController struct {
root client.Client
statusInterval time.Duration

kosmosClient versioned.Interface
kosmosClient versioned.Interface
nodeHealthMap sync.Map // map[string]*nodeHealthData
}

func NewGlobalNodeStatusController(
Expand All @@ -37,6 +48,7 @@ func NewGlobalNodeStatusController(
root: root,
statusInterval: DefaultStatusUpdateInterval,
kosmosClient: kosmosClient,
nodeHealthMap: sync.Map{},
}
}
func (c *GlobalNodeStatusController) Start(ctx context.Context) error {
Expand All @@ -47,8 +59,6 @@ func (c *GlobalNodeStatusController) Start(ctx context.Context) error {
}
func (c *GlobalNodeStatusController) syncGlobalNodeStatus(ctx context.Context) {
globalNodes := make([]*v1alpha1.GlobalNode, 0)
//c.globalNodeLock.Lock()
//defer c.globalNodeLock.Unlock()

nodeList, err := c.kosmosClient.KosmosV1alpha1().GlobalNodes().List(ctx, metav1.ListOptions{})
if err != nil {
Expand All @@ -70,14 +80,23 @@ func (c *GlobalNodeStatusController) updateGlobalNodeStatus(
ctx context.Context,
globalNodes []*v1alpha1.GlobalNode,
) error {
for _, globalNode := range globalNodes {
err := c.updateStatusForGlobalNode(ctx, globalNode)
if err != nil {
klog.Errorf("Failed to update status for global node %s: %v", globalNode.Name, err)
return err
errChan := make(chan error, len(globalNodes))

workqueue.ParallelizeUntil(ctx, nodeUpdateWorkerSize, len(globalNodes), func(piece int) {
node := globalNodes[piece]
if err := c.updateStatusForGlobalNode(ctx, node); err != nil {
klog.Errorf("Failed to update status for global node %s: %v", node.Name, err)
errChan <- err
}
})

close(errChan)

var retErr error
for err := range errChan {
retErr = err
}
return nil
return retErr
}

func (c *GlobalNodeStatusController) updateStatusForGlobalNode(
Expand All @@ -100,13 +119,30 @@ func (c *GlobalNodeStatusController) updateStatusForGlobalNode(
lastHeartbeatTime := condition.LastHeartbeatTime
timeDiff := time.Since(lastHeartbeatTime.Time)

statusType := "Ready"
statusType := NodeReady
if timeDiff > ClientHeartbeatThreshold {
statusType = "NotReady"
statusType = NodeNotReady
}

if string(condition.Type) != statusType {
condition.Type = v1.NodeConditionType(statusType)
dataRaw, _ := c.nodeHealthMap.LoadOrStore(globalNode.Name, &nodeHealthData{})
nh := dataRaw.(*nodeHealthData)

if statusType == NodeNotReady {
nh.notReadyCount++
if condition.Type == NodeReady {
klog.V(2).Infof("GlobalNode %s: notReadyCount=%d, newStatus=%s", globalNode.Name, nh.notReadyCount, statusType)
}
} else {
nh.notReadyCount = 0
}

if nh.notReadyCount > 0 && nh.notReadyCount < RequiredNotReadyCount {
c.nodeHealthMap.Store(globalNode.Name, nh)
return nil
}

if condition.Type != statusType {
condition.Type = statusType
condition.LastTransitionTime = metav1.NewTime(time.Now())

currentNode.Status.Conditions[0] = condition
Expand All @@ -122,10 +158,9 @@ func (c *GlobalNodeStatusController) updateStatusForGlobalNode(
}

klog.Infof("Successfully updated status for GlobalNode %s to %s", globalNode.Name, statusType)
} else {
klog.Infof("No status update required for GlobalNode %s, current status: %s", globalNode.Name, condition.Type)
nh.notReadyCount = 0
c.nodeHealthMap.Store(globalNode.Name, nh)
}

return nil
})
}

0 comments on commit fb886fb

Please sign in to comment.