Skip to content

Commit

Permalink
fadadfaf
Browse files Browse the repository at this point in the history
Signed-off-by: duanmengkk <[email protected]>
  • Loading branch information
duanmengkk committed Dec 24, 2023
1 parent be7f9fe commit 23bb1d7
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 65 deletions.
2 changes: 2 additions & 0 deletions cmd/clustertree/cluster-manager/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ func run(ctx context.Context, opts *options.Options) error {
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
Logger: mgr.GetLogger(),
ReservedNamespaces: opts.ReservedNamespaces,
RateLimiterOptions: opts.RateLimiterOpts,
BackoffOptions: opts.BackoffOpts,
}
if err = ServiceExportController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", mcs.ServiceExportControllerName, err)
Expand Down
8 changes: 8 additions & 0 deletions cmd/clustertree/cluster-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/component-base/config/options"
componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1"

"github.com/kosmos.io/kosmos/pkg/utils/flags"
)

const (
Expand Down Expand Up @@ -40,6 +42,10 @@ type Options struct {

// ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources
ReservedNamespaces []string

RateLimiterOpts flags.Options

BackoffOpts flags.BackoffOptions
}

type KubernetesOptions struct {
Expand Down Expand Up @@ -82,5 +88,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.OnewayStorageControllers, "oneway-storage-controllers", false, "Turn on or off oneway storage controllers.")
flags.StringSliceVar(&o.AutoCreateMCSPrefix, "auto-mcs-prefix", []string{}, "The prefix of namespace for service to auto create mcs resources")
flags.StringSliceVar(&o.ReservedNamespaces, "reserved-namespaces", []string{"kube-system"}, "The namespaces protected by Kosmos that the controller-manager will skip.")
o.RateLimiterOpts.AddFlags(flags)
o.BackoffOpts.AddFlags(flags)
options.BindLeaderElectionFlags(&o.LeaderElection, flags)
}
1 change: 1 addition & 0 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (c *ClusterController) setupControllers(
IPFamilyType: cluster.Spec.ClusterLinkOptions.IPFamily,
RootResourceManager: c.RootResourceManager,
ReservedNamespaces: c.Options.ReservedNamespaces,
BackoffOptions: c.Options.BackoffOpts,
}
if err := serviceImportController.AddController(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", mcs.LeafServiceImportControllerName, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (c *AutoCreateMCSController) Reconcile(ctx context.Context, request reconci
// The service is being deleted, in which case we should clear serviceExport and serviceImport.
if shouldDelete || !service.DeletionTimestamp.IsZero() {
if err := c.cleanUpMcsResources(request.Namespace, request.Name, clusterList); err != nil {
klog.Errorf("Cleanup MCS resources failed, err: %v", err)
return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}
return controllerruntime.Result{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mcs

import (
"context"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -16,6 +17,7 @@ import (
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -26,6 +28,7 @@ import (
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

"github.com/kosmos.io/kosmos/pkg/utils"
"github.com/kosmos.io/kosmos/pkg/utils/flags"
"github.com/kosmos.io/kosmos/pkg/utils/helper"
)

Expand All @@ -38,6 +41,8 @@ type ServiceExportController struct {
Logger logr.Logger
// ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources
ReservedNamespaces []string
RateLimiterOptions flags.Options
BackoffOptions flags.BackoffOptions
}

func (c *ServiceExportController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
Expand All @@ -51,21 +56,27 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, request reconci
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}

if err := c.removeAnnotation(request.Namespace, request.Name); err != nil {
klog.Errorf("Remove serviceExport (%s/%s)'s annotation failed, Error: %v", serviceExport.Namespace, serviceExport.Name, err)
return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}
klog.Errorf("Get serviceExport (%s/%s)'s failed, Error: %v", serviceExport.Namespace, serviceExport.Name, err)
return controllerruntime.Result{Requeue: true}, err
}

// The serviceExport is being deleted, in which case we should clear endpointSlice.
if !serviceExport.DeletionTimestamp.IsZero() {
if err := c.removeAnnotation(request.Namespace, request.Name); err != nil {
return controllerruntime.Result{Requeue: true}, err
klog.Errorf("Remove serviceExport (%s/%s)'s annotation failed, Error: %v", serviceExport.Namespace, serviceExport.Name, err)
return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}
return c.removeFinalizer(serviceExport)
}

err := c.syncServiceExport(serviceExport)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
klog.Errorf("Sync serviceExport (%s/%s) failed, Error: %v", serviceExport.Namespace, serviceExport.Name, err)
return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}
return c.ensureFinalizer(serviceExport)
}
Expand Down Expand Up @@ -107,6 +118,10 @@ func (c *ServiceExportController) SetupWithManager(mgr manager.Manager) error {
handler.EnqueueRequestsFromMapFunc(endpointSliceServiceExportFn),
endpointSlicePredicate,
).
WithOptions(controller.Options{
RateLimiter: flags.DefaultControllerRateLimiter(c.RateLimiterOptions),
MaxConcurrentReconciles: 2,
}).
Complete(c)
}

Expand Down Expand Up @@ -148,8 +163,10 @@ func (c *ServiceExportController) removeAnnotation(namespace, name string) error
klog.V(4).Infof("EndpointSlice %s/%s is deleting and does not need to remove serviceExport annotation", namespace, newEps.Name)
continue
}
helper.RemoveAnnotation(newEps, utils.ServiceExportLabelKey)
err = c.updateEndpointSlice(newEps, c.RootClient)

err = c.updateEndpointSlice(newEps, c.RootClient, func(eps *discoveryv1.EndpointSlice) {
helper.RemoveAnnotation(eps, utils.ServiceExportLabelKey)
})
if err != nil {
klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", namespace, newEps.Name, err)
return err
Expand All @@ -158,24 +175,44 @@ func (c *ServiceExportController) removeAnnotation(namespace, name string) error
return nil
}

func (c *ServiceExportController) updateEndpointSlice(eps *discoveryv1.EndpointSlice, rootClient client.Client) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
// nolint:dupl
func (c *ServiceExportController) updateEndpointSlice(eps *discoveryv1.EndpointSlice, rootClient client.Client, modifyEps func(eps *discoveryv1.EndpointSlice)) error {
return retry.RetryOnConflict(flags.DefaultUpdateRetryBackoff(c.BackoffOptions), func() error {
modifyEps(eps)
updateErr := rootClient.Update(context.TODO(), eps)
if updateErr == nil {
return nil
}

klog.V(5).Infof("Failed to update endpointSlice %s/%s: %v", eps.Namespace, eps.Name, updateErr)
newEps := &discoveryv1.EndpointSlice{}
key := types.NamespacedName{
Namespace: eps.Namespace,
Name: eps.Name,
}
getErr := rootClient.Get(context.TODO(), key, newEps)
if getErr == nil || apierrors.IsNotFound(getErr) {
getErr := rootClient.Get(context.TODO(), client.ObjectKey{Namespace: eps.Namespace, Name: eps.Name}, newEps)
if getErr == nil {
//Make a copy, so we don't mutate the shared cache
eps = newEps.DeepCopy()
} else {
klog.Errorf("Failed to get updated endpointSlice %s/%s: %v", eps.Namespace, eps.Name, getErr)
klog.V(5).Infof("Failed to get updated endpointSlice %s/%s: %v", eps.Namespace, eps.Name, getErr)
}

return updateErr
})
}

// nolint:dupl
func (c *ServiceExportController) updateServiceExport(export *mcsv1alpha1.ServiceExport, rootClient client.Client, modifyExport func(export *mcsv1alpha1.ServiceExport)) error {
return retry.RetryOnConflict(flags.DefaultUpdateRetryBackoff(c.BackoffOptions), func() error {
modifyExport(export)
updateErr := rootClient.Update(context.TODO(), export)
if updateErr == nil {
return nil
}
klog.V(5).Infof("Failed to update serviceExport %s/%s: %v", export.Namespace, export.Name, updateErr)
newExport := &mcsv1alpha1.ServiceExport{}
getErr := rootClient.Get(context.TODO(), client.ObjectKey{Namespace: export.Namespace, Name: export.Name}, newExport)
if getErr == nil {
//Make a copy, so we don't mutate the shared cache
export = newExport.DeepCopy()
} else {
klog.V(5).Infof("Failed to get serviceExport %s/%s: %v", export.Namespace, export.Name, getErr)
}

return updateErr
Expand Down Expand Up @@ -206,8 +243,10 @@ func (c *ServiceExportController) syncServiceExport(export *mcsv1alpha1.ServiceE
klog.V(4).Infof("EndpointSlice %s/%s is deleting and does not need to remove serviceExport annotation", export.Namespace, newEps.Name)
continue
}
helper.AddEndpointSliceAnnotation(newEps, utils.ServiceExportLabelKey, utils.MCSLabelValue)
err = c.updateEndpointSlice(newEps, c.RootClient)

err = c.updateEndpointSlice(newEps, c.RootClient, func(eps *discoveryv1.EndpointSlice) {
helper.AddEndpointSliceAnnotation(eps, utils.ServiceExportLabelKey, utils.MCSLabelValue)
})
if err != nil {
klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", export.Namespace, newEps.Name, err)
return err
Expand All @@ -223,10 +262,12 @@ func (c *ServiceExportController) ensureFinalizer(export *mcsv1alpha1.ServiceExp
return controllerruntime.Result{}, nil
}

controllerutil.AddFinalizer(export, utils.MCSFinalizer)
err := c.RootClient.Update(context.TODO(), export)
err := c.updateServiceExport(export, c.RootClient, func(export *mcsv1alpha1.ServiceExport) {
controllerutil.AddFinalizer(export, utils.MCSFinalizer)
})
if err != nil {
return controllerruntime.Result{Requeue: true}, err
klog.Errorf("Update serviceExport (%s/%s) failed, Error: %v", export.Namespace, export.Name, err)
return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}

return controllerruntime.Result{}, nil
Expand All @@ -237,10 +278,12 @@ func (c *ServiceExportController) removeFinalizer(export *mcsv1alpha1.ServiceExp
return controllerruntime.Result{}, nil
}

controllerutil.RemoveFinalizer(export, utils.MCSFinalizer)
err := c.RootClient.Update(context.TODO(), export)
err := c.updateServiceExport(export, c.RootClient, func(export *mcsv1alpha1.ServiceExport) {
controllerutil.RemoveFinalizer(export, utils.MCSFinalizer)
})
if err != nil {
return controllerruntime.Result{Requeue: true}, err
klog.Errorf("Update serviceExport %s/%s's finalizer failed, Error: %v", export.Namespace, export.Name, err)
return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err
}

return controllerruntime.Result{}, nil
Expand Down
Loading

0 comments on commit 23bb1d7

Please sign in to comment.