diff --git a/pkg/yurtmanager/controller/raven/gatewayinternalservice/gateway_internal_service_controller.go b/pkg/yurtmanager/controller/raven/gatewayinternalservice/gateway_internal_service_controller.go index bc25e4bf0a8..c200b52d3ea 100644 --- a/pkg/yurtmanager/controller/raven/gatewayinternalservice/gateway_internal_service_controller.go +++ b/pkg/yurtmanager/controller/raven/gatewayinternalservice/gateway_internal_service_controller.go @@ -71,7 +71,6 @@ type ReconcileService struct { client.Client scheme *runtime.Scheme recorder record.EventRecorder - option util.Option } // newReconciler returns a new reconcile.Reconciler @@ -80,7 +79,6 @@ func newReconciler(c *appconfig.CompletedConfig, mgr manager.Manager) reconcile. Client: mgr.GetClient(), scheme: mgr.GetScheme(), recorder: mgr.GetEventRecorderFor(names.GatewayInternalServiceController), - option: util.NewOption(), } } @@ -137,13 +135,12 @@ func (r *ReconcileService) Reconcile(ctx context.Context, req reconcile.Request) } enableProxy, _ := util.CheckServer(ctx, r.Client) - r.option.SetProxyOption(enableProxy) - if err := r.reconcileService(ctx, req, gwList); err != nil { + if err := r.reconcileService(ctx, req, gwList, enableProxy); err != nil { err = fmt.Errorf(Format("unable to reconcile service: %s", err)) return reconcile.Result{}, err } - if err := r.reconcileEndpoint(ctx, req, gwList); err != nil { + if err := r.reconcileEndpoint(ctx, req, gwList, enableProxy); err != nil { err = fmt.Errorf(Format("unable to reconcile endpoint: %s", err)) return reconcile.Result{}, err } @@ -169,8 +166,8 @@ func (r *ReconcileService) listExposedGateway(ctx context.Context) ([]*ravenv1be return exposedGateways, nil } -func (r *ReconcileService) reconcileService(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway) error { - if len(gatewayList) == 0 || !r.option.GetProxyOption() { +func (r *ReconcileService) reconcileService(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway, enableProxy bool) error { + if len(gatewayList) == 0 || !enableProxy { return r.cleanService(ctx, req) } return r.updateService(ctx, req, gatewayList) @@ -284,13 +281,13 @@ func splitPorts(str string) []string { return ret } -func (r *ReconcileService) reconcileEndpoint(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway) error { +func (r *ReconcileService) reconcileEndpoint(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway, enableProxy bool) error { var service corev1.Service err := r.Get(ctx, req.NamespacedName, &service) if err != nil && !apierrs.IsNotFound(err) { return err } - if apierrs.IsNotFound(err) || service.DeletionTimestamp != nil || len(gatewayList) == 0 || !r.option.GetProxyOption() { + if apierrs.IsNotFound(err) || service.DeletionTimestamp != nil || len(gatewayList) == 0 || !enableProxy { return r.cleanEndpoint(ctx, req) } return r.updateEndpoint(ctx, req, &service, gatewayList) diff --git a/pkg/yurtmanager/controller/raven/gatewayinternalservice/gateway_internal_service_controller_test.go b/pkg/yurtmanager/controller/raven/gatewayinternalservice/gateway_internal_service_controller_test.go index 9225973b0b2..91a39a7c3bd 100644 --- a/pkg/yurtmanager/controller/raven/gatewayinternalservice/gateway_internal_service_controller_test.go +++ b/pkg/yurtmanager/controller/raven/gatewayinternalservice/gateway_internal_service_controller_test.go @@ -199,7 +199,6 @@ func MockReconcile() *ReconcileService { return &ReconcileService{ Client: fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build(), recorder: record.NewFakeRecorder(100), - option: util.NewOption(), } } diff --git a/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go b/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go index 448c123fafc..1593eff2847 100644 --- a/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go +++ b/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go @@ -21,7 +21,6 @@ import ( "fmt" "net" "strconv" - "sync" "time" corev1 "k8s.io/api/core/v1" @@ -64,39 +63,26 @@ func Add(ctx context.Context, c *appconfig.CompletedConfig, mgr manager.Manager) var _ reconcile.Reconciler = &ReconcileService{} -type serviceInformation struct { - mu sync.Mutex +type serviceRecord struct { data map[string]string } -func newServiceInfo() *serviceInformation { - return &serviceInformation{data: make(map[string]string, 0)} +func newServiceRecord() *serviceRecord { + return &serviceRecord{data: make(map[string]string, 0)} } -func (s *serviceInformation) write(key, val string) { - s.mu.Lock() - defer s.mu.Unlock() +func (s *serviceRecord) write(key, val string) { s.data[key] = val } -func (s *serviceInformation) read(key string) string { - s.mu.Lock() - defer s.mu.Unlock() +func (s *serviceRecord) read(key string) string { return s.data[key] } -func (s *serviceInformation) cleanup() { - s.mu.Lock() - defer s.mu.Unlock() - s.data = make(map[string]string) -} - // ReconcileService reconciles a Gateway object type ReconcileService struct { client.Client scheme *runtime.Scheme recorder record.EventRecorder - option util.Option - svcInfo *serviceInformation } // newReconciler returns a new reconcile.Reconciler @@ -105,8 +91,6 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { Client: mgr.GetClient(), scheme: mgr.GetScheme(), recorder: mgr.GetEventRecorderFor(names.GatewayPublicServiceController), - option: util.NewOption(), - svcInfo: newServiceInfo(), } } @@ -153,69 +137,33 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { func (r *ReconcileService) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { klog.V(2).Info(Format("started reconciling public service for gateway %s", req.Name)) defer klog.V(2).Info(Format("finished reconciling public service for gateway %s", req.Name)) + + enableProxy, enableTunnel := util.CheckServer(ctx, r.Client) gw, err := r.getGateway(ctx, req) - if err != nil && !apierrs.IsNotFound(err) { - klog.Error(Format("failed to get gateway %s, error %s", req.Name, err.Error())) - return reconcile.Result{}, err - } - if apierrs.IsNotFound(err) { - gw = &ravenv1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: req.Name}} + if err != nil { + if apierrs.IsNotFound(err) { + gw = &ravenv1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: req.Name}} + enableTunnel = false + enableProxy = false + } else { + klog.Error(Format("failed to get gateway %s, error %s", req.Name, err.Error())) + return reconcile.Result{}, err + } } - r.svcInfo.cleanup() - r.setOptions(ctx, gw, apierrs.IsNotFound(err)) - if err := r.reconcileService(ctx, gw.DeepCopy()); err != nil { + svcRecord := newServiceRecord() + if err := r.reconcileService(ctx, gw.DeepCopy(), svcRecord, enableTunnel, enableProxy); err != nil { err = fmt.Errorf(Format("unable to reconcile service: %s", err)) klog.Error(err.Error()) return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, err } - if err := r.reconcileEndpoints(ctx, gw.DeepCopy()); err != nil { + if err := r.reconcileEndpoints(ctx, gw.DeepCopy(), svcRecord, enableTunnel, enableProxy); err != nil { err = fmt.Errorf(Format("unable to reconcile endpoint: %s", err)) return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, err } return reconcile.Result{}, nil } -func (r *ReconcileService) setOptions(ctx context.Context, gw *ravenv1beta1.Gateway, isNotFound bool) { - r.option.SetProxyOption(true) - r.option.SetTunnelOption(true) - if isNotFound { - r.option.SetProxyOption(false) - r.option.SetTunnelOption(false) - klog.V(4).Info(Format("set option for proxy (%t) and tunnel (%t), reason gateway %s is not found", - false, false, gw.GetName())) - return - } - - if gw.DeletionTimestamp != nil { - r.option.SetProxyOption(false) - r.option.SetTunnelOption(false) - klog.V(4).Info(Format("set option for proxy (%t) and tunnel (%t), reason: gateway %s is deleted ", - false, false, gw.GetName())) - return - } - - if gw.Spec.ExposeType != ravenv1beta1.ExposeTypeLoadBalancer { - r.option.SetProxyOption(false) - r.option.SetTunnelOption(false) - klog.V(4).Info(Format("set option for proxy (%t) and tunnel (%t), reason: gateway %s exposed type is %s ", - false, false, gw.GetName(), gw.Spec.ExposeType)) - return - } - - enableProxy, enableTunnel := util.CheckServer(ctx, r.Client) - if !enableTunnel { - r.option.SetTunnelOption(enableTunnel) - klog.V(4).Info(Format("set option for tunnel (%t), reason: raven-cfg close tunnel ", false)) - } - - if !enableProxy { - r.option.SetProxyOption(enableProxy) - klog.V(4).Info(Format("set option for tunnel (%t), reason: raven-cfg close proxy ", false)) - } - return -} - func (r *ReconcileService) getGateway(ctx context.Context, req reconcile.Request) (*ravenv1beta1.Gateway, error) { var gw ravenv1beta1.Gateway err := r.Get(ctx, req.NamespacedName, &gw) @@ -225,24 +173,23 @@ func (r *ReconcileService) getGateway(ctx context.Context, req reconcile.Request return gw.DeepCopy(), nil } -func (r *ReconcileService) generateServiceName(services []corev1.Service) { +func recordServiceNames(services []corev1.Service, record *serviceRecord) { for _, svc := range services { epName := svc.Labels[util.LabelCurrentGatewayEndpoints] epType := svc.Labels[raven.LabelCurrentGatewayType] if epName == "" || epType == "" { continue } - r.svcInfo.write(formatKey(epName, epType), svc.GetName()) + record.write(formatKey(epName, epType), svc.GetName()) } return } -func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta1.Gateway) error { - enableProxy := r.option.GetProxyOption() +func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta1.Gateway, record *serviceRecord, enableTunnel, enableProxy bool) error { if enableProxy { klog.V(2).Info(Format("start manage proxy service for gateway %s", gw.GetName())) defer klog.V(2).Info(Format("finish manage proxy service for gateway %s", gw.GetName())) - if err := r.manageService(ctx, gw, ravenv1beta1.Proxy); err != nil { + if err := r.manageService(ctx, gw, ravenv1beta1.Proxy, record); err != nil { return fmt.Errorf("failed to manage service for proxy server %s", err.Error()) } } else { @@ -253,11 +200,10 @@ func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta } } - enableTunnel := r.option.GetTunnelOption() if enableTunnel { klog.V(2).Info(Format("start manage tunnel service for gateway %s", gw.GetName())) defer klog.V(2).Info(Format("finish manage tunnel service for gateway %s", gw.GetName())) - if err := r.manageService(ctx, gw, ravenv1beta1.Tunnel); err != nil { + if err := r.manageService(ctx, gw, ravenv1beta1.Tunnel, record); err != nil { return fmt.Errorf("failed to manage service for tunnel server %s", err.Error()) } } else { @@ -270,12 +216,11 @@ func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta return nil } -func (r *ReconcileService) reconcileEndpoints(ctx context.Context, gw *ravenv1beta1.Gateway) error { - enableProxy := r.option.GetProxyOption() +func (r *ReconcileService) reconcileEndpoints(ctx context.Context, gw *ravenv1beta1.Gateway, record *serviceRecord, enableTunnel, enableProxy bool) error { if enableProxy { klog.V(2).Info(Format("start manage proxy service endpoints for gateway %s", gw.GetName())) defer klog.V(2).Info(Format("finish manage proxy service endpoints for gateway %s", gw.GetName())) - if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Proxy); err != nil { + if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Proxy, record); err != nil { return fmt.Errorf("failed to manage endpoints for proxy server %s", err.Error()) } } else { @@ -285,11 +230,10 @@ func (r *ReconcileService) reconcileEndpoints(ctx context.Context, gw *ravenv1be return fmt.Errorf("failed to clear endpoints for proxy server %s", err.Error()) } } - enableTunnel := r.option.GetTunnelOption() if enableTunnel { klog.V(2).Info(Format("start manage tunnel service endpoints for gateway %s", gw.GetName())) defer klog.V(2).Info(Format("finish manage tunnel service endpoints for gateway %s", gw.GetName())) - if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Tunnel); err != nil { + if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Tunnel, record); err != nil { return fmt.Errorf("failed to manage endpoints for tunnel server %s", err.Error()) } } else { @@ -336,7 +280,7 @@ func (r *ReconcileService) clearEndpoints(ctx context.Context, gatewayName, gate return nil } -func (r *ReconcileService) manageService(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string) error { +func (r *ReconcileService) manageService(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string, record *serviceRecord) error { curSvcList, err := r.listService(ctx, gateway.GetName(), gatewayType) if err != nil { return fmt.Errorf("failed list service for gateway %s type %s , error %s", gateway.GetName(), gatewayType, err.Error()) @@ -344,7 +288,7 @@ func (r *ReconcileService) manageService(ctx context.Context, gateway *ravenv1be proxyPort, tunnelPort := r.getTargetPort() specSvcList := acquiredSpecService(gateway, gatewayType, proxyPort, tunnelPort) addSvc, updateSvc, deleteSvc := classifyService(curSvcList, specSvcList) - r.generateServiceName(specSvcList.Items) + recordServiceNames(specSvcList.Items, record) for i := 0; i < len(addSvc); i++ { if err := r.Create(ctx, addSvc[i]); err != nil { if apierrs.IsAlreadyExists(err) { @@ -367,12 +311,12 @@ func (r *ReconcileService) manageService(ctx context.Context, gateway *ravenv1be return nil } -func (r *ReconcileService) manageEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string) error { +func (r *ReconcileService) manageEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string, record *serviceRecord) error { currEpsList, err := r.listEndpoints(ctx, gateway.GetName(), gatewayType) if err != nil { return fmt.Errorf("failed list service for gateway %s type %s , error %s", gateway.GetName(), gatewayType, err.Error()) } - specEpsList := r.acquiredSpecEndpoints(ctx, gateway, gatewayType) + specEpsList := r.acquiredSpecEndpoints(ctx, gateway, gatewayType, record) addEps, updateEps, deleteEps := classifyEndpoints(currEpsList, specEpsList) for i := 0; i < len(addEps); i++ { if err := r.Create(ctx, addEps[i]); err != nil { @@ -454,7 +398,7 @@ func (r *ReconcileService) listEndpoints(ctx context.Context, gatewayName, gatew return &epsList, nil } -func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string) *corev1.EndpointsList { +func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string, record *serviceRecord) *corev1.EndpointsList { proxyPort, tunnelPort := r.getTargetPort() endpoints := make([]corev1.Endpoints, 0) for _, aep := range gateway.Status.ActiveEndpoints { @@ -467,7 +411,7 @@ func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *r } switch aep.Type { case ravenv1beta1.Proxy: - name := r.svcInfo.read(formatKey(aep.NodeName, ravenv1beta1.Proxy)) + name := record.read(formatKey(aep.NodeName, ravenv1beta1.Proxy)) if name == "" { continue } @@ -494,7 +438,7 @@ func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *r }, }) case ravenv1beta1.Tunnel: - name := r.svcInfo.read(formatKey(aep.NodeName, ravenv1beta1.Tunnel)) + name := record.read(formatKey(aep.NodeName, ravenv1beta1.Tunnel)) if name == "" { continue } @@ -540,6 +484,9 @@ func acquiredSpecService(gateway *ravenv1beta1.Gateway, gatewayType string, prox if gateway == nil { return &corev1.ServiceList{Items: services} } + if gateway.Spec.ExposeType != ravenv1beta1.ExposeTypeLoadBalancer { + return &corev1.ServiceList{Items: services} + } for _, aep := range gateway.Status.ActiveEndpoints { if aep.Type != gatewayType { continue @@ -558,6 +505,7 @@ func acquiredSpecService(gateway *ravenv1beta1.Gateway, gatewayType string, prox raven.LabelCurrentGatewayType: ravenv1beta1.Proxy, util.LabelCurrentGatewayEndpoints: aep.NodeName, }, + Annotations: map[string]string{"svc.openyurt.io/discard": "true"}, }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeLoadBalancer, @@ -584,6 +532,7 @@ func acquiredSpecService(gateway *ravenv1beta1.Gateway, gatewayType string, prox raven.LabelCurrentGatewayType: ravenv1beta1.Tunnel, util.LabelCurrentGatewayEndpoints: aep.NodeName, }, + Annotations: map[string]string{"svc.openyurt.io/discard": "true"}, }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeLoadBalancer, diff --git a/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller_test.go b/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller_test.go index 4151f0e7f74..c5deb43b8a3 100644 --- a/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller_test.go +++ b/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller_test.go @@ -203,8 +203,6 @@ func MockReconcile() *ReconcileService { return &ReconcileService{ Client: fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build(), recorder: record.NewFakeRecorder(100), - option: util.NewOption(), - svcInfo: newServiceInfo(), } } diff --git a/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_enqueue_handlers.go b/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_enqueue_handlers.go index 5147ad7c89a..02870446bb4 100644 --- a/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_enqueue_handlers.go +++ b/pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_enqueue_handlers.go @@ -38,7 +38,7 @@ func (h *EnqueueRequestForGatewayEvent) Create(e event.CreateEvent, q workqueue. klog.Error(Format("fail to assert runtime Object %s/%s to v1beta1.Gateway,", e.Object.GetNamespace(), e.Object.GetName())) return } - if gw.Spec.ExposeType == "" { + if gw.Spec.ExposeType != ravenv1beta1.ExposeTypeLoadBalancer { return } klog.V(2).Infof(Format("enqueue gateway %s as create event", gw.GetName())) @@ -68,10 +68,7 @@ func (h *EnqueueRequestForGatewayEvent) Delete(e event.DeleteEvent, q workqueue. klog.Error(Format("fail to assert runtime Object %s/%s to v1beta1.Gateway,", e.Object.GetNamespace(), e.Object.GetName())) return } - if gw.Spec.ExposeType == "" { - return - } - if gw.DeletionTimestamp != nil { + if gw.Spec.ExposeType != ravenv1beta1.ExposeTypeLoadBalancer { return } klog.V(2).Infof(Format("enqueue gateway %s as delete event", gw.GetName())) @@ -83,11 +80,13 @@ func (h *EnqueueRequestForGatewayEvent) Generic(e event.GenericEvent, q workqueu } func needUpdate(newObj, oldObj *ravenv1beta1.Gateway) bool { - if newObj.Spec.ExposeType != oldObj.Spec.ExposeType { - return true - } - if util.HashObject(newObj.Status.ActiveEndpoints) != util.HashObject(oldObj.Status.ActiveEndpoints) { - return true + if newObj.Spec.ExposeType == ravenv1beta1.ExposeTypeLoadBalancer || oldObj.Spec.ExposeType == ravenv1beta1.ExposeTypeLoadBalancer { + if newObj.Spec.ExposeType != oldObj.Spec.ExposeType { + return true + } + if util.HashObject(newObj.Status.ActiveEndpoints) != util.HashObject(oldObj.Status.ActiveEndpoints) { + return true + } } return false } diff --git a/pkg/yurtmanager/controller/raven/util/options.go b/pkg/yurtmanager/controller/raven/util/options.go deleted file mode 100644 index 8348224def5..00000000000 --- a/pkg/yurtmanager/controller/raven/util/options.go +++ /dev/null @@ -1,60 +0,0 @@ -/* -Copyright 2023 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the License); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an AS IS BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import "sync" - -type Option interface { - SetProxyOption(enable bool) - SetTunnelOption(enable bool) - GetProxyOption() bool - GetTunnelOption() bool -} - -type ServerOption struct { - mu sync.RWMutex - enableProxy bool - enableTunnel bool -} - -func NewOption() Option { - return &ServerOption{enableTunnel: false, enableProxy: false} -} - -func (o *ServerOption) SetProxyOption(enable bool) { - o.mu.Lock() - defer o.mu.Unlock() - o.enableProxy = enable -} - -func (o *ServerOption) SetTunnelOption(enable bool) { - o.mu.Lock() - defer o.mu.Unlock() - o.enableTunnel = enable -} - -func (o *ServerOption) GetProxyOption() bool { - o.mu.Lock() - defer o.mu.Unlock() - return o.enableProxy -} - -func (o *ServerOption) GetTunnelOption() bool { - o.mu.Lock() - defer o.mu.Unlock() - return o.enableTunnel -}