Skip to content

Commit

Permalink
Merge pull request #1787 from River-sh/fix_raven_controllers
Browse files Browse the repository at this point in the history
optimize raven controllers
  • Loading branch information
luckymrwang authored Nov 10, 2023
2 parents 866cf75 + ce8eafd commit 118f6a3
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ type ReconcileService struct {
client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
option util.Option
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -80,7 +79,6 @@ func newReconciler(c *appconfig.CompletedConfig, mgr manager.Manager) reconcile.
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor(names.GatewayInternalServiceController),
option: util.NewOption(),
}
}

Expand Down Expand Up @@ -137,13 +135,12 @@ func (r *ReconcileService) Reconcile(ctx context.Context, req reconcile.Request)
}

enableProxy, _ := util.CheckServer(ctx, r.Client)
r.option.SetProxyOption(enableProxy)
if err := r.reconcileService(ctx, req, gwList); err != nil {
if err := r.reconcileService(ctx, req, gwList, enableProxy); err != nil {
err = fmt.Errorf(Format("unable to reconcile service: %s", err))
return reconcile.Result{}, err
}

if err := r.reconcileEndpoint(ctx, req, gwList); err != nil {
if err := r.reconcileEndpoint(ctx, req, gwList, enableProxy); err != nil {
err = fmt.Errorf(Format("unable to reconcile endpoint: %s", err))
return reconcile.Result{}, err
}
Expand All @@ -169,8 +166,8 @@ func (r *ReconcileService) listExposedGateway(ctx context.Context) ([]*ravenv1be
return exposedGateways, nil
}

func (r *ReconcileService) reconcileService(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway) error {
if len(gatewayList) == 0 || !r.option.GetProxyOption() {
func (r *ReconcileService) reconcileService(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway, enableProxy bool) error {
if len(gatewayList) == 0 || !enableProxy {
return r.cleanService(ctx, req)
}
return r.updateService(ctx, req, gatewayList)
Expand Down Expand Up @@ -284,13 +281,13 @@ func splitPorts(str string) []string {
return ret
}

func (r *ReconcileService) reconcileEndpoint(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway) error {
func (r *ReconcileService) reconcileEndpoint(ctx context.Context, req ctrl.Request, gatewayList []*ravenv1beta1.Gateway, enableProxy bool) error {
var service corev1.Service
err := r.Get(ctx, req.NamespacedName, &service)
if err != nil && !apierrs.IsNotFound(err) {
return err
}
if apierrs.IsNotFound(err) || service.DeletionTimestamp != nil || len(gatewayList) == 0 || !r.option.GetProxyOption() {
if apierrs.IsNotFound(err) || service.DeletionTimestamp != nil || len(gatewayList) == 0 || !enableProxy {
return r.cleanEndpoint(ctx, req)
}
return r.updateEndpoint(ctx, req, &service, gatewayList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func MockReconcile() *ReconcileService {
return &ReconcileService{
Client: fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build(),
recorder: record.NewFakeRecorder(100),
option: util.NewOption(),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"net"
"strconv"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -64,39 +63,26 @@ func Add(ctx context.Context, c *appconfig.CompletedConfig, mgr manager.Manager)

var _ reconcile.Reconciler = &ReconcileService{}

type serviceInformation struct {
mu sync.Mutex
type serviceRecord struct {
data map[string]string
}

func newServiceInfo() *serviceInformation {
return &serviceInformation{data: make(map[string]string, 0)}
func newServiceRecord() *serviceRecord {
return &serviceRecord{data: make(map[string]string, 0)}
}
func (s *serviceInformation) write(key, val string) {
s.mu.Lock()
defer s.mu.Unlock()
func (s *serviceRecord) write(key, val string) {
s.data[key] = val
}

func (s *serviceInformation) read(key string) string {
s.mu.Lock()
defer s.mu.Unlock()
func (s *serviceRecord) read(key string) string {
return s.data[key]
}

func (s *serviceInformation) cleanup() {
s.mu.Lock()
defer s.mu.Unlock()
s.data = make(map[string]string)
}

// ReconcileService reconciles a Gateway object
type ReconcileService struct {
client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
option util.Option
svcInfo *serviceInformation
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -105,8 +91,6 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor(names.GatewayPublicServiceController),
option: util.NewOption(),
svcInfo: newServiceInfo(),
}
}

Expand Down Expand Up @@ -153,69 +137,33 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
func (r *ReconcileService) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
klog.V(2).Info(Format("started reconciling public service for gateway %s", req.Name))
defer klog.V(2).Info(Format("finished reconciling public service for gateway %s", req.Name))

enableProxy, enableTunnel := util.CheckServer(ctx, r.Client)
gw, err := r.getGateway(ctx, req)
if err != nil && !apierrs.IsNotFound(err) {
klog.Error(Format("failed to get gateway %s, error %s", req.Name, err.Error()))
return reconcile.Result{}, err
}
if apierrs.IsNotFound(err) {
gw = &ravenv1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: req.Name}}
if err != nil {
if apierrs.IsNotFound(err) {
gw = &ravenv1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: req.Name}}
enableTunnel = false
enableProxy = false
} else {
klog.Error(Format("failed to get gateway %s, error %s", req.Name, err.Error()))
return reconcile.Result{}, err
}
}
r.svcInfo.cleanup()
r.setOptions(ctx, gw, apierrs.IsNotFound(err))
if err := r.reconcileService(ctx, gw.DeepCopy()); err != nil {
svcRecord := newServiceRecord()
if err := r.reconcileService(ctx, gw.DeepCopy(), svcRecord, enableTunnel, enableProxy); err != nil {
err = fmt.Errorf(Format("unable to reconcile service: %s", err))
klog.Error(err.Error())
return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, err
}

if err := r.reconcileEndpoints(ctx, gw.DeepCopy()); err != nil {
if err := r.reconcileEndpoints(ctx, gw.DeepCopy(), svcRecord, enableTunnel, enableProxy); err != nil {
err = fmt.Errorf(Format("unable to reconcile endpoint: %s", err))
return reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second}, err
}
return reconcile.Result{}, nil
}

func (r *ReconcileService) setOptions(ctx context.Context, gw *ravenv1beta1.Gateway, isNotFound bool) {
r.option.SetProxyOption(true)
r.option.SetTunnelOption(true)
if isNotFound {
r.option.SetProxyOption(false)
r.option.SetTunnelOption(false)
klog.V(4).Info(Format("set option for proxy (%t) and tunnel (%t), reason gateway %s is not found",
false, false, gw.GetName()))
return
}

if gw.DeletionTimestamp != nil {
r.option.SetProxyOption(false)
r.option.SetTunnelOption(false)
klog.V(4).Info(Format("set option for proxy (%t) and tunnel (%t), reason: gateway %s is deleted ",
false, false, gw.GetName()))
return
}

if gw.Spec.ExposeType != ravenv1beta1.ExposeTypeLoadBalancer {
r.option.SetProxyOption(false)
r.option.SetTunnelOption(false)
klog.V(4).Info(Format("set option for proxy (%t) and tunnel (%t), reason: gateway %s exposed type is %s ",
false, false, gw.GetName(), gw.Spec.ExposeType))
return
}

enableProxy, enableTunnel := util.CheckServer(ctx, r.Client)
if !enableTunnel {
r.option.SetTunnelOption(enableTunnel)
klog.V(4).Info(Format("set option for tunnel (%t), reason: raven-cfg close tunnel ", false))
}

if !enableProxy {
r.option.SetProxyOption(enableProxy)
klog.V(4).Info(Format("set option for tunnel (%t), reason: raven-cfg close proxy ", false))
}
return
}

func (r *ReconcileService) getGateway(ctx context.Context, req reconcile.Request) (*ravenv1beta1.Gateway, error) {
var gw ravenv1beta1.Gateway
err := r.Get(ctx, req.NamespacedName, &gw)
Expand All @@ -225,24 +173,23 @@ func (r *ReconcileService) getGateway(ctx context.Context, req reconcile.Request
return gw.DeepCopy(), nil
}

func (r *ReconcileService) generateServiceName(services []corev1.Service) {
func recordServiceNames(services []corev1.Service, record *serviceRecord) {
for _, svc := range services {
epName := svc.Labels[util.LabelCurrentGatewayEndpoints]
epType := svc.Labels[raven.LabelCurrentGatewayType]
if epName == "" || epType == "" {
continue
}
r.svcInfo.write(formatKey(epName, epType), svc.GetName())
record.write(formatKey(epName, epType), svc.GetName())
}
return
}

func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta1.Gateway) error {
enableProxy := r.option.GetProxyOption()
func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta1.Gateway, record *serviceRecord, enableTunnel, enableProxy bool) error {
if enableProxy {
klog.V(2).Info(Format("start manage proxy service for gateway %s", gw.GetName()))
defer klog.V(2).Info(Format("finish manage proxy service for gateway %s", gw.GetName()))
if err := r.manageService(ctx, gw, ravenv1beta1.Proxy); err != nil {
if err := r.manageService(ctx, gw, ravenv1beta1.Proxy, record); err != nil {
return fmt.Errorf("failed to manage service for proxy server %s", err.Error())
}
} else {
Expand All @@ -253,11 +200,10 @@ func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta
}
}

enableTunnel := r.option.GetTunnelOption()
if enableTunnel {
klog.V(2).Info(Format("start manage tunnel service for gateway %s", gw.GetName()))
defer klog.V(2).Info(Format("finish manage tunnel service for gateway %s", gw.GetName()))
if err := r.manageService(ctx, gw, ravenv1beta1.Tunnel); err != nil {
if err := r.manageService(ctx, gw, ravenv1beta1.Tunnel, record); err != nil {
return fmt.Errorf("failed to manage service for tunnel server %s", err.Error())
}
} else {
Expand All @@ -270,12 +216,11 @@ func (r *ReconcileService) reconcileService(ctx context.Context, gw *ravenv1beta
return nil
}

func (r *ReconcileService) reconcileEndpoints(ctx context.Context, gw *ravenv1beta1.Gateway) error {
enableProxy := r.option.GetProxyOption()
func (r *ReconcileService) reconcileEndpoints(ctx context.Context, gw *ravenv1beta1.Gateway, record *serviceRecord, enableTunnel, enableProxy bool) error {
if enableProxy {
klog.V(2).Info(Format("start manage proxy service endpoints for gateway %s", gw.GetName()))
defer klog.V(2).Info(Format("finish manage proxy service endpoints for gateway %s", gw.GetName()))
if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Proxy); err != nil {
if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Proxy, record); err != nil {
return fmt.Errorf("failed to manage endpoints for proxy server %s", err.Error())
}
} else {
Expand All @@ -285,11 +230,10 @@ func (r *ReconcileService) reconcileEndpoints(ctx context.Context, gw *ravenv1be
return fmt.Errorf("failed to clear endpoints for proxy server %s", err.Error())
}
}
enableTunnel := r.option.GetTunnelOption()
if enableTunnel {
klog.V(2).Info(Format("start manage tunnel service endpoints for gateway %s", gw.GetName()))
defer klog.V(2).Info(Format("finish manage tunnel service endpoints for gateway %s", gw.GetName()))
if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Tunnel); err != nil {
if err := r.manageEndpoints(ctx, gw, ravenv1beta1.Tunnel, record); err != nil {
return fmt.Errorf("failed to manage endpoints for tunnel server %s", err.Error())
}
} else {
Expand Down Expand Up @@ -336,15 +280,15 @@ func (r *ReconcileService) clearEndpoints(ctx context.Context, gatewayName, gate
return nil
}

func (r *ReconcileService) manageService(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string) error {
func (r *ReconcileService) manageService(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string, record *serviceRecord) error {
curSvcList, err := r.listService(ctx, gateway.GetName(), gatewayType)
if err != nil {
return fmt.Errorf("failed list service for gateway %s type %s , error %s", gateway.GetName(), gatewayType, err.Error())
}
proxyPort, tunnelPort := r.getTargetPort()
specSvcList := acquiredSpecService(gateway, gatewayType, proxyPort, tunnelPort)
addSvc, updateSvc, deleteSvc := classifyService(curSvcList, specSvcList)
r.generateServiceName(specSvcList.Items)
recordServiceNames(specSvcList.Items, record)
for i := 0; i < len(addSvc); i++ {
if err := r.Create(ctx, addSvc[i]); err != nil {
if apierrs.IsAlreadyExists(err) {
Expand All @@ -367,12 +311,12 @@ func (r *ReconcileService) manageService(ctx context.Context, gateway *ravenv1be
return nil
}

func (r *ReconcileService) manageEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string) error {
func (r *ReconcileService) manageEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string, record *serviceRecord) error {
currEpsList, err := r.listEndpoints(ctx, gateway.GetName(), gatewayType)
if err != nil {
return fmt.Errorf("failed list service for gateway %s type %s , error %s", gateway.GetName(), gatewayType, err.Error())
}
specEpsList := r.acquiredSpecEndpoints(ctx, gateway, gatewayType)
specEpsList := r.acquiredSpecEndpoints(ctx, gateway, gatewayType, record)
addEps, updateEps, deleteEps := classifyEndpoints(currEpsList, specEpsList)
for i := 0; i < len(addEps); i++ {
if err := r.Create(ctx, addEps[i]); err != nil {
Expand Down Expand Up @@ -454,7 +398,7 @@ func (r *ReconcileService) listEndpoints(ctx context.Context, gatewayName, gatew
return &epsList, nil
}

func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string) *corev1.EndpointsList {
func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *ravenv1beta1.Gateway, gatewayType string, record *serviceRecord) *corev1.EndpointsList {
proxyPort, tunnelPort := r.getTargetPort()
endpoints := make([]corev1.Endpoints, 0)
for _, aep := range gateway.Status.ActiveEndpoints {
Expand All @@ -467,7 +411,7 @@ func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *r
}
switch aep.Type {
case ravenv1beta1.Proxy:
name := r.svcInfo.read(formatKey(aep.NodeName, ravenv1beta1.Proxy))
name := record.read(formatKey(aep.NodeName, ravenv1beta1.Proxy))
if name == "" {
continue
}
Expand All @@ -494,7 +438,7 @@ func (r *ReconcileService) acquiredSpecEndpoints(ctx context.Context, gateway *r
},
})
case ravenv1beta1.Tunnel:
name := r.svcInfo.read(formatKey(aep.NodeName, ravenv1beta1.Tunnel))
name := record.read(formatKey(aep.NodeName, ravenv1beta1.Tunnel))
if name == "" {
continue
}
Expand Down Expand Up @@ -540,6 +484,9 @@ func acquiredSpecService(gateway *ravenv1beta1.Gateway, gatewayType string, prox
if gateway == nil {
return &corev1.ServiceList{Items: services}
}
if gateway.Spec.ExposeType != ravenv1beta1.ExposeTypeLoadBalancer {
return &corev1.ServiceList{Items: services}
}
for _, aep := range gateway.Status.ActiveEndpoints {
if aep.Type != gatewayType {
continue
Expand All @@ -558,6 +505,7 @@ func acquiredSpecService(gateway *ravenv1beta1.Gateway, gatewayType string, prox
raven.LabelCurrentGatewayType: ravenv1beta1.Proxy,
util.LabelCurrentGatewayEndpoints: aep.NodeName,
},
Annotations: map[string]string{"svc.openyurt.io/discard": "true"},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeLoadBalancer,
Expand All @@ -584,6 +532,7 @@ func acquiredSpecService(gateway *ravenv1beta1.Gateway, gatewayType string, prox
raven.LabelCurrentGatewayType: ravenv1beta1.Tunnel,
util.LabelCurrentGatewayEndpoints: aep.NodeName,
},
Annotations: map[string]string{"svc.openyurt.io/discard": "true"},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeLoadBalancer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ func MockReconcile() *ReconcileService {
return &ReconcileService{
Client: fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build(),
recorder: record.NewFakeRecorder(100),
option: util.NewOption(),
svcInfo: newServiceInfo(),
}
}

Expand Down
Loading

0 comments on commit 118f6a3

Please sign in to comment.