diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go index a454111ef..4b4153303 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go @@ -211,7 +211,7 @@ func (c *AutoCreateMCSController) SetupWithManager(mgr manager.Manager) error { func (c *AutoCreateMCSController) cleanUpMcsResources(ctx context.Context, namespace string, name string, clusterList *kosmosv1alpha1.ClusterList) error { // delete serviceExport in root cluster - if err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil { if !apierrors.IsNotFound(err) { klog.Errorf("Delete serviceExport in root cluster failed %s/%s, Error: %v", namespace, name, err) return err @@ -229,7 +229,7 @@ func (c *AutoCreateMCSController) cleanUpMcsResources(ctx context.Context, names klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err) return err } - if err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil { if !apierrors.IsNotFound(err) { klog.Errorf("Delete serviceImport in leaf cluster failed %s/%s, Error: %v", namespace, name, err) return err @@ -266,6 +266,14 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err) return err } + + if err = c.createNamespace(leafManager.Client, service.Namespace); err != nil { + if !apierrors.IsAlreadyExists(err) { + klog.Errorf("Create namespace %s in leaf cluster failed, Error: %v", service.Namespace, err) + return err + } + } + serviceImport := &mcsv1alpha1.ServiceImport{ ObjectMeta: metav1.ObjectMeta{ Name: service.Name, @@ -290,3 +298,16 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se } return nil } + +func (c *AutoCreateMCSController) createNamespace(client client.Client, namespace string) error { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + } + err := client.Create(context.TODO(), ns) + if err != nil { + return err + } + return nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go index ad17ecc2e..d8da08d9f 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go @@ -16,6 +16,7 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -45,28 +46,28 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, request reconci klog.V(4).Infof("============ %s has been reconciled =============", request.NamespacedName.String()) }() - var shouldDelete bool serviceExport := &mcsv1alpha1.ServiceExport{} if err := c.RootClient.Get(ctx, request.NamespacedName, serviceExport); err != nil { - if !apierrors.IsNotFound(err) { - return controllerruntime.Result{Requeue: true}, err + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil } - shouldDelete = true + + return controllerruntime.Result{Requeue: true}, err } // The serviceExport is being deleted, in which case we should clear endpointSlice. - if shouldDelete || !serviceExport.DeletionTimestamp.IsZero() { - if err := c.removeAnnotation(ctx, request.Namespace, request.Name); err != nil { + if !serviceExport.DeletionTimestamp.IsZero() { + if err := c.removeAnnotation(request.Namespace, request.Name); err != nil { return controllerruntime.Result{Requeue: true}, err } - return controllerruntime.Result{}, nil + return c.removeFinalizer(serviceExport) } err := c.syncServiceExport(ctx, serviceExport) if err != nil { return controllerruntime.Result{Requeue: true}, err } - return controllerruntime.Result{}, nil + return c.ensureFinalizer(serviceExport) } func (c *ServiceExportController) SetupWithManager(mgr manager.Manager) error { @@ -122,7 +123,7 @@ func (c *ServiceExportController) shouldEnqueue(object client.Object) bool { return true } -func (c *ServiceExportController) removeAnnotation(ctx context.Context, namespace, name string) error { +func (c *ServiceExportController) removeAnnotation(namespace, name string) error { var err error selector := labels.SelectorFromSet( map[string]string{ @@ -130,10 +131,11 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, namespac }, ) epsList := &discoveryv1.EndpointSliceList{} - err = c.RootClient.List(ctx, epsList, &client.ListOptions{ + err = c.RootClient.List(context.TODO(), epsList, &client.ListOptions{ Namespace: namespace, LabelSelector: selector, }) + if err != nil { klog.Errorf("List endpointSlice in %s failed, Error: %v", namespace, err) return err @@ -147,7 +149,7 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, namespac continue } helper.RemoveAnnotation(newEps, utils.ServiceExportLabelKey) - err = c.updateEndpointSlice(ctx, newEps, c.RootClient) + err = c.updateEndpointSlice(newEps, c.RootClient) if err != nil { klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", namespace, newEps.Name, err) return err @@ -156,9 +158,9 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, namespac return nil } -func (c *ServiceExportController) updateEndpointSlice(ctx context.Context, eps *discoveryv1.EndpointSlice, rootClient client.Client) error { +func (c *ServiceExportController) updateEndpointSlice(eps *discoveryv1.EndpointSlice, rootClient client.Client) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { - updateErr := rootClient.Update(ctx, eps) + updateErr := rootClient.Update(context.TODO(), eps) if updateErr == nil { return nil } @@ -168,8 +170,8 @@ func (c *ServiceExportController) updateEndpointSlice(ctx context.Context, eps * Namespace: eps.Namespace, Name: eps.Name, } - getErr := rootClient.Get(ctx, key, newEps) - if getErr == nil { + getErr := rootClient.Get(context.TODO(), key, newEps) + if getErr == nil || apierrors.IsNotFound(getErr) { //Make a copy, so we don't mutate the shared cache eps = newEps.DeepCopy() } else { @@ -205,7 +207,7 @@ func (c *ServiceExportController) syncServiceExport(ctx context.Context, export continue } helper.AddEndpointSliceAnnotation(newEps, utils.ServiceExportLabelKey, utils.MCSLabelValue) - err = c.updateEndpointSlice(ctx, newEps, c.RootClient) + err = c.updateEndpointSlice(newEps, c.RootClient) if err != nil { klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", export.Namespace, newEps.Name, err) return err @@ -215,3 +217,31 @@ func (c *ServiceExportController) syncServiceExport(ctx context.Context, export c.EventRecorder.Event(export, corev1.EventTypeNormal, "Synced", "serviceExport has been synced to endpointSlice's annotation successfully") return nil } + +func (c *ServiceExportController) ensureFinalizer(export *mcsv1alpha1.ServiceExport) (reconcile.Result, error) { + if controllerutil.ContainsFinalizer(export, utils.MCSFinalizer) { + return controllerruntime.Result{}, nil + } + + controllerutil.AddFinalizer(export, utils.MCSFinalizer) + err := c.RootClient.Update(context.TODO(), export) + if err != nil { + return controllerruntime.Result{Requeue: true}, err + } + + return controllerruntime.Result{}, nil +} + +func (c *ServiceExportController) removeFinalizer(export *mcsv1alpha1.ServiceExport) (reconcile.Result, error) { + if !controllerutil.ContainsFinalizer(export, utils.MCSFinalizer) { + return controllerruntime.Result{}, nil + } + + controllerutil.RemoveFinalizer(export, utils.MCSFinalizer) + err := c.RootClient.Update(context.TODO(), export) + if err != nil { + return controllerruntime.Result{Requeue: true}, err + } + + return controllerruntime.Result{}, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go index c3d22174a..7a53042bc 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go @@ -19,6 +19,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/strings/slices" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/manager" mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" @@ -42,7 +43,6 @@ type ServiceImportController struct { Logger logr.Logger processor utils.AsyncWorker RootResourceManager *utils.ResourceManager - ctx context.Context // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources ReservedNamespaces []string } @@ -67,7 +67,6 @@ func (c *ServiceImportController) Start(ctx context.Context) error { ReconcileFunc: c.Reconcile, } c.processor = utils.NewAsyncWorker(opt) - c.ctx = ctx serviceImportInformerFactory := externalversions.NewSharedInformerFactory(c.RootKosmosClient, 0) serviceImportInformer := serviceImportInformerFactory.Multicluster().V1alpha1().ServiceImports() @@ -109,34 +108,34 @@ func (c *ServiceImportController) Reconcile(key utils.QueueKey) error { klog.V(4).Infof("============ %s has been reconciled in cluster %s =============", clusterWideKey.NamespaceKey(), c.LeafNodeName) }() - var shouldDelete bool serviceImport := &mcsv1alpha1.ServiceImport{} - if err := c.LeafClient.Get(c.ctx, types.NamespacedName{Namespace: clusterWideKey.Namespace, Name: clusterWideKey.Name}, serviceImport); err != nil { - if !apierrors.IsNotFound(err) { - klog.Errorf("Get %s in cluster %s failed, Error: %v", clusterWideKey.NamespaceKey(), c.LeafNodeName, err) - return err + if err := c.LeafClient.Get(context.TODO(), types.NamespacedName{Namespace: clusterWideKey.Namespace, Name: clusterWideKey.Name}, serviceImport); err != nil { + if apierrors.IsNotFound(err) { + return nil } - shouldDelete = true + + klog.Errorf("Get %s in cluster %s failed, Error: %v", clusterWideKey.NamespaceKey(), c.LeafNodeName, err) + return err } // The serviceImport is being deleted, in which case we should clear endpointSlice. - if shouldDelete || !serviceImport.DeletionTimestamp.IsZero() { - if err := c.cleanupServiceAndEndpointSlice(c.ctx, clusterWideKey.Namespace, clusterWideKey.Name); err != nil { + if !serviceImport.DeletionTimestamp.IsZero() { + if err := c.cleanupServiceAndEndpointSlice(clusterWideKey.Namespace, clusterWideKey.Name); err != nil { return err } - return nil + return c.removeFinalizer(serviceImport) } - err := c.syncServiceImport(c.ctx, serviceImport) + err := c.syncServiceImport(serviceImport) if err != nil { return err } - return nil + return c.ensureFinalizer(serviceImport) } -func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Context, namespace, name string) error { +func (c *ServiceImportController) cleanupServiceAndEndpointSlice(namespace, name string) error { service := &corev1.Service{} - if err := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, service); err != nil { + if err := c.LeafClient.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, service); err != nil { if apierrors.IsNotFound(err) { klog.V(4).Infof("ServiceImport %s/%s is deleting and Service %s/%s is not found, ignore it", namespace, name, namespace, name) return nil @@ -150,7 +149,7 @@ func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Con return nil } - if err := c.LeafClient.Delete(ctx, service); err != nil { + if err := c.LeafClient.Delete(context.TODO(), service); err != nil { if apierrors.IsNotFound(err) { klog.V(4).Infof("ServiceImport %s/%s is deleting and Service %s/%s is not found, ignore it", namespace, name, namespace, name) return nil @@ -160,7 +159,7 @@ func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Con } endpointSlice := &discoveryv1.EndpointSlice{} - err := c.LeafClient.DeleteAllOf(ctx, endpointSlice, &client.DeleteAllOfOptions{ + err := c.LeafClient.DeleteAllOf(context.TODO(), endpointSlice, &client.DeleteAllOfOptions{ ListOptions: client.ListOptions{ Namespace: namespace, LabelSelector: labels.SelectorFromSet(map[string]string{ @@ -179,7 +178,7 @@ func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Con return nil } -func (c *ServiceImportController) syncServiceImport(ctx context.Context, serviceImport *mcsv1alpha1.ServiceImport) error { +func (c *ServiceImportController) syncServiceImport(serviceImport *mcsv1alpha1.ServiceImport) error { rootService, err := c.RootResourceManager.ServiceLister.Services(serviceImport.Namespace).Get(serviceImport.Name) if err != nil { if apierrors.IsNotFound(err) { @@ -190,7 +189,7 @@ func (c *ServiceImportController) syncServiceImport(ctx context.Context, service return err } - if err := c.importServiceHandler(ctx, rootService, serviceImport); err != nil { + if err := c.importServiceHandler(context.TODO(), rootService, serviceImport); err != nil { klog.Errorf("Create or update service %s/%s in client cluster %s failed, error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err) return err } @@ -210,7 +209,7 @@ func (c *ServiceImportController) syncServiceImport(ctx context.Context, service addresses = append(addresses, newAddress) } } - err = c.importEndpointSliceHandler(ctx, epsCopy, serviceImport) + err = c.importEndpointSliceHandler(epsCopy, serviceImport) if err != nil { klog.Errorf("Create or update service %s/%s in client cluster failed, error: %v", serviceImport.Namespace, serviceImport.Name, err) return err @@ -219,7 +218,7 @@ func (c *ServiceImportController) syncServiceImport(ctx context.Context, service addressString := strings.Join(addresses, ",") helper.AddServiceImportAnnotation(serviceImport, utils.ServiceEndpointsKey, addressString) - if err = c.updateServiceImport(ctx, serviceImport, addressString); err != nil { + if err = c.updateServiceImport(serviceImport, addressString); err != nil { klog.Errorf("Update serviceImport (%s/%s) annotation in cluster %s failed, Error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err) return err } @@ -228,7 +227,7 @@ func (c *ServiceImportController) syncServiceImport(ctx context.Context, service return nil } -func (c *ServiceImportController) importEndpointSliceHandler(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice, serviceImport *mcsv1alpha1.ServiceImport) error { +func (c *ServiceImportController) importEndpointSliceHandler(endpointSlice *discoveryv1.EndpointSlice, serviceImport *mcsv1alpha1.ServiceImport) error { if metav1.HasAnnotation(serviceImport.ObjectMeta, utils.DisconnectedEndpointsKey) { annotationValue := helper.GetLabelOrAnnotationValue(serviceImport.Annotations, utils.DisconnectedEndpointsKey) disConnectedAddress := strings.Split(annotationValue, ",") @@ -241,15 +240,15 @@ func (c *ServiceImportController) importEndpointSliceHandler(ctx context.Context return nil } - return c.createOrUpdateEndpointSliceInClient(ctx, endpointSlice, serviceImport.Name) + return c.createOrUpdateEndpointSliceInClient(endpointSlice, serviceImport.Name) } -func (c *ServiceImportController) createOrUpdateEndpointSliceInClient(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice, serviceName string) error { +func (c *ServiceImportController) createOrUpdateEndpointSliceInClient(endpointSlice *discoveryv1.EndpointSlice, serviceName string) error { newSlice := retainEndpointSlice(endpointSlice, serviceName) - if err := c.LeafClient.Create(ctx, newSlice); err != nil { + if err := c.LeafClient.Create(context.TODO(), newSlice); err != nil { if apierrors.IsAlreadyExists(err) { - err = c.updateEndpointSlice(ctx, newSlice) + err = c.updateEndpointSlice(newSlice) if err != nil { klog.Errorf("Update endpointSlice(%s/%s) in cluster %s failed, Error: %v", newSlice.Namespace, newSlice.Name, c.LeafNodeName, err) return err @@ -262,16 +261,16 @@ func (c *ServiceImportController) createOrUpdateEndpointSliceInClient(ctx contex return nil } -func (c *ServiceImportController) updateEndpointSlice(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice) error { +func (c *ServiceImportController) updateEndpointSlice(endpointSlice *discoveryv1.EndpointSlice) error { newEps := endpointSlice.DeepCopy() return retry.RetryOnConflict(retry.DefaultRetry, func() error { - updateErr := c.LeafClient.Update(ctx, newEps) + updateErr := c.LeafClient.Update(context.TODO(), newEps) if updateErr == nil { return nil } updated := &discoveryv1.EndpointSlice{} - getErr := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: newEps.Namespace, Name: newEps.Name}, updated) + getErr := c.LeafClient.Get(context.TODO(), types.NamespacedName{Namespace: newEps.Namespace, Name: newEps.Name}, updated) if getErr == nil { //Make a copy, so we don't mutate the shared cache newEps = updated.DeepCopy() @@ -359,15 +358,15 @@ func (c *ServiceImportController) createOrUpdateServiceInClient(ctx context.Cont return nil } -func (c *ServiceImportController) updateServiceImport(ctx context.Context, serviceImport *mcsv1alpha1.ServiceImport, addresses string) error { +func (c *ServiceImportController) updateServiceImport(serviceImport *mcsv1alpha1.ServiceImport, addresses string) error { newImport := serviceImport.DeepCopy() return retry.RetryOnConflict(retry.DefaultRetry, func() error { - updateErr := c.LeafClient.Update(ctx, newImport) + updateErr := c.LeafClient.Update(context.TODO(), newImport) if updateErr == nil { return nil } updated := &mcsv1alpha1.ServiceImport{} - getErr := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: newImport.Namespace, Name: newImport.Name}, updated) + getErr := c.LeafClient.Get(context.TODO(), types.NamespacedName{Namespace: newImport.Namespace, Name: newImport.Name}, updated) if getErr == nil { // Make a copy, so we don't mutate the shared cache newImport = updated.DeepCopy() @@ -509,6 +508,34 @@ func (c *ServiceImportController) checkServiceType(service *corev1.Service) erro return nil } +func (c *ServiceImportController) removeFinalizer(serviceImport *mcsv1alpha1.ServiceImport) error { + if !controllerutil.ContainsFinalizer(serviceImport, utils.MCSFinalizer) { + return nil + } + + controllerutil.RemoveFinalizer(serviceImport, utils.MCSFinalizer) + err := c.LeafClient.Update(context.TODO(), serviceImport) + if err != nil { + return err + } + + return nil +} + +func (c *ServiceImportController) ensureFinalizer(serviceImport *mcsv1alpha1.ServiceImport) error { + if controllerutil.ContainsFinalizer(serviceImport, utils.MCSFinalizer) { + return nil + } + + controllerutil.AddFinalizer(serviceImport, utils.MCSFinalizer) + err := c.LeafClient.Update(context.TODO(), serviceImport) + if err != nil { + return err + } + + return nil +} + func isServiceIPSet(service *corev1.Service) bool { return service.Spec.ClusterIP != corev1.ClusterIPNone && service.Spec.ClusterIP != "" } diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index da08a63db..89649fdb7 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -83,8 +83,6 @@ const ( EnvNodeName = "NODE_NAME" ) -const ClusterStartControllerFinalizer = "kosmos.io/cluster-start-finazlizer" - // mcs const ( ServiceKey = "kubernetes.io/service-name" @@ -162,6 +160,12 @@ const ( MasterRooTCAName = "master-root-ca.crt" ) +// finalizers +const ( + ClusterStartControllerFinalizer = "kosmos.io/cluster-start-finalizer" + MCSFinalizer = "kosmos.io/multi-cluster-service-finalizer" +) + var GVR_CONFIGMAP = schema.GroupVersionResource{ Group: "", Version: "v1",