From d2e901f1834c920c03c0a3f9205440ff49696dab Mon Sep 17 00:00:00 2001
From: luchunling
Date: Wed, 11 Dec 2024 11:12:34 +0800
Subject: [PATCH] Add NotReady status reporting for one2cluster in
 UpdateRootNodeStatus switch

Signed-off-by: lcl533
---
 .../cluster-manager/app/options/options.go    |   4 +
 .../cluster-manager/cluster_controller.go     |   2 +-
 .../controllers/node_lease_controller.go      |  34 ++-
 .../controllers/node_resources_controller.go  |   5 +-
 .../utils/leaf_cluster_condition_cache.go     |  94 ++++++++
 .../utils/leaf_model_handler.go               | 228 +++++++++++++++++-
 6 files changed, 351 insertions(+), 16 deletions(-)
 create mode 100644 pkg/clustertree/cluster-manager/utils/leaf_cluster_condition_cache.go

diff --git a/cmd/clustertree/cluster-manager/app/options/options.go b/cmd/clustertree/cluster-manager/app/options/options.go
index 373065726..64ac3d211 100644
--- a/cmd/clustertree/cluster-manager/app/options/options.go
+++ b/cmd/clustertree/cluster-manager/app/options/options.go
@@ -50,6 +50,9 @@ type Options struct {
 	BackoffOpts flags.BackoffOptions
 
 	SyncPeriod time.Duration
+
+	// UpdateRootNodeStatusNotready enables reporting NotReady status to the root node for one2cluster mode in UpdateRootNodeStatus
+	UpdateRootNodeStatusNotready bool
 }
 
 func NewOptions() (*Options, error) {
@@ -87,6 +90,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
 	flags.StringSliceVar(&o.AutoCreateMCSPrefix, "auto-mcs-prefix", []string{}, "The prefix of namespace for service to auto create mcs resources")
 	flags.StringSliceVar(&o.ReservedNamespaces, "reserved-namespaces", []string{"kube-system"}, "The namespaces protected by Kosmos that the controller-manager will skip.")
 	flags.DurationVar(&o.SyncPeriod, "sync-period", 0, "the sync period for informer to resync.")
+	flags.BoolVar(&o.UpdateRootNodeStatusNotready, "Update-RootNode-Status-Notready", false, "Enable or disable reporting NotReady status to the root node for one2cluster mode in UpdateRootNodeStatus")
 	o.RateLimiterOpts.AddFlags(flags)
 	o.BackoffOpts.AddFlags(flags)
 	options.BindLeaderElectionFlags(&o.LeaderElection, flags)
diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go
index c7cbada78..0b67bb484 100644
--- a/pkg/clustertree/cluster-manager/cluster_controller.go
+++ b/pkg/clustertree/cluster-manager/cluster_controller.go
@@ -194,7 +194,6 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
 	if err != nil {
 		return reconcile.Result{}, fmt.Errorf("new manager with err %v, cluster %s", err, cluster.Name)
 	}
-
 	leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.RootClientset, leafClient)
 	c.LeafModelHandler = leafModelHandler
 
@@ -286,6 +285,7 @@ func (c *ClusterController) setupControllers(
 	}
 
 	nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, leafNodeSelector, c.RootClientset, c.LeafModelHandler)
+	nodeLeaseController.Options = c.Options
 	if err := mgr.Add(nodeLeaseController); err != nil {
 		return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err)
 	}
diff --git a/pkg/clustertree/cluster-manager/controllers/node_lease_controller.go b/pkg/clustertree/cluster-manager/controllers/node_lease_controller.go
index 31eedd15c..ce6cd776b 100644
--- a/pkg/clustertree/cluster-manager/controllers/node_lease_controller.go
+++ b/pkg/clustertree/cluster-manager/controllers/node_lease_controller.go
@@ -17,6 +17,7 @@ import (
 	"k8s.io/utils/pointer"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	"github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options"
 	kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" ) @@ -27,7 +28,8 @@ const ( DefaultLeaseDuration = 40 DefaultRenewIntervalFraction = 0.25 - DefaultNodeStatusUpdateInterval = 1 * time.Minute + //DefaultNodeStatusUpdateInterval = 1 * time.Minute + leafClusterupdateInterval = 10 * time.Second ) type NodeLeaseController struct { @@ -42,6 +44,9 @@ type NodeLeaseController struct { nodes []*corev1.Node LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector nodeLock sync.Mutex + Options *options.Options + // eventRecorder record.EventRecorder + // eventBroadcaster record.EventBroadcaster } func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController { @@ -53,7 +58,8 @@ func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, LeafModelHandler: LeafModelHandler, LeafNodeSelectors: LeafNodeSelectors, leaseInterval: getRenewInterval(), - statusInterval: DefaultNodeStatusUpdateInterval, + //statusInterval: DefaultNodeStatusUpdateInterval, + statusInterval: leafClusterupdateInterval, } return c } @@ -74,9 +80,18 @@ func (c *NodeLeaseController) syncNodeStatus(ctx context.Context) { } c.nodeLock.Unlock() - err := c.updateNodeStatus(ctx, nodes, c.LeafNodeSelectors) - if err != nil { - klog.Errorf(err.Error()) + klog.Infof("Starting to update node status to notready in root cluster.") + if c.Options.UpdateRootNodeStatusNotready { + err := c.updateNodeStatusAddNotready(ctx, nodes, c.LeafNodeSelectors) + if err != nil { + klog.Errorf("Could not update node status in root cluster,Error: %v", err) + } + } + if !c.Options.UpdateRootNodeStatusNotready { + err := c.updateNodeStatus(ctx, nodes, c.LeafNodeSelectors) + if err != nil { + klog.Errorf(err.Error()) + } } } @@ -89,6 +104,15 @@ func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1. 
 	return nil
 }
 
+// nolint
+func (c *NodeLeaseController) updateNodeStatusAddNotready(ctx context.Context, n []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error {
+	err := c.LeafModelHandler.UpdateRootNodeStatusAddNotready(ctx, n, leafNodeSelector)
+	if err != nil {
+		klog.Errorf("Could not update node status to NotReady in root cluster, error: %v", err)
+	}
+	return nil
+}
+
 func (c *NodeLeaseController) syncLease(ctx context.Context) {
 	nodes := make([]*corev1.Node, 0)
 	c.nodeLock.Lock()
diff --git a/pkg/clustertree/cluster-manager/controllers/node_resources_controller.go b/pkg/clustertree/cluster-manager/controllers/node_resources_controller.go
index a396ddd0a..f28d92fa4 100644
--- a/pkg/clustertree/cluster-manager/controllers/node_resources_controller.go
+++ b/pkg/clustertree/cluster-manager/controllers/node_resources_controller.go
@@ -128,7 +128,10 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
 	}
 
 	clone := nodeInRoot.DeepCopy()
-	clone.Status.Conditions = utils.NodeConditions()
+	// When clone.Status.Conditions is empty, set it to utils.NodeConditions()
+	if len(clone.Status.Conditions) == 0 {
+		clone.Status.Conditions = utils.NodeConditions()
+	}
 
 	// Node2Node mode should sync leaf node's labels and annotations to root nodeInRoot
 	if c.LeafModelHandler.GetLeafMode() == leafUtils.Node {
diff --git a/pkg/clustertree/cluster-manager/utils/leaf_cluster_condition_cache.go b/pkg/clustertree/cluster-manager/utils/leaf_cluster_condition_cache.go
new file mode 100644
index 000000000..cf5923228
--- /dev/null
+++ b/pkg/clustertree/cluster-manager/utils/leaf_cluster_condition_cache.go
@@ -0,0 +1,94 @@
+package utils
+
+import (
+	"sync"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+
+	kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
+)
+
+type clusterData struct {
+	// readyCondition is the last observed ready condition of the cluster.
+	readyCondition corev1.ConditionStatus
+	// thresholdStartTime is the time that the ready condition changed.
+	thresholdStartTime time.Time
+}
+
+func (c *clusterConditionStore) thresholdAdjustedReadyCondition(cluster *kosmosv1alpha1.Cluster, nodeInRoot *corev1.Node, observedReadyConditions []corev1.NodeCondition, clusterFailureThreshold time.Duration, clusterSuccessThreshold time.Duration) []corev1.NodeCondition {
+	c.successThreshold = clusterSuccessThreshold
+	c.failureThreshold = clusterFailureThreshold
+	// Find the ready condition of the node. TODO: optimize the code format
+	observedReadyCondition := FindStatusCondition(observedReadyConditions)
+	if observedReadyCondition == nil {
+		return observedReadyConditions
+	}
+	// Get the current status of the cluster (root node)
+	curReadyConditions := nodeInRoot.Status.Conditions
+	curReadyCondition := FindStatusCondition(curReadyConditions)
+	// Get the saved data
+	saved := c.get(cluster.Name)
+	// Check whether it is a newly added cluster
+	if saved == nil {
+		// the cluster is just joined
+		c.update(cluster.Name, &clusterData{
+			readyCondition: observedReadyCondition.Status,
+		})
+		return observedReadyConditions
+	}
+	// Check if the state has changed
+	now := time.Now()
+	if saved.readyCondition != observedReadyCondition.Status {
+		// ready condition status changed, record the threshold start time
+		saved = &clusterData{
+			readyCondition:     observedReadyCondition.Status,
+			thresholdStartTime: now,
+		}
+		c.update(cluster.Name, saved)
+	}
+	// Set the time threshold
+	var threshold time.Duration
+	if observedReadyCondition.Status == corev1.ConditionTrue {
+		threshold = c.successThreshold
+	} else {
+		threshold = c.failureThreshold
+	}
+	if curReadyCondition != nil && ((observedReadyCondition.Status == corev1.ConditionTrue && curReadyCondition.Status != corev1.ConditionTrue) ||
+		(observedReadyCondition.Status != corev1.ConditionTrue && curReadyCondition.Status == corev1.ConditionTrue)) &&
+		now.Before(saved.thresholdStartTime.Add(threshold)) {
+		// retain old status until threshold exceeded to avoid network unstable problems.
+		return curReadyConditions
+	}
+	return observedReadyConditions
+}
+
+// FindStatusCondition finds the conditionType in conditions.
+func FindStatusCondition(conditions []corev1.NodeCondition) *corev1.NodeCondition {
+	for i := range conditions {
+		if conditions[i].Type == corev1.NodeReady {
+			return &conditions[i]
+		}
+	}
+	return nil
+}
+
+func (c *clusterConditionStore) get(cluster string) *clusterData {
+	condition, ok := c.clusterDataMap.Load(cluster)
+	if !ok {
+		return nil
+	}
+	return condition.(*clusterData)
+}
+
+func (c *clusterConditionStore) update(cluster string, data *clusterData) {
+	// store the latest observed data for the cluster
+	c.clusterDataMap.Store(cluster, data)
+}
+
+type clusterConditionStore struct {
+	clusterDataMap   sync.Map
+	successThreshold time.Duration
+	// failureThreshold is the duration of failure for the cluster to be considered unhealthy.
+	failureThreshold time.Duration
+}
diff --git a/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go b/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go
index 42c2be322..ea3c8f190 100644
--- a/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go
+++ b/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go
@@ -3,7 +3,9 @@ package utils
 import (
 	"context"
 	"fmt"
+	"net/http"
 	"reflect"
+	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -12,6 +14,9 @@ import (
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/json"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/retry"
 	"k8s.io/klog/v2"
 
@@ -19,6 +24,12 @@ import (
 	"github.com/kosmos.io/kosmos/pkg/utils"
 )
 
+// TODO: the threshold values still need to be validated by testing
+const (
+	clusterFailureThreshold = 30 * time.Minute
+	clusterSuccessThreshold = 5 * time.Second
+)
+
 // LeafModelHandler is the interface to handle the leafModel logic
 type LeafModelHandler interface {
 	// GetLeafMode returns the leafMode for a Cluster
@@ -33,6 +44,9 @@ type LeafModelHandler interface {
 	// UpdateRootNodeStatus updates the node's status in root cluster
 	UpdateRootNodeStatus(ctx context.Context, node []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error
 
	// UpdateRootNodeStatusAddNotready also reports NotReady status to the root node for one2cluster mode in UpdateRootNodeStatus
+	UpdateRootNodeStatusAddNotready(ctx context.Context, node []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error
+
 	// CreateRootNode creates the node in root cluster
 	CreateRootNode(ctx context.Context, listenPort int32, gitVersion string) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error)
 }
@@ -43,17 +57,19 @@ type ClassificationHandler struct {
 	Cluster  *kosmosv1alpha1.Cluster
 	//LeafClient client.Client
 	//RootClient client.Client
-	RootClientset kubernetes.Interface
-	LeafClientset kubernetes.Interface
+	RootClientset         kubernetes.Interface
+	LeafClientset         kubernetes.Interface
+	clusterConditionCache clusterConditionStore
+	Recorder              record.EventRecorder
 }
 
 // GetLeafMode returns the leafMode for a Cluster
-func (h ClassificationHandler) GetLeafMode() LeafMode {
+func (h *ClassificationHandler) GetLeafMode() LeafMode {
 	return h.leafMode
 }
 
 // GetLeafNodes returns nodes in leaf cluster by the rootNode
-func (h ClassificationHandler) GetLeafNodes(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (nodesInLeaf *corev1.NodeList, err error) {
+func (h *ClassificationHandler) GetLeafNodes(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (nodesInLeaf *corev1.NodeList, err error) {
 	listOption := metav1.ListOptions{}
 	if h.leafMode == Party {
 		listOption.LabelSelector = metav1.FormatLabelSelector(selector.LabelSelector)
@@ -71,7 +87,7 @@
 }
 
 // GetLeafPods returns pods in leaf cluster by the rootNode
-func (h ClassificationHandler) GetLeafPods(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (pods *corev1.PodList, err error) {
+func (h *ClassificationHandler) GetLeafPods(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (pods *corev1.PodList, err error) {
 	if h.leafMode == Party {
 		pods, err = h.LeafClientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
 		if err != nil {
@@ -105,8 +121,7 @@ func (h ClassificationHandler) GetLeafPods(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (pods *corev1.PodList, err error) {
 	return pods, nil
 }
 
-// UpdateRootNodeStatus updates the node's status in root cluster
-func (h ClassificationHandler) UpdateRootNodeStatus(ctx context.Context, nodesInRoot []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error {
+func (h *ClassificationHandler) UpdateRootNodeStatus(ctx context.Context, nodesInRoot []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error {
 	errors := []error{}
 	for _, node := range nodesInRoot {
 		nodeNameInRoot := node.Name
@@ -157,11 +172,15 @@ func (h ClassificationHandler) UpdateRootNodeStatus(ctx context.Context, nodesIn
 				if err != nil {
 					return fmt.Errorf("could not list pod in leaf cluster while update the join node %s status, err: %v", nodeNameInRoot, err)
 				}
+				nodeInRoot, err = h.RootClientset.CoreV1().Nodes().Get(ctx, nodeNameInRoot, metav1.GetOptions{})
+				if err != nil {
+					return fmt.Errorf("cannot get node in root cluster while updating the join node %s status, err: %v", nodeNameInRoot, err)
+				}
+				rootCopy.ResourceVersion = nodeInRoot.ResourceVersion
 				clusterResources := utils.CalculateClusterResources(nodesInLeaf, pods)
 				rootCopy.Status.Allocatable = clusterResources
 				rootCopy.Status.Capacity = clusterResources
 			}
-
 			updateAddress, err := GetAddress(ctx, h.RootClientset, nodesInLeaf.Items[0].Status.Addresses)
 			if err != nil {
 				return err
@@ -171,6 +190,7 @@ func (h ClassificationHandler) UpdateRootNodeStatus(ctx context.Context, nodesIn
 			if _, err = h.RootClientset.CoreV1().Nodes().UpdateStatus(ctx, rootCopy, metav1.UpdateOptions{}); err != nil {
 				return err
 			}
+			klog.Infof("Successfully updated status for rootNode %s", nodeNameInRoot)
 
 			if h.leafMode == Node {
 				err := updateTaints(h.RootClientset, nodeInLeafTaints, rootCopy.Name)
@@ -188,6 +208,119 @@ func (h ClassificationHandler) UpdateRootNodeStatus(ctx context.Context, nodesIn
 	return utilerrors.NewAggregate(errors)
 }
 
+// UpdateRootNodeStatusAddNotready updates the node's status in the root cluster and marks it NotReady when the leaf cluster is offline
+/* TODO: This code checks a single, specific API server. If that API server fails, we need to consider how to connect to other API servers.
+TODO: There are currently three candidate solutions: (1) Add a high-availability mode.
+TODO: (2) Use a configmap to store the API server list and traverse it.
+TODO: (3) Add an APIServerList field to the cluster CRD, which needs to be entered manually during deployment. */
+func (h *ClassificationHandler) UpdateRootNodeStatusAddNotready(ctx context.Context, nodesInRoot []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error {
+	errs := []error{}
+	for _, node := range nodesInRoot {
+		nodeNameInRoot := node.Name
+		listOptions := metav1.ListOptions{}
+		if h.leafMode == Party {
+			selector, ok := leafNodeSelector[nodeNameInRoot]
+			if !ok {
+				klog.Warningf("have no nodeSelector for the join node: %v", nodeNameInRoot)
+				continue
+			}
+			listOptions.LabelSelector = metav1.FormatLabelSelector(selector.LabelSelector)
+		} else if h.leafMode == Node {
+			listOptions.FieldSelector = fmt.Sprintf("metadata.name=%s", nodeNameInRoot)
+		}
+
+		err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			nodeInRoot, err := h.RootClientset.CoreV1().Nodes().Get(ctx, nodeNameInRoot, metav1.GetOptions{})
+			if err != nil {
+				// TODO: If a node is accidentally deleted, recreate it
+				return fmt.Errorf("cannot get node in root cluster while updating the join node %s status, err: %v", nodeNameInRoot, err)
+			}
+			rootCopy := nodeInRoot.DeepCopy()
+			// Check whether the leaf cluster is online
+			online := h.getClusterHealthStatus()
+			observedReadyConditions := getObservedReadyStatus(online)
+			// Ignore the impact of network delay or jitter for a period of time
+			readyConditions := h.clusterConditionCache.thresholdAdjustedReadyCondition(h.Cluster, nodeInRoot, observedReadyConditions, clusterFailureThreshold, clusterSuccessThreshold)
+			readyCondition := FindStatusCondition(readyConditions)
+			// If the cluster is still offline after the threshold elapses, set the root node status to NotReady
+			if !online && readyCondition.Status != corev1.ConditionTrue {
+				klog.V(2).Infof("Cluster(%s) still offline after %s, ensuring offline is set.", h.Cluster.Name, h.clusterConditionCache.failureThreshold)
+				// Send an event indicating that the node status was set to NotReady because the cluster is offline
+				h.Recorder.Eventf(h.Cluster, corev1.EventTypeNormal, "NodeStatusSetToNotReady", "Node %s status set to NotReady due to cluster %s being offline.", nodeNameInRoot, h.Cluster.Name)
+				rootCopy.Status.Conditions = readyConditions
+				if _, err = h.RootClientset.CoreV1().Nodes().UpdateStatus(ctx, rootCopy, metav1.UpdateOptions{}); err != nil {
+					return err
+				}
+				return nil
+			}
+			nodesInLeaf, err := h.LeafClientset.CoreV1().Nodes().List(ctx, listOptions)
+			if err != nil {
+				// TODO: If a node is accidentally deleted, recreate it
+				return fmt.Errorf("cannot get node in leaf cluster while updating the join node %s status, err: %v", nodeNameInRoot, err)
+			}
+			if len(nodesInLeaf.Items) == 0 {
+				// TODO: If a node is accidentally deleted, recreate it
+				return fmt.Errorf("have no node in leaf cluster while updating the join node %s status", nodeNameInRoot)
+			}
+			var nodeInLeafTaints []corev1.Taint
+			if h.leafMode == Node {
+				rootCopy.Status = *nodesInLeaf.Items[0].Status.DeepCopy()
+				nodeInLeafTaints = append(nodesInLeaf.Items[0].Spec.Taints, corev1.Taint{
+					Key:    utils.KosmosNodeTaintKey,
+					Value:  utils.KosmosNodeTaintValue,
+					Effect: utils.KosmosNodeTaintEffect,
+				})
+			} else {
+				// Check whether all leaf cluster master nodes are ready; if all are NotReady, update the root node status to NotReady
+				leafMasterReadyCondition := h.checkAllMasterNodesNotReady(ctx)
+				rootCopy.Status.Conditions = leafMasterReadyCondition
+				// Aggregate the resources of the leaf nodes
+				pods, err := h.GetLeafPods(ctx, rootCopy, leafNodeSelector[nodeNameInRoot])
+				if err != nil {
+					return fmt.Errorf("could not list pod in leaf cluster while updating the join node %s status, err: %v", nodeNameInRoot, err)
+				}
+				nodeInRoot, err = h.RootClientset.CoreV1().Nodes().Get(ctx, nodeNameInRoot, metav1.GetOptions{})
+				if err != nil {
+					return fmt.Errorf("cannot get node in root cluster while updating the join node %s status, err: %v", nodeNameInRoot, err)
+				}
+				rootCopy.ResourceVersion = nodeInRoot.ResourceVersion
+				clusterResources := utils.CalculateClusterResources(nodesInLeaf, pods)
+				rootCopy.Status.Allocatable = clusterResources
+				rootCopy.Status.Capacity = clusterResources
+			}
+			updateAddress, err := GetAddress(ctx, h.RootClientset, nodesInLeaf.Items[0].Status.Addresses)
+			if err != nil {
+				return err
+			}
+			rootCopy.Status.Addresses = updateAddress
+
+			// klog.Infof("rootCopy.Status.Conditions is %s ", rootCopy.Status.Conditions)
+			if _, err = h.RootClientset.CoreV1().Nodes().UpdateStatus(ctx, rootCopy, metav1.UpdateOptions{}); err != nil {
+				if errors.IsConflict(err) {
+					klog.Warningf("Conflict detected while updating status for rootNode %s, retrying...", nodeNameInRoot)
+				} else {
+					klog.Errorf("Failed to update status for rootNode %s: %v", nodeNameInRoot, err)
+				}
+				return err
+			}
+			klog.Infof("Successfully updated status for rootNode %s", nodeNameInRoot)
+
+			if h.leafMode == Node {
+				err := updateTaints(h.RootClientset, nodeInLeafTaints, rootCopy.Name)
+				if err != nil {
+					return fmt.Errorf("update taints failed: %v", err)
+				}
+			}
+
+			return nil
+		})
+		if err != nil {
+			errs = append(errs, err)
+		}
+	}
+	return utilerrors.NewAggregate(errs)
+}
+
 func updateTaints(client kubernetes.Interface, taints []corev1.Taint, nodeName string) error {
 	node := corev1.Node{
 		Spec: corev1.NodeSpec{
@@ -239,7 +372,7 @@ func createNode(ctx context.Context, clientset kubernetes.Interface, clusterName
 }
 
 // CreateRootNode creates the node in root cluster
-func (h ClassificationHandler) CreateRootNode(ctx context.Context, listenPort int32, gitVersion string) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) {
+func (h *ClassificationHandler) CreateRootNode(ctx context.Context, listenPort int32, gitVersion string) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) {
 	nodes := make([]*corev1.Node, 0)
 	leafNodeSelectors := make(map[string]kosmosv1alpha1.NodeSelector)
 	cluster := h.Cluster
@@ -282,11 +415,17 @@
 // NewLeafModelHandler create a LeafModelHandler for Cluster
 func NewLeafModelHandler(cluster *kosmosv1alpha1.Cluster, rootClientset, leafClientset kubernetes.Interface) LeafModelHandler {
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(klog.Infof)
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rootClientset.CoreV1().Events("")})
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "classification-handler"})
+
 	classificationModel := &ClassificationHandler{
 		leafMode:      ALL,
 		Cluster:       cluster,
 		RootClientset: rootClientset,
 		LeafClientset: leafClientset,
+		Recorder:      recorder,
 	}
 
 	leafModels := cluster.Spec.ClusterTreeOptions.LeafModels
 
@@ -301,3 +440,74 @@ func NewLeafModelHandler(cluster *kosmosv1alpha1.Cluster, rootClientset, leafClientset kubernetes.Interface) LeafModelHandler {
 	}
 	return classificationModel
 }
+
+// Perform a health check on the leaf cluster API server
+func (h *ClassificationHandler) getClusterHealthStatus() (online bool) {
+	var healthStatus int
+	resp := h.LeafClientset.Discovery().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&healthStatus)
+	if resp.Error() != nil {
+		klog.Errorf("Health check failed. Current health status: %v, error: %v", healthStatus, resp.Error())
+	}
+	if healthStatus != http.StatusOK {
+		klog.Warningf("Member cluster %v isn't healthy", h.Cluster.Name)
+		return false
+	}
+	return true
+}
+
+// getObservedReadyStatus returns the node ready condition based on the health check result
+func getObservedReadyStatus(online bool) []corev1.NodeCondition {
+	if !online {
+		return []corev1.NodeCondition{
+			{
+				Type:    corev1.NodeReady,
+				Status:  corev1.ConditionFalse,
+				Reason:  "ClusterNotReachable",
+				Message: "cluster is not reachable.",
+			},
+		}
+	}
+	return []corev1.NodeCondition{
+		{
+			Type:    corev1.NodeReady,
+			Status:  corev1.ConditionTrue,
+			Reason:  "ClusterReady",
+			Message: "cluster is online and ready to accept workloads.",
+		},
+	}
+}
+func (h *ClassificationHandler) checkAllMasterNodesNotReady(ctx context.Context) []corev1.NodeCondition {
+	klog.Infof("Starting to check if all master nodes in the leaf cluster are not ready.")
+	// Select nodes with the LabelNodeRoleOldControlPlane or LabelNodeRoleControlPlane label
+	// TODO: check whether the labels are correct.
+	nodes, err := h.LeafClientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: utils.LabelNodeRoleControlPlane})
+	if err != nil {
+		klog.Errorf("Error getting master nodes in leaf cluster: %v", err)
+	}
+	allMasterNotReady := true
+	for _, node := range nodes.Items {
+		masterNodeReady := false
+		for _, condition := range node.Status.Conditions {
+			if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
+				masterNodeReady = true
+				break
+			}
+		}
+		if masterNodeReady {
+			allMasterNotReady = false
+		}
+	}
+	if allMasterNotReady {
+		klog.Warningf("All master nodes in the leaf cluster are not ready.")
+		h.Recorder.Eventf(h.Cluster, corev1.EventTypeWarning, "AllLeafMasterNodesNotReady", "All leaf cluster's master nodes are not ready.")
+		return []corev1.NodeCondition{
+			{
+				Type:    corev1.NodeReady,
+				Status:  corev1.ConditionFalse,
+				Reason:  "LeafNodesNotReady",
+				Message: "All leaf cluster's master nodes are not ready.",
+			},
+		}
+	}
+	return utils.NodeConditions()
+}
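
Not part of the patch: a minimal unit-test sketch for the debounce behaviour of thresholdAdjustedReadyCondition, written against the clusterConditionStore added above. The file name and test values are illustrative assumptions (for example pkg/clustertree/cluster-manager/utils/leaf_cluster_condition_cache_test.go), and the test only relies on types that appear in this patch, assuming the Cluster CRD embeds metav1.ObjectMeta as usual.

package utils

import (
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
)

// TestThresholdAdjustedReadyCondition checks that a freshly observed NotReady
// condition does not flip the root node status until the failure threshold elapses.
func TestThresholdAdjustedReadyCondition(t *testing.T) {
	store := clusterConditionStore{}
	cluster := &kosmosv1alpha1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "leaf-a"}}

	ready := []corev1.NodeCondition{{Type: corev1.NodeReady, Status: corev1.ConditionTrue}}
	notReady := []corev1.NodeCondition{{Type: corev1.NodeReady, Status: corev1.ConditionFalse}}

	rootNode := &corev1.Node{}
	rootNode.Status.Conditions = ready

	// The first observation only seeds the cache and is returned unchanged.
	got := store.thresholdAdjustedReadyCondition(cluster, rootNode, ready, 30*time.Minute, 5*time.Second)
	if FindStatusCondition(got).Status != corev1.ConditionTrue {
		t.Fatalf("expected Ready=True after first observation, got %v", got)
	}

	// A NotReady observation within the failure threshold should retain the old Ready=True status.
	got = store.thresholdAdjustedReadyCondition(cluster, rootNode, notReady, 30*time.Minute, 5*time.Second)
	if FindStatusCondition(got).Status != corev1.ConditionTrue {
		t.Fatalf("expected Ready=True to be retained within the failure threshold, got %v", got)
	}
}

The second assertion documents the intent of the failure threshold: a single failed health probe should not immediately mark the root node NotReady; only an offline state that persists past clusterFailureThreshold does.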