Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize raven controllers #1787

Merged
merged 1 commit into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ type ReconcileService struct {
client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
option util.Option
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -80,7 +79,6 @@ func newReconciler(c *appconfig.CompletedConfig, mgr manager.Manager) reconcile.
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor(names.GatewayInternalServiceController),
option: util.NewOption(),
}
}

Expand Down Expand Up @@ -137,13 +135,12 @@ func (r *ReconcileService) Reconcile(ctx context.Context, req reconcile.Request)
}

enableProxy, _ := util.CheckServer(ctx, r.Client)
r.option.SetProxyOption(enableProxy)
if err := r.reconcileService(ctx, req, gwList); err != nil {
if err := r.reconcileService(ctx, req, gwList, enableProxy); err != nil {
err = fmt.Errorf(Format("unable to reconcile service: %s", err))
return reconcile.Result{}, err
}

if err := r.reconcileEndpoint(ctx, req, gwList); err != nil {
if err := r.reconcileEndpoint(ctx, req, gwList, enableProxy); err != nil {
err = fmt.Errorf(Format("unable to reconcile endpoint: %s", err))
return reconcile.Result{}, err
}
Expand All @@ -169,8 +166,8 @@ func (r *ReconcileService) listExposedGateway(ctx context.Context) ([]*ravenv1be
return exposedGateways, nil
}

func (r *ReconcileService) reconcileService(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway) error {
if len(gatewayList) == 0 || !r.option.GetProxyOption() {
func (r *ReconcileService) reconcileService(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway, enableProxy bool) error {
if len(gatewayList) == 0 || !enableProxy {
return r.cleanService(ctx, req)
}
return r.updateService(ctx, req, gatewayList)
Expand Down Expand Up @@ -284,13 +281,13 @@ func splitPorts(str string) []string {
return ret
}

func (r *ReconcileService) reconcileEndpoint(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway) error {
func (r *ReconcileService) reconcileEndpoint(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway, enableProxy bool) error {
var service corev1.Service
err := r.Get(ctx, req.NamespacedName, &service)
if err != nil && !apierrs.IsNotFound(err) {
return err
}
if apierrs.IsNotFound(err) || service.DeletionTimestamp != nil || len(gatewayList) == 0 || !r.option.GetProxyOption() {
if apierrs.IsNotFound(err) || service.DeletionTimestamp != nil || len(gatewayList) == 0 || !enableProxy {
return r.cleanEndpoint(ctx, req)
}
return r.updateEndpoint(ctx, req, &service, gatewayList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func MockReconcile() *ReconcileService {
return &ReconcileService{
Client: fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build(),
recorder: record.NewFakeRecorder(100),
option: util.NewOption(),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
"fmt"
"net"
"strconv"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -64,39 +63,26 @@

var _ reconcile.Reconciler = &ReconcileService{}

type serviceInformation struct {
mu sync.Mutex
type serviceRecord struct {
data map[string]string
}

func newServiceInfo() *serviceInformation {
return &serviceInformation{data: make(map[string]string, 0)}
func newServiceRecord() *serviceRecord {
return &serviceRecord{data: make(map[string]string, 0)}
}
func (s *serviceInformation) write(key, val string) {
s.mu.Lock()
defer s.mu.Unlock()
func (s *serviceRecord) write(key, val string) {
s.data[key] = val
}

func (s *serviceInformation) read(key string) string {
s.mu.Lock()
defer s.mu.Unlock()
func (s *serviceRecord) read(key string) string {
return s.data[key]
}

func (s *serviceInformation) cleanup() {
s.mu.Lock()
defer s.mu.Unlock()
s.data = make(map[string]string)
}

// ReconcileService reconciles a Gateway object
type ReconcileService struct {
client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
option util.Option
svcInfo *serviceInformation
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -105,8 +91,6 @@
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor(names.GatewayPublicServiceController),
option: util.NewOption(),
svcInfo: newServiceInfo(),
}
}

Expand Down Expand Up @@ -153,69 +137,33 @@
func (r *ReconcileService) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
klog.V(2).Info(Format("started reconciling public service for gateway %s", req.Name))
defer klog.V(2).Info(Format("finished reconciling public service for gateway %s", req.Name))

enableProxy, enableTunnel := util.CheckServer(ctx, r.Client)
gw, err := r.getGateway(ctx, req)
if err != nil && !apierrs.IsNotFound(err) {
klog.Error(Format("failed to get gateway %s, error %s", req.Name, err.Error()))
return reconcile.Result{}, err
}
if apierrs.IsNotFound(err) {
gw = &ravenv1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: req.Name}}
if err != nil {
if apierrs.IsNotFound(err) {
gw = &ravenv1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: req.Name}}
enableTunnel = false
enableProxy = false
} else {
klog.Error(Format("failed to get gateway %s, error %s", req.Name, err.Error()))
return reconcile.Result{}, err
}

Check warning on line 151 in pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go#L144-L151

Added lines #L144 - L151 were not covered by tests
}
r.svcInfo.cleanup()
r.setOptions(ctx, gw, apierrs.IsNotFound(err))
if err := r.reconcileService(ctx, gw.DeepCopy()); err != nil {
svcRecord := newServiceRecord()
if err := r.reconcileService(ctx, gw.DeepCopy(), svcRecord, enableTunnel, enableProxy); err != nil {
err = fmt.Errorf(Format("unable to reconcile service: %s", err))
klog.Error(err.Error())
return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, err
}

if err := r.reconcileEndpoints(ctx, gw.DeepCopy()); err != nil {
if err := r.reconcileEndpoints(ctx, gw.DeepCopy(), svcRecord, enableTunnel, enableProxy); err != nil {
err = fmt.Errorf(Format("unable to reconcile endpoint: %s", err))
return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, err
}
return reconcile.Result{}, nil
}

func (r *ReconcileService) setOptions(ctx context.Context, gw *ravenv1beta1.Gateway, isNotFound bool) {
r.option.SetProxyOption(true)
r.option.SetTunnelOption(true)
if isNotFound {
r.option.SetProxyOption(false)
r.option.SetTunnelOption(false)
klog.V(4).Info(Format("set option for proxy (%t) and tunnel (%t), reason gateway %s is not found",
false, false, gw.GetName()))
return
}

if gw.DeletionTimestamp != nil {
r.option.SetProxyOption(false)
r.option.SetTunnelOption(false)
klog.V(4).Info(Format("set option for proxy (%t) and tunnel (%t), reason: gateway %s is deleted ",
false, false, gw.GetName()))
return
}

if gw.Spec.ExposeType != ravenv1beta1.ExposeTypeLoadBalancer {
r.option.SetProxyOption(false)
r.option.SetTunnelOption(false)
klog.V(4).Info(Format("set option for proxy (%t) and tunnel (%t), reason: gateway %s exposed type is %s ",
false, false, gw.GetName(), gw.Spec.ExposeType))
return
}

enableProxy, enableTunnel := util.CheckServer(ctx, r.Client)
if !enableTunnel {
r.option.SetTunnelOption(enableTunnel)
klog.V(4).Info(Format("set option for tunnel (%t), reason: raven-cfg close tunnel ", false))
}

if !enableProxy {
r.option.SetProxyOption(enableProxy)
klog.V(4).Info(Format("set option for tunnel (%t), reason: raven-cfg close proxy ", false))
}
return
}

func (r *ReconcileService) getGateway(ctx context.Context, req reconcile.Request) (*ravenv1beta1.Gateway, error) {
var gw ravenv1beta1.Gateway
err := r.Get(ctx, req.NamespacedName, &gw)
Expand All @@ -225,24 +173,23 @@
return gw.DeepCopy(), nil
}

func (r *ReconcileService) generateServiceName(services []corev1.Service) {
func recordServiceNames(services []corev1.Service, record *serviceRecord) {
for _, svc := range services {
epName := svc.Labels[util.LabelCurrentGatewayEndpoints]
epType := svc.Labels[raven.LabelCurrentGatewayType]
if epName == "" || epType == "" {
continue
}
r.svcInfo.write(formatKey(epName, epType), svc.GetName())
record.write(formatKey(epName, epType), svc.GetName())
}
return
}

func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta1.Gateway) error {
enableProxy := r.option.GetProxyOption()
func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta1.Gateway, record *serviceRecord, enableTunnel, enableProxy bool) error {
if enableProxy {
klog.V(2).Info(Format("start manage proxy service for gateway %s", gw.GetName()))
defer klog.V(2).Info(Format("finish manage proxy service for gateway %s", gw.GetName()))
if err := r.manageService(ctx, gw, ravenv1beta1.Proxy); err != nil {
if err := r.manageService(ctx, gw, ravenv1beta1.Proxy, record); err != nil {
return fmt.Errorf("failed to manage service for proxy server %s", err.Error())
}
} else {
Expand All @@ -253,11 +200,10 @@
}
}

enableTunnel := r.option.GetTunnelOption()
if enableTunnel {
klog.V(2).Info(Format("start manage tunnel service for gateway %s", gw.GetName()))
defer klog.V(2).Info(Format("finish manage tunnel service for gateway %s", gw.GetName()))
if err := r.manageService(ctx, gw, ravenv1beta1.Tunnel); err != nil {
if err := r.manageService(ctx, gw, ravenv1beta1.Tunnel, record); err != nil {
return fmt.Errorf("failed to manage service for tunnel server %s", err.Error())
}
} else {
Expand All @@ -270,12 +216,11 @@
return nil
}

func (r *ReconcileService) reconcileEndpoints(ctx context.Context, gw *ravenv1beta1.Gateway) error {
enableProxy := r.option.GetProxyOption()
func (r *ReconcileService) reconcileEndpoints(ctx context.Context, gw *ravenv1beta1.Gateway, record *serviceRecord, enableTunnel, enableProxy bool) error {
if enableProxy {
klog.V(2).Info(Format("start manage proxy service endpoints for gateway %s", gw.GetName()))
defer klog.V(2).Info(Format("finish manage proxy service endpoints for gateway %s", gw.GetName()))
if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Proxy); err != nil {
if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Proxy, record); err != nil {
return fmt.Errorf("failed to manage endpoints for proxy server %s", err.Error())
}
} else {
Expand All @@ -285,11 +230,10 @@
return fmt.Errorf("failed to clear endpoints for proxy server %s", err.Error())
}
}
enableTunnel := r.option.GetTunnelOption()
if enableTunnel {
klog.V(2).Info(Format("start manage tunnel service endpoints for gateway %s", gw.GetName()))
defer klog.V(2).Info(Format("finish manage tunnel service endpoints for gateway %s", gw.GetName()))
if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Tunnel); err != nil {
if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Tunnel, record); err != nil {
return fmt.Errorf("failed to manage endpoints for tunnel server %s", err.Error())
}
} else {
Expand Down Expand Up @@ -336,15 +280,15 @@
return nil
}

func (r *ReconcileService) manageService(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string) error {
func (r *ReconcileService) manageService(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string, record *serviceRecord) error {
curSvcList, err := r.listService(ctx, gateway.GetName(), gatewayType)
if err != nil {
return fmt.Errorf("failed list service for gateway %s type %s , error %s", gateway.GetName(), gatewayType, err.Error())
}
proxyPort, tunnelPort := r.getTargetPort()
specSvcList := acquiredSpecService(gateway, gatewayType, proxyPort, tunnelPort)
addSvc, updateSvc, deleteSvc := classifyService(curSvcList, specSvcList)
r.generateServiceName(specSvcList.Items)
recordServiceNames(specSvcList.Items, record)
for i := 0; i < len(addSvc); i++ {
if err := r.Create(ctx, addSvc[i]); err != nil {
if apierrs.IsAlreadyExists(err) {
Expand All @@ -367,12 +311,12 @@
return nil
}

func (r *ReconcileService) manageEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string) error {
func (r *ReconcileService) manageEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string, record *serviceRecord) error {
currEpsList, err := r.listEndpoints(ctx, gateway.GetName(), gatewayType)
if err != nil {
return fmt.Errorf("failed list service for gateway %s type %s , error %s", gateway.GetName(), gatewayType, err.Error())
}
specEpsList := r.acquiredSpecEndpoints(ctx, gateway, gatewayType)
specEpsList := r.acquiredSpecEndpoints(ctx, gateway, gatewayType, record)
addEps, updateEps, deleteEps := classifyEndpoints(currEpsList, specEpsList)
for i := 0; i < len(addEps); i++ {
if err := r.Create(ctx, addEps[i]); err != nil {
Expand Down Expand Up @@ -454,7 +398,7 @@
return &epsList, nil
}

func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string) *corev1.EndpointsList {
func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string, record *serviceRecord) *corev1.EndpointsList {
proxyPort, tunnelPort := r.getTargetPort()
endpoints := make([]corev1.Endpoints, 0)
for _, aep := range gateway.Status.ActiveEndpoints {
Expand All @@ -467,7 +411,7 @@
}
switch aep.Type {
case ravenv1beta1.Proxy:
name := r.svcInfo.read(formatKey(aep.NodeName, ravenv1beta1.Proxy))
name := record.read(formatKey(aep.NodeName, ravenv1beta1.Proxy))
if name == "" {
continue
}
Expand All @@ -494,7 +438,7 @@
},
})
case ravenv1beta1.Tunnel:
name := r.svcInfo.read(formatKey(aep.NodeName, ravenv1beta1.Tunnel))
name := record.read(formatKey(aep.NodeName, ravenv1beta1.Tunnel))

Check warning on line 441 in pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go#L441

Added line #L441 was not covered by tests
if name == "" {
continue
}
Expand Down Expand Up @@ -540,6 +484,9 @@
if gateway == nil {
return &corev1.ServiceList{Items: services}
}
if gateway.Spec.ExposeType != ravenv1beta1.ExposeTypeLoadBalancer {
return &corev1.ServiceList{Items: services}
}

Check warning on line 489 in pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go#L488-L489

Added lines #L488 - L489 were not covered by tests
for _, aep := range gateway.Status.ActiveEndpoints {
if aep.Type != gatewayType {
continue
Expand All @@ -558,6 +505,7 @@
raven.LabelCurrentGatewayType: ravenv1beta1.Proxy,
util.LabelCurrentGatewayEndpoints: aep.NodeName,
},
Annotations: map[string]string{"svc.openyurt.io/discard": "true"},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeLoadBalancer,
Expand All @@ -584,6 +532,7 @@
raven.LabelCurrentGatewayType: ravenv1beta1.Tunnel,
util.LabelCurrentGatewayEndpoints: aep.NodeName,
},
Annotations: map[string]string{"svc.openyurt.io/discard": "true"},

Check warning on line 535 in pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurtmanager/controller/raven/gatewaypublicservice/gateway_public_service_controller.go#L535

Added line #L535 was not covered by tests
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeLoadBalancer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ func MockReconcile() *ReconcileService {
return &ReconcileService{
Client: fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build(),
recorder: record.NewFakeRecorder(100),
option: util.NewOption(),
svcInfo: newServiceInfo(),
}
}

Expand Down
Loading
Loading