diff --git a/pkg/yurtmanager/controller/util/node/controller_utils.go b/pkg/yurtmanager/controller/util/node/controller_utils.go index 58408538ba4..7ecedf3cbeb 100644 --- a/pkg/yurtmanager/controller/util/node/controller_utils.go +++ b/pkg/yurtmanager/controller/util/node/controller_utils.go @@ -451,8 +451,8 @@ func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la // If the pod is bound to the node, it will return true; otherwise, it will return false. // The pod is bound to the node if the pod has the following annotations: // - apps.openyurt.io/binding: "true" -// - openyurt.beta.io/autonomy: "true" -// - openyurt.io/autonomy-duration: "duration" +// - node.beta.openyurt.io/autonomy: "true" +// - node.openyurt.io/autonomy-duration: "duration" func IsPodBoundenToNode(node *corev1.Node) bool { if node.Annotations == nil { return false diff --git a/pkg/yurtmanager/controller/yurtcoordinator/podbinding/pod_binding_controller.go b/pkg/yurtmanager/controller/yurtcoordinator/podbinding/pod_binding_controller.go index d5f117884cd..b6f58bc695b 100644 --- a/pkg/yurtmanager/controller/yurtcoordinator/podbinding/pod_binding_controller.go +++ b/pkg/yurtmanager/controller/yurtcoordinator/podbinding/pod_binding_controller.go @@ -19,16 +19,22 @@ package podbinding import ( "context" "fmt" + "reflect" + "strconv" "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -40,76 +46,147 @@ import ( ) const ( - defaultTolerationSeconds int64 = 300 + 
originalNotReadyTolerationDurationAnnotation = "apps.openyurt.io/original-not-ready-toleration-duration" + originalUnreachableTolerationDurationAnnotation = "apps.openyurt.io/original-unreachable-toleration-duration" ) var ( - controllerKind = appsv1.SchemeGroupVersion.WithKind("Node") - - notReadyToleration = corev1.Toleration{ - Key: corev1.TaintNodeNotReady, - Operator: corev1.TolerationOpExists, - Effect: corev1.TaintEffectNoExecute, - } - - unreachableToleration = corev1.Toleration{ - Key: corev1.TaintNodeUnreachable, - Operator: corev1.TolerationOpExists, - Effect: corev1.TaintEffectNoExecute, + controllerKind = appsv1.SchemeGroupVersion.WithKind("Node") + TolerationKeyToAnnotation = map[string]string{ + corev1.TaintNodeNotReady: originalNotReadyTolerationDurationAnnotation, + corev1.TaintNodeUnreachable: originalUnreachableTolerationDurationAnnotation, } ) -func Format(format string, args ...interface{}) string { - s := fmt.Sprintf(format, args...) - return fmt.Sprintf("%s: %s", names.PodBindingController, s) -} - type ReconcilePodBinding struct { client.Client } // Add creates a PodBingding controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller // and Start it when the Manager is Started. 
-func Add(ctx context.Context, c *appconfig.CompletedConfig, mgr manager.Manager) error {
-	klog.Info(Format("podbinding-controller add controller %s", controllerKind.String()))
-	return add(mgr, c, newReconciler(c, mgr))
-}
+func Add(ctx context.Context, cfg *appconfig.CompletedConfig, mgr manager.Manager) error {
+	klog.Infof("podbinding-controller add controller %s", controllerKind.String())
-// newReconciler returns a new reconcile.Reconciler
-func newReconciler(_ *appconfig.CompletedConfig, mgr manager.Manager) reconcile.Reconciler {
-	return &ReconcilePodBinding{
+	reconciler := &ReconcilePodBinding{
 		Client: yurtClient.GetClientByControllerNameOrDie(mgr, names.PodBindingController),
 	}
-}
-// add adds a new Controller to mgr with r as the reconcile.Reconciler
-func add(mgr manager.Manager, cfg *appconfig.CompletedConfig, r reconcile.Reconciler) error {
 	c, err := controller.New(names.PodBindingController, mgr, controller.Options{
-		Reconciler: r, MaxConcurrentReconciles: int(cfg.ComponentConfig.PodBindingController.ConcurrentPodBindingWorkers),
+		Reconciler: reconciler, MaxConcurrentReconciles: int(cfg.ComponentConfig.PodBindingController.ConcurrentPodBindingWorkers),
 	})
 	if err != nil {
 		return err
 	}
-	return c.Watch(source.Kind[client.Object](mgr.GetCache(), &corev1.Node{}, &handler.EnqueueRequestForObject{}))
-	//err = c.Watch(&source.Kind{Type: &corev1.Node{}}, &handler.EnqueueRequestForObject{})
-	//if err != nil {
-	// return err
-	//}
-	//
-	//klog.V(4).Info(Format("registering the field indexers of podbinding controller"))
-	// IndexField for spec.nodeName is registered in NodeLifeCycle, so we remove it here.
-	//err = mgr.GetFieldIndexer().IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(rawObj client.Object) []string {
-	// pod, ok := rawObj.(*corev1.Pod)
-	// if ok {
-	// return []string{pod.Spec.NodeName}
-	// }
-	// return []string{}
-	//})
-	//if err != nil {
-	// klog.Error(Format("could not register field indexers for podbinding controller, %v", err))
-	//}
-	//return err
+	// nodeHandler fans a node update out to the pods running on that node.
+	nodeHandler := handler.Funcs{
+		UpdateFunc: func(ctx context.Context, updateEvent event.TypedUpdateEvent[client.Object], wq workqueue.RateLimitingInterface) {
+			newNode := updateEvent.ObjectNew.(*corev1.Node)
+			pods, err := reconciler.getPodsAssignedToNode(newNode.Name)
+			if err != nil {
+				return
+			}
+
+			for i := range pods {
+				// skip DaemonSet pods and static pod
+				if isDaemonSetPodOrStaticPod(&pods[i]) {
+					continue
+				}
+				if len(pods[i].Spec.NodeName) != 0 {
+					// Reconcile looks up a Pod by this key, so enqueue the pod's own
+					// namespace/name rather than the name of the node it runs on.
+					wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name}})
+				}
+			}
+		},
+	}
+
+	nodePredicate := predicate.Funcs{
+		CreateFunc: func(evt event.CreateEvent) bool {
+			return false
+		},
+		DeleteFunc: func(evt event.DeleteEvent) bool {
+			return false
+		},
+		UpdateFunc: func(evt event.UpdateEvent) bool {
+			oldNode, ok := evt.ObjectOld.(*corev1.Node)
+			if !ok {
+				return false
+			}
+			newNode, ok := evt.ObjectNew.(*corev1.Node)
+			if !ok {
+				return false
+			}
+			// only enqueue if one of the annotations consulted by
+			// resolveNodeAutonomySetting changed
+			if (oldNode.Annotations[nodeutil.PodBindingAnnotation] != newNode.Annotations[nodeutil.PodBindingAnnotation]) ||
+				(oldNode.Annotations[projectinfo.GetAutonomyAnnotation()] != newNode.Annotations[projectinfo.GetAutonomyAnnotation()]) ||
+				(oldNode.Annotations[projectinfo.GetNodeAutonomyDurationAnnotation()] != newNode.Annotations[projectinfo.GetNodeAutonomyDurationAnnotation()]) {
+				return true
+			}
+			return false
+		},
+		GenericFunc: func(evt event.GenericEvent) bool {
+			return false
+		},
+	}
+	if err := c.Watch(source.Kind[client.Object](mgr.GetCache(), &corev1.Node{}, &nodeHandler, nodePredicate)); err != nil {
+		return err
+	}
+
+	podPredicate := predicate.Funcs{
+		CreateFunc: func(evt event.CreateEvent) bool {
+			pod, ok := evt.Object.(*corev1.Pod)
+			if !ok {
+				return false
+			}
+			// skip daemonset pod and static pod
+			if isDaemonSetPodOrStaticPod(pod) {
+				return false
+			}
+
+			// check all pods with node name when yurt-manager restarts
+			if len(pod.Spec.NodeName) != 0 {
+				return true
+			}
+			return false
+		},
+		UpdateFunc: func(evt event.UpdateEvent) bool {
+			oldPod, ok := evt.ObjectOld.(*corev1.Pod)
+			if !ok {
+				return false
+			}
+			newPod, ok := evt.ObjectNew.(*corev1.Pod)
+			if !ok {
+				return false
+			}
+			// skip daemonset pod and static pod
+			if isDaemonSetPodOrStaticPod(newPod) {
+				return false
+			}
+
+			// reconcile pod in the following cases:
+			// 1. pod is assigned to a node
+			// 2. pod tolerations is changed
+			// 3. original not ready toleration of pod is changed
+			// 4. original unreachable toleration of pod is changed
+			if (oldPod.Spec.NodeName != newPod.Spec.NodeName) ||
+				!reflect.DeepEqual(oldPod.Spec.Tolerations, newPod.Spec.Tolerations) ||
+				(oldPod.Annotations[originalNotReadyTolerationDurationAnnotation] != newPod.Annotations[originalNotReadyTolerationDurationAnnotation]) ||
+				(oldPod.Annotations[originalUnreachableTolerationDurationAnnotation] != newPod.Annotations[originalUnreachableTolerationDurationAnnotation]) {
+				return true
+			}
+
+			return false
+		},
+		DeleteFunc: func(evt event.DeleteEvent) bool {
+			return false
+		},
+		GenericFunc: func(evt event.GenericEvent) bool {
+			return false
+		},
+	}
+
+	if err := c.Watch(source.Kind[client.Object](mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}, podPredicate)); err != nil {
+		return err
+	}
+
+	return nil
 }
 // +kubebuilder:rbac:groups="",resources=nodes,verbs=get
@@ -117,49 +194,68 @@ func add(mgr manager.Manager, cfg *appconfig.CompletedConfig, r reconcile.Reconc
 // Reconcile reads that state of Node in cluster and makes changes if node autonomy state has been changed
 func (r *ReconcilePodBinding) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result,
error) {
-	var err error
-	node := &corev1.Node{}
-	if err = r.Get(ctx, req.NamespacedName, node); err != nil {
-		klog.V(4).Info(Format("node not found for %q\n", req.NamespacedName))
+	pod := &corev1.Pod{}
+	if err := r.Get(ctx, req.NamespacedName, pod); err != nil {
 		return reconcile.Result{}, client.IgnoreNotFound(err)
 	}
-	klog.V(4).Info(Format("node request: %s\n", node.Name))
+	klog.Infof("reconcile pod request: %s/%s", pod.Namespace, pod.Name)
-	if err := r.processNode(node); err != nil {
+	if err := r.reconcilePod(pod); err != nil {
 		return reconcile.Result{}, err
 	}
 	return reconcile.Result{}, nil
 }
-func (r *ReconcilePodBinding) processNode(node *corev1.Node) error {
-	// if node has autonomy annotation, we need to see if pods on this node except DaemonSet/Static ones need a treat
-	pods, err := r.getPodsAssignedToNode(node.Name)
-	if err != nil {
-		return err
+// reconcilePod aligns the not-ready/unreachable NoExecute tolerations of pod with the
+// autonomy setting of the node the pod is assigned to. While the node is autonomous the
+// original tolerationSeconds are saved in pod annotations and replaced by the autonomy
+// duration; otherwise the saved values are written back. The pod is updated only when
+// something actually changed. A missing node is not treated as an error.
+func (r *ReconcilePodBinding) reconcilePod(pod *corev1.Pod) error {
+	// skip pod which is not assigned to node
+	if len(pod.Spec.NodeName) == 0 {
+		return nil
 	}
-	for i := range pods {
-		pod := &pods[i]
-		klog.V(5).Info(Format("pod %d on node %s: %s", i, node.Name, pod.Name))
-		// skip DaemonSet pods and static pod
-		if isDaemonSetPodOrStaticPod(pod) {
-			continue
-		}
+	node := &corev1.Node{}
+	if err := r.Get(context.Background(), client.ObjectKey{Name: pod.Spec.NodeName}, node); err != nil {
+		return client.IgnoreNotFound(err)
+	}
-		// skip not running pods
-		if pod.Status.Phase != corev1.PodRunning {
-			continue
+	storedPod := pod.DeepCopy()
+	if isAutonomous, duration := resolveNodeAutonomySetting(node); isAutonomous {
+		// update pod tolerationSeconds according to node autonomy annotation,
+		// store the original toleration seconds into pod annotations.
+		for i := range pod.Spec.Tolerations {
+			if (pod.Spec.Tolerations[i].Key == corev1.TaintNodeNotReady || pod.Spec.Tolerations[i].Key == corev1.TaintNodeUnreachable) &&
+				(pod.Spec.Tolerations[i].Effect == corev1.TaintEffectNoExecute) {
+				if _, ok := pod.Annotations[TolerationKeyToAnnotation[pod.Spec.Tolerations[i].Key]]; !ok {
+					// A toleration without tolerationSeconds already tolerates the taint
+					// forever: there is no original value to record, and dereferencing
+					// the nil pointer would panic. Leave such a toleration untouched.
+					if pod.Spec.Tolerations[i].TolerationSeconds == nil {
+						continue
+					}
+					// create the map only when something is written, so that an
+					// untouched pod still compares equal to storedPod below.
+					if pod.Annotations == nil {
+						pod.Annotations = make(map[string]string)
+					}
+					pod.Annotations[TolerationKeyToAnnotation[pod.Spec.Tolerations[i].Key]] = fmt.Sprintf("%d", *pod.Spec.Tolerations[i].TolerationSeconds)
+				}
+				pod.Spec.Tolerations[i].TolerationSeconds = duration
+			}
 		}
-
-		// pod binding takes precedence against node autonomy
-		if nodeutil.IsPodBoundenToNode(node) {
-			durationSeconds := getPodTolerationSeconds(node)
-			if err := r.configureTolerationForPod(pod, durationSeconds); err != nil {
-				klog.Error(Format("could not configure toleration of pod, %v", err))
+	} else {
+		// restore toleration seconds from original toleration seconds annotations
+		for i := range pod.Spec.Tolerations {
+			if (pod.Spec.Tolerations[i].Key == corev1.TaintNodeNotReady || pod.Spec.Tolerations[i].Key == corev1.TaintNodeUnreachable) &&
+				(pod.Spec.Tolerations[i].Effect == corev1.TaintEffectNoExecute) {
+				if durationStr, ok := pod.Annotations[TolerationKeyToAnnotation[pod.Spec.Tolerations[i].Key]]; ok {
+					duration, err := strconv.ParseInt(durationStr, 10, 64)
+					if err != nil {
+						// an unparsable saved value cannot be restored; keep the current one
+						continue
+					}
+					pod.Spec.Tolerations[i].TolerationSeconds = &duration
+				}
 			}
 		}
 	}
+
+	if !reflect.DeepEqual(storedPod, pod) {
+		if err := r.Update(context.TODO(), pod, &client.UpdateOptions{}); err != nil {
+			klog.Errorf("could not update pod(%s/%s), %v", pod.Namespace, pod.Name, err)
+			return err
+		}
+	}
 	return nil
 }
@@ -173,35 +269,12 @@ func (r *ReconcilePodBinding) getPodsAssignedToNode(name string) ([]corev1.Pod,
 	podList := &corev1.PodList{}
 	err := r.List(context.TODO(), podList, listOptions)
 	if err != nil {
-		klog.Error(Format("could not get podList for node(%s), %v", name, err))
+		klog.Errorf("could not get podList for node(%s), %v", name, err)
 		return nil, err
 	}
 	return podList.Items, nil
 }
-func (r *ReconcilePodBinding) configureTolerationForPod(pod *corev1.Pod, tolerationSeconds *int64) error {
-	// reset toleration seconds
-	notReadyToleration.TolerationSeconds = tolerationSeconds
-	unreachableToleration.TolerationSeconds = tolerationSeconds
-	toleratesNodeNotReady := addOrUpdateTolerationInPodSpec(&pod.Spec, &notReadyToleration)
-	toleratesNodeUnreachable := addOrUpdateTolerationInPodSpec(&pod.Spec, &unreachableToleration)
-
-	if toleratesNodeNotReady || toleratesNodeUnreachable {
-		if tolerationSeconds == nil {
-			klog.V(4).Info(Format("pod(%s/%s) => toleratesNodeNotReady=%v, toleratesNodeUnreachable=%v, tolerationSeconds=0", pod.Namespace, pod.Name, toleratesNodeNotReady, toleratesNodeUnreachable))
-		} else {
-			klog.V(4).Info(Format("pod(%s/%s) => toleratesNodeNotReady=%v, toleratesNodeUnreachable=%v, tolerationSeconds=%d", pod.Namespace, pod.Name, toleratesNodeNotReady, toleratesNodeUnreachable, *tolerationSeconds))
-		}
-		err := r.Update(context.TODO(), pod, &client.UpdateOptions{})
-		if err != nil {
-			klog.Error(Format("could not update toleration of pod(%s/%s), %v", pod.Namespace, pod.Name, err))
-			return err
-		}
-	}
-
-	return nil
-}
-
 func isDaemonSetPodOrStaticPod(pod *corev1.Pod) bool {
 	if pod != nil {
 		for i := range pod.OwnerReferences {
@@ -218,71 +291,45 @@ func isDaemonSetPodOrStaticPod(pod *corev1.Pod) bool {
 	return false
 }
-// addOrUpdateTolerationInPodSpec tries to add a toleration to the toleration list in PodSpec.
-// Returns true if something was updated, false otherwise.
-func addOrUpdateTolerationInPodSpec(spec *corev1.PodSpec, toleration *corev1.Toleration) bool {
-	podTolerations := spec.Tolerations
-
-	var newTolerations []corev1.Toleration
-	updated := false
-	for i := range podTolerations {
-		if toleration.MatchToleration(&podTolerations[i]) {
-			if (toleration.TolerationSeconds == nil && podTolerations[i].TolerationSeconds == nil) ||
-				(toleration.TolerationSeconds != nil && podTolerations[i].TolerationSeconds != nil &&
-					(*toleration.TolerationSeconds == *podTolerations[i].TolerationSeconds)) {
-				return false
-			}
-
-			newTolerations = append(newTolerations, *toleration)
-			updated = true
-			continue
-		}
-
-		newTolerations = append(newTolerations, podTolerations[i])
-	}
-
-	if !updated {
-		newTolerations = append(newTolerations, *toleration)
-	}
-
-	spec.Tolerations = newTolerations
-	return true
-}
-
-// getPodTolerationSeconds returns the tolerationSeconds for the pod on the node.
-// The tolerationSeconds is calculated based on the following rules:
-// 1. The default tolerationSeconds is 300 if node autonomy and autonomy duration are not set.
-// 2. Node autonomy is set, the tolerationSeconds is nil.
-// 3. If the node has node autonomy duration annotation, the tolerationSeconds is the duration.
-// 4. If the autonomy duration is parsed as 0, the tolerationSeconds is nil which means the pod will not be evicted.
-func getPodTolerationSeconds(node *corev1.Node) *int64 {
-	tolerationSeconds := defaultTolerationSeconds
+// resolveNodeAutonomySetting is used for resolving node autonomy information.
+// The node is configured as autonomous if the node has the following annotations:
+// -[deprecated] apps.openyurt.io/binding: "true"
+// -[deprecated] node.beta.openyurt.io/autonomy: "true"
+// -[recommended] node.openyurt.io/autonomy-duration: "duration"
+//
+// The first return value indicates whether the node has autonomous mode enabled:
+// true means autonomy is enabled, while false means it is not. It is false when the
+// node has no annotations, when none of the annotations above applies, or when the
+// autonomy-duration value cannot be parsed by time.ParseDuration.
+// The second return value is only relevant when the first return value is true and
+// can be ignored otherwise. This value represents the duration of the node's autonomy
+// in whole seconds; it is nil when one of the deprecated annotations is "true" or when
+// the parsed duration is zero or negative.
+// If the duration of heartbeat loss is less than this period, pods on the node will not be evicted.
+// However, if the duration of heartbeat loss exceeds this period, then the pods on the node will be evicted.
+func resolveNodeAutonomySetting(node *corev1.Node) (bool, *int64) {
 	if len(node.Annotations) == 0 {
-		return &tolerationSeconds
+		return false, nil
 	}
 	// Pod binding takes precedence against node autonomy
 	if node.Annotations[nodeutil.PodBindingAnnotation] == "true" ||
 		node.Annotations[projectinfo.GetAutonomyAnnotation()] == "true" {
-		return nil
+		return true, nil
 	}
 	// Node autonomy duration has the least precedence
 	duration, ok := node.Annotations[projectinfo.GetNodeAutonomyDurationAnnotation()]
 	if !ok {
-		return &tolerationSeconds
+		return false, nil
 	}
 	durationTime, err := time.ParseDuration(duration)
 	if err != nil {
-		klog.Error(Format("could not parse duration %s, %v", duration, err))
-		return nil
+		klog.Errorf("could not parse autonomy duration %s, %v", duration, err)
+		return false, nil
 	}
-	if durationTime == 0 {
-		return nil
+	if durationTime <= 0 {
+		return true, nil
 	}
-	tolerationSeconds = int64(durationTime.Seconds())
-	return &tolerationSeconds
+	tolerationSeconds := int64(durationTime.Seconds())
+	return true, &tolerationSeconds
 }
diff --git a/pkg/yurtmanager/controller/yurtcoordinator/podbinding/pod_binding_controller_test.go b/pkg/yurtmanager/controller/yurtcoordinator/podbinding/pod_binding_controller_test.go
index df1840d4fd6..d8bf46857e3 100644
--- a/pkg/yurtmanager/controller/yurtcoordinator/podbinding/pod_binding_controller_test.go
+++ b/pkg/yurtmanager/controller/yurtcoordinator/podbinding/pod_binding_controller_test.go
@@ -19,517 +19,553 @@ package podbinding
 import (
 	"context"
 	"reflect"
+	"strings"
 	"testing"
 	corev1 "k8s.io/api/core/v1"
 	metav1
"k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" - - nodeutil "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/util/node" ) -var ( - TestNodesName = []string{"node1", "node2", "node3", "node4"} - TestPodsName = []string{"pod1", "pod2", "pod3", "pod4"} -) +func podIndexer(rawObj client.Object) []string { + pod, ok := rawObj.(*corev1.Pod) + if !ok { + return []string{} + } + if len(pod.Spec.NodeName) == 0 { + return []string{} + } + return []string{pod.Spec.NodeName} +} -func prepareNodes() []client.Object { - nodes := []client.Object{ - &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node1", - }, - }, - &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node2", - Annotations: map[string]string{ - "node.beta.openyurt.io/autonomy": "true", +type FakeCountingClient struct { + client.Client + UpdateCount int +} + +func (c *FakeCountingClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + c.UpdateCount++ + return c.Client.Update(ctx, obj, opts...) 
+} + +func TestReconcile(t *testing.T) { + second1 := int64(300) + second2 := int64(100) + testcases := map[string]struct { + pod *corev1.Pod + node *corev1.Node + resultPod *corev1.Pod + resultErr error + resultCount int + }{ + "update pod toleration seconds as node autonomy setting": { + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, }, - }, - }, - &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node3", - Annotations: map[string]string{ - "apps.openyurt.io/binding": "true", + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + }, }, }, - }, - &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node4", - Annotations: map[string]string{ - "node.openyurt.io/autonomy-duration": "0", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "node.openyurt.io/autonomy-duration": "100s", + }, }, }, - }, - &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node5", - Annotations: map[string]string{ - "node.openyurt.io/autonomy-duration": "2h", + resultPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + originalNotReadyTolerationDurationAnnotation: "300", + originalUnreachableTolerationDurationAnnotation: "300", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second2, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + 
Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second2, + }, + }, }, }, + resultCount: 1, }, - &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node6", - Annotations: map[string]string{ - "node.openyurt.io/autonomy-duration": "", + "update pod toleration seconds with node autonomy duration is 0": { + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + }, }, }, - }, - &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node7", - Annotations: map[string]string{}, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "node.openyurt.io/autonomy-duration": "0s", + }, + }, }, - }, - &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node8", - Annotations: map[string]string{ - "other.annotation": "true", + resultPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + originalNotReadyTolerationDurationAnnotation: "300", + originalUnreachableTolerationDurationAnnotation: "300", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + }, + }, }, }, + resultCount: 1, }, - } - return nodes -} - -func preparePods() []client.Object { - second1 := int64(300) - second2 := int64(100) - pods := []client.Object{ - 
&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: metav1.NamespaceDefault, - OwnerReferences: []metav1.OwnerReference{ - { - Kind: "DaemonSet", + "restore pod toleration seconds as node autonomy setting": { + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + originalNotReadyTolerationDurationAnnotation: "100", + originalUnreachableTolerationDurationAnnotation: "100", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, }, }, }, - }, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod2", - Namespace: metav1.NamespaceDefault, - Annotations: map[string]string{ - corev1.MirrorPodAnnotationKey: "03b446125f489d8b04a90de0899657ca", + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", }, }, - Spec: corev1.PodSpec{ - Tolerations: []corev1.Toleration{ - { - Key: corev1.TaintNodeNotReady, - Operator: corev1.TolerationOpExists, - Effect: corev1.TaintEffectNoExecute, + resultPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + originalNotReadyTolerationDurationAnnotation: "100", + originalUnreachableTolerationDurationAnnotation: "100", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second2, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: 
&second2, + }, }, }, - NodeName: "node1", }, + resultCount: 1, }, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod3", - Namespace: metav1.NamespaceDefault, - }, - Spec: corev1.PodSpec{ - Tolerations: []corev1.Toleration{ - { - Key: corev1.TaintNodeNotReady, - Operator: corev1.TolerationOpExists, - Effect: corev1.TaintEffectNoExecute, - TolerationSeconds: &second1, - }, - { - Key: corev1.TaintNodeUnreachable, - Operator: corev1.TolerationOpExists, - Effect: corev1.TaintEffectNoExecute, - TolerationSeconds: &second1, - }, - }, - NodeName: "node1", + "pod toleration seconds is not changed with invalid duration": { + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + originalNotReadyTolerationDurationAnnotation: "300", + originalUnreachableTolerationDurationAnnotation: "300", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + }, + }, }, - }, - &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod4", - Namespace: metav1.NamespaceDefault, + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + "node.openyurt.io/autonomy-duration": "invalid duration", + }, + }, }, - Spec: corev1.PodSpec{ - Tolerations: []corev1.Toleration{ - { - Key: corev1.TaintNodeNotReady, - Operator: corev1.TolerationOpExists, - Effect: corev1.TaintEffectNoExecute, - TolerationSeconds: &second2, + resultPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + originalNotReadyTolerationDurationAnnotation: "300", + 
originalUnreachableTolerationDurationAnnotation: "300", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, }, }, }, + resultCount: 0, }, - } - - return pods -} - -func TestReconcile(t *testing.T) { - pods := preparePods() - nodes := prepareNodes() - scheme := runtime.NewScheme() - if err := clientgoscheme.AddToScheme(scheme); err != nil { - t.Fatal("Fail to add kubernetes clint-go custom resource") - } - c := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(pods...).WithIndex(&corev1.Pod{}, "spec.nodeName", podIndexer).WithObjects(nodes...).Build() - - for i := range TestNodesName { - var req = reconcile.Request{NamespacedName: types.NamespacedName{Name: TestNodesName[i]}} - rsp := ReconcilePodBinding{ - Client: c, - } - - _, err := rsp.Reconcile(context.TODO(), req) - if err != nil { - t.Errorf("Reconcile() error = %v", err) - return - } - - pod := &corev1.Pod{} - err = c.Get(context.TODO(), types.NamespacedName{Namespace: metav1.NamespaceDefault, Name: TestPodsName[i]}, pod) - if err != nil { - continue - } - t.Logf("pod %s Tolerations is %+v", TestPodsName[i], pod.Spec.Tolerations) - } -} - -func TestConfigureTolerationForPod(t *testing.T) { - pods := preparePods() - nodes := prepareNodes() - c := fakeclient.NewClientBuilder().WithObjects(pods...).WithObjects(nodes...).Build() - - second := int64(300) - tests := []struct { - name string - pod *corev1.Pod - tolerationSeconds *int64 - wantErr bool - }{ - { - name: "test1", - pod: pods[0].(*corev1.Pod), - tolerationSeconds: &second, - wantErr: false, - }, - { - name: "test2", - pod: pods[1].(*corev1.Pod), - tolerationSeconds: &second, - wantErr: false, - }, - { - 
name: "test3", - pod: pods[2].(*corev1.Pod), - tolerationSeconds: &second, - wantErr: false, - }, - { - name: "test4", - pod: pods[3].(*corev1.Pod), - tolerationSeconds: &second, - wantErr: false, - }, - { - name: "test5", + "pod related node is not found": { pod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "pod5", + Name: "pod1", Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + originalNotReadyTolerationDurationAnnotation: "300", + originalUnreachableTolerationDurationAnnotation: "300", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + }, }, }, - tolerationSeconds: &second, - wantErr: true, - }, - { - name: "test6", - pod: &corev1.Pod{ + resultPod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "pod5", + Name: "pod1", Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + originalNotReadyTolerationDurationAnnotation: "300", + originalUnreachableTolerationDurationAnnotation: "300", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + { + Key: corev1.TaintNodeNotReady, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + { + Key: corev1.TaintNodeUnreachable, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoExecute, + TolerationSeconds: &second1, + }, + }, }, }, - tolerationSeconds: nil, - wantErr: true, + resultCount: 0, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := &ReconcilePodBinding{ - Client: c, + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + builder := 
fakeclient.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(tc.pod).WithIndex(&corev1.Pod{}, "spec.nodeName", podIndexer)
+			if tc.node != nil {
+				builder.WithObjects(tc.node)
 			}
-			if err := r.configureTolerationForPod(tt.pod, tt.tolerationSeconds); (err != nil) != tt.wantErr {
-				t.Errorf("configureTolerationForPod() error = %v, wantErr %v", err, tt.wantErr)
+
+			fClient := &FakeCountingClient{
+				Client: builder.Build(),
 			}
-		})
-	}
-}
-func podIndexer(rawObj client.Object) []string {
-	pod, ok := rawObj.(*corev1.Pod)
-	if !ok {
-		return []string{}
-	}
-	if len(pod.Spec.NodeName) == 0 {
-		return []string{}
-	}
-	return []string{pod.Spec.NodeName}
-}
+			reconciler := ReconcilePodBinding{
+				Client: fClient,
+			}
-func TestGetPodsAssignedToNode(t *testing.T) {
-	pods := preparePods()
-	c := fakeclient.NewClientBuilder().WithObjects(pods...).WithIndex(&corev1.Pod{}, "spec.nodeName", podIndexer).Build()
-	tests := []struct {
-		name     string
-		nodeName string
-		want     []corev1.Pod
-		wantErr  bool
-	}{
-		{
-			name:     "test1",
-			nodeName: "node1",
-			want: []corev1.Pod{
-				*pods[1].(*corev1.Pod),
-				*pods[2].(*corev1.Pod),
-			},
-			wantErr: false,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			r := &ReconcilePodBinding{
-				Client: c,
+			var req = reconcile.Request{NamespacedName: types.NamespacedName{Namespace: tc.pod.Namespace, Name: tc.pod.Name}}
+
+			_, err := reconciler.Reconcile(context.TODO(), req)
+			if tc.resultErr != nil {
+				if err == nil || !strings.Contains(err.Error(), tc.resultErr.Error()) {
+					// %v is nil-safe; calling err.Error() here would panic when err is nil
+					t.Errorf("expect error %v, but got %v", tc.resultErr, err)
+				}
 			}
-			// By the way, the fake client not support ListOptions.FieldSelector, only Namespace and LabelSelector
-			// For more details, see sigs.k8s.io/controller-runtime@v0.10.3/pkg/client/fake/client.go:366
-			got, err := r.getPodsAssignedToNode(tt.nodeName)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("getPodsAssignedToNode() error = %v, wantErr %v", err, tt.wantErr)
-				return
+			// an error nobody asked for must fail the case instead of being dropped
+			if tc.resultErr == nil && err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+
+			if fClient.UpdateCount != tc.resultCount {
+				t.Errorf("expect update count %d, but got %d", tc.resultCount, fClient.UpdateCount)
 			}
-			if !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("getPodsAssignedToNode() got = %v\n, want %v\n", got, tt.want)
+
+			if tc.resultPod != nil {
+				currentPod := &corev1.Pod{}
+				err = reconciler.Get(context.TODO(), types.NamespacedName{Namespace: tc.pod.Namespace, Name: tc.pod.Name}, currentPod)
+				if err != nil {
+					t.Errorf("couldn't get current pod, %v", err)
+					return
+				}
+
+				if !reflect.DeepEqual(tc.resultPod.Annotations, currentPod.Annotations) {
+					t.Errorf("expect pod annotations %v, but got %v", tc.resultPod.Annotations, currentPod.Annotations)
+				}
+
+				// compare the expectation against the pod read back from the client,
+				// not against itself
+				if !reflect.DeepEqual(tc.resultPod.Spec.Tolerations, currentPod.Spec.Tolerations) {
+					t.Errorf("expect pod tolerations %v, but got %v", tc.resultPod.Spec.Tolerations, currentPod.Spec.Tolerations)
+				}
 			}
 		})
 	}
 }
-func TestAddOrUpdateTolerationInPodSpec(t *testing.T) {
-	pods := preparePods()
-	second := int64(300)
-	tests := []struct {
-		name string
-		pod  *corev1.Pod
-		want bool
+func TestGetPodsAssignedToNode(t *testing.T) {
+	testcases := map[string]struct {
+		nodeName   string
+		pods       []client.Object
+		resultPods sets.Set[string]
+		resultErr  error
 	}{
-		{
-			name: "toleration1",
-			pod:  pods[0].(*corev1.Pod),
-			want: true,
-		},
-		{
-			name: "toleration2",
-			pod:  pods[1].(*corev1.Pod),
-			want: false,
-		},
-		{
-			name: "toleration3",
-			pod:  pods[2].(*corev1.Pod),
-			want: false,
+		"all pods are related to node": {
+			nodeName: "node1",
+			pods: []client.Object{
+				&corev1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pod1",
+						Namespace: metav1.NamespaceDefault,
+					},
+					Spec: corev1.PodSpec{
+						NodeName: "node1",
+					},
+				},
+				&corev1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pod2",
+						Namespace: metav1.NamespaceDefault,
+					},
+					Spec: corev1.PodSpec{
+						NodeName: "node1",
+					},
+				},
+			},
+			resultPods: sets.New("pod1", "pod2"),
 		},
-		{
-			name: "toleration4",
-			pod:  pods[3].(*corev1.Pod),
-			want: true,
"not all pods are related to node": { + nodeName: "node1", + pods: []client.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + }, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: metav1.NamespaceDefault, + }, + Spec: corev1.PodSpec{ + NodeName: "node2", + }, + }, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + Namespace: metav1.NamespaceDefault, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + }, + }, + resultPods: sets.New("pod1", "pod3"), }, } - for _, tt := range tests { - toleration := corev1.Toleration{ - Key: corev1.TaintNodeNotReady, - Operator: corev1.TolerationOpExists, - Effect: corev1.TaintEffectNoExecute, - TolerationSeconds: &second, - } - if tt.name == "toleration2" { - toleration = corev1.Toleration{ - Key: corev1.TaintNodeNotReady, - Operator: corev1.TolerationOpExists, - Effect: corev1.TaintEffectNoExecute, - } - } - t.Run(tt.name, func(t *testing.T) { - if got := addOrUpdateTolerationInPodSpec(&tt.pod.Spec, &toleration); got != tt.want { - t.Errorf("addOrUpdateTolerationInPodSpec() = %v, want %v", got, tt.want) + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + builder := fakeclient.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(tc.pods...).WithIndex(&corev1.Pod{}, "spec.nodeName", podIndexer) + fClient := &FakeCountingClient{ + Client: builder.Build(), } - }) - } -} -func TestIsDaemonSetPodOrStaticPod(t *testing.T) { - pods := preparePods() - tests := []struct { - name string - pod *corev1.Pod - want bool - }{ - { - name: "pod0", - pod: nil, - want: false, - }, - { - name: "pod1", - pod: pods[0].(*corev1.Pod), - want: true, - }, - { - name: "pod2", - pod: pods[1].(*corev1.Pod), - want: true, - }, - { - name: "pod3", - pod: pods[2].(*corev1.Pod), - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := 
isDaemonSetPodOrStaticPod(tt.pod); got != tt.want { - t.Errorf("isDaemonSetPodOrStaticPod() = %v, want %v", got, tt.want) + reconciler := ReconcilePodBinding{ + Client: fClient, + } + pods, err := reconciler.getPodsAssignedToNode(tc.nodeName) + if tc.resultErr != nil { + if err == nil || !strings.Contains(err.Error(), tc.resultErr.Error()) { + t.Errorf("expect error %s, but got %s", tc.resultErr.Error(), err.Error()) + } } - }) - } -} -func TestIsPodBoundenToNode(t *testing.T) { - nodes := prepareNodes() - tests := []struct { - name string - node *corev1.Node - want bool - }{ - { - name: "node1", - node: nodes[0].(*corev1.Node), - want: false, - }, - { - name: "node2", - node: nodes[1].(*corev1.Node), - want: true, - }, - { - name: "node3", - node: nodes[2].(*corev1.Node), - want: true, - }, - { - name: "node4", - node: nodes[3].(*corev1.Node), - want: true, - }, - { - name: "node5", - node: nodes[4].(*corev1.Node), - want: true, - }, - { - name: "node6", - node: nodes[5].(*corev1.Node), - want: false, - }, - { - name: "node7", - node: nodes[6].(*corev1.Node), - want: false, - }, - { - name: "node8", - node: nodes[7].(*corev1.Node), - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := nodeutil.IsPodBoundenToNode(tt.node); got != tt.want { - t.Errorf("IsPodBoundenToNode() = %v, want %v", got, tt.want) + if len(tc.resultPods) != 0 { + if len(pods) != len(tc.resultPods) { + t.Errorf("expect pods count %d, but got %d", len(tc.resultPods), len(pods)) + } + + currentPods := sets.New[string]() + for i := range pods { + currentPods.Insert(pods[i].Name) + } + if !currentPods.Equal(tc.resultPods) { + t.Errorf("expect pods %v, but got %v", tc.resultPods.UnsortedList(), currentPods.UnsortedList()) + } } }) } } -func TestGetPodTolerationSeconds(t *testing.T) { - expectedToleration := int64(7200) - defaultTolerationSeconds := int64(300) - nodes := prepareNodes() - tests := []struct { - name string - node *corev1.Node - want 
*int64 +func TestIsDaemonSetPodOrStaticPod(t *testing.T) { + testcases := map[string]struct { + pod *corev1.Pod + result bool }{ - { - name: "node1", - node: nodes[0].(*corev1.Node), - want: &defaultTolerationSeconds, - }, - { - name: "node2", - node: nodes[1].(*corev1.Node), - want: nil, - }, - { - name: "node3", - node: nodes[2].(*corev1.Node), - want: nil, - }, - { - name: "node4", - node: nodes[3].(*corev1.Node), - want: nil, - }, - { - name: "node5", - node: nodes[4].(*corev1.Node), - want: &expectedToleration, - }, - { - name: "node6", - node: nodes[5].(*corev1.Node), - want: nil, + "normal pod": { + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + }, + }, + result: false, }, - { - name: "node7", - node: nodes[6].(*corev1.Node), - want: &defaultTolerationSeconds, + "daemon pod": { + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "DaemonSet", + Name: "kube-proxy", + }, + }, + }, + }, + result: true, }, - { - name: "node8", - node: nodes[7].(*corev1.Node), - want: &defaultTolerationSeconds, + "static pod": { + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + "kubernetes.io/config.mirror": "abcdef123456789", + "kubernetes.io/config.seen": "2025-01-02", + "kubernetes.io/config.source": "file", + }, + }, + }, + result: true, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := getPodTolerationSeconds(tt.node); !reflect.DeepEqual(got, tt.want) { - t.Errorf("getPodTolerationSeconds() = %v, want %v", got, tt.want) + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + if got := isDaemonSetPodOrStaticPod(tc.pod); got != tc.result { + t.Errorf("isDaemonSetPodOrStaticPod() got = %v, expect %v", got, tc.result) } }) }